大数据Spark系列——Spark案例实践


案例实践

Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

一、WordCount程序

1、创建一个Maven项目WordCount并导入依赖

如果是创建父子项目的话,如果只是一个项目则一起写在一个xml文件中即可

父module的xml文件编写
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>

子module的xml文件编写
<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.WordCount</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2、编写代码

def main(args: Array[String]): Unit = {
  //创建配置文件
  val conf = new SparkConf().setAppName("wc")

  //创建SparkContext
  val sc = new SparkContext(conf);

  //读取数据
  val lines = sc.textFile(args(0))

  //flatmap压平(一行行读取数据)
  val words = lines.flatMap(_.split(" "))

  //map(word,1)
  val k2v = words.map((_,1))

  //reduceByKey(word,1)
  val result = k2v.reduceByKey(_+_)

  //保存数据
  result.saveAsTextFile(args(1))

  //关闭连接
  sc.stop();
}

3、打包成jar包到集群中运行(一般公司运行调试)

将jar包上传到集群的spark下,最好是把路径修改为HDFS的,本地路径可能有问题

bin/spark-submit \
--class com.WordCount \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./wordcount.jar \
hdfs://hadoop102:9000/red.txt \
hdfs://hadoop102:9000/out1

4、本地测试(一般用于Debug)

本地Spark程序调试需要使用local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。





执行结果:





5、远程调试

将jar包放在本地,通过集群远程调试代码,输出结果保存到hdfs上

def main(args: Array[String]): Unit = {
  //创建配置文件
  val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
    .setJars(List("E:\\IdeaProjects\\spark0922\\sparkCore\\target\\sparkCore-1.0-SNAPSHOT.jar"))

  //创建SparkContext
  val sc = new SparkContext(conf);

  //读取数据
  val lines = sc.textFile("hdfs://hadoop102:9000/red.txt")

  //flatMap压平
  val words = lines.flatMap(_.split(" "))

  //map(word,1)
  val k2v = words.map((_,1))

  //reduceByKey(word,1)
  val result = k2v.reduceByKey(_+_)

  //保存数据
  result.saveAsTextFile("hdfs://hadoop102:9000/out3")

  //关闭连接
  sc.stop()





坚持原创技术分享,您的支持将鼓励我继续创作!