文章目录
零、本讲学习目标
- Spark RDD实现单词计数
- Spark RDD实现分组求TopN
- Spark RDD实现二次排序
- Spark RDD实现计算平均分
- Spark RDD实现倒排索引统计每日新增用户
- Spark RDD读写HBase
- Spark RDD数据倾斜问题解决
一、案例分析:Spark RDD实现单词计数
(一)案例概述
- 单词计数是学习分布式计算的入门程序,有很多种实现方式,例如MapReduce;使用Spark提供的RDD算子可以更加轻松地实现单词计数。
- 在IntelliJ IDEA中新建Maven管理的Spark项目,并在该项目中使用Scala语言编写Spark的WordCount程序,最后将项目打包提交到Spark集群(Standalone模式)中运行。
(二)实现步骤
1、新建Maven管理的Spark项目
-
在IDEA中选择File→new→Project…,在弹出的窗口中选择左侧的Maven项,然后在右侧勾选Create fromarchetype复选框并选择下方出现的
org.scala-tools.archetypes:scala-archetype-simple
项(表示使用scala-archetype-simple模板构建Maven项目)。
-
在弹出的窗口中填写GroupId与ArtifactId,Version保持默认设置即可,然后单击Next按钮
-
在弹出的窗口中从本地系统选择Maven安装的主目录的路径、Maven的配置文件settings.xml的路径以及Maven仓库的路径,然后单击Next按钮
-
在弹出的窗口中项目名称为
WordCount
,就是先前设置的ArtifactId的值,当然也可以修改,然后单击Finish按钮
2、添加Scala和Spark依赖
- 启动spark-shell,可以看到Spark2.4.4使用Scala2.11.12
- 在pom.xml文件里添加Scala2.11.12和Spark 2.4.4依赖,添加Maven构建插件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.hw.spark</groupId>
<artifactId>WordCount</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.4</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!--设置Spark应用的入口类-->
<mainClass>net.hw.spark.WordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3、创建WordCount对象
- 在
net.hw.spark
包里创建WordCount
对象
package net.hw.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{
SparkConf, SparkContext}
/**
* 功能:统计单词个数
* 作者:华卫
* 日期:2022年04月17日
*/
object WordCount {
def main(args: Array[String]): Unit = {
// 创建SparkConf对象,存储应用程序配置信息
val conf = new SparkConf()
.setAppName("Spark-WordCount") // 设置应用程序名称,可在Spark WebUI中显示
.setMaster("spark://master:7077") // 设置集群Master节点访问地址
// 创建SparkContext对象,该对象是提交Spark应用程序的入口
val sc = new SparkContext(conf)
// 读取指定路径(程序执行时传入的第一个参数)的文件内容,生成一个RDD
val rdd: RDD[String] = sc.textFile(args(0))
// 对rdd进行处理
rdd.flatMap(_.split(" ")) // 将RDD的每个元素按照空格进行拆分并将结果合并为一个新RDD
.map((_, 1)) //将RDD中的每个单词和数字1放到一个元组里,即(word,1)
.reduceByKey(_ + _) //对单词根据key进行聚合,对相同的key进行value的累加
.sortBy(_._2, false) // 按照单词数量降序排列
.saveAsTextFile(args(1)) //保存结果到指定的路径(取程序执行时传入的第二个参数)
//停止SparkContext,结束该任务
sc.stop();
}
}
4、对于程序代码进行解析
- SparkConf对象的setMaster()方法用于设置Spark应用程序提交的URL地址。若是Standalone集群模式,则指Master节点的访问地址;若是本地(单机)模式,则需要将地址改为local或local[N]或local[*],分别指使用1个、N个和多个CPU核心数。本地模式可以直接在IDE中运行程序,不需要Spark集群。
- 此处也可不设置。若将其省略,则使用
spark-submit
提交该程序到集群时必须使用--master
参数进行指定。 - SparkContext对象用于初始化Spark应用程序运行所需要的核心组件,是整个Spark应用程序中很重要的一个对象。启动Spark Shell后默认创建的名为sc的对象即为该对象。
- textFile()方法需要传入数据来源的路径。数据来源可以是外部的数据源(HDFS、S3等),也可以是本地文件系统(Windows或Linux系统),路径可以使用以下3种方式:
(1)文件路径:例如textFile("/input/data.txt ")
,此时将只读取指定的文件。
(2)目录路径:例如textFile("/input/words/")
,此时将读取指定目录words下的所有文件,不包括子目录。
(3)路径包含通配符:例如textFile("/input/words/*.txt")
,此时将读取words目录下的所有TXT文件。 - 该方法将读取的文件中的内容按行进行拆分并组成一个RDD集合。假设读取的文件为
words.txt
,则上述代码的具体数据转化流程如下图所示。
5、将Spark项目编译和打包
- 展开IDEA右侧的Maven Projects窗口,双击其中的package项,将编写好的
WordCount
项目进行编译和打包
- 生成两个jar包,一个没有带依赖,一个带了依赖,我们使用没有带依赖的jar包
6、上传Spark应用程序到master虚拟机
- 将
WordCount-1.0-SNAPSHOT.jar
上传到master虚拟机/home/howard
目录
7、启动HDFS服务
- 执行命令:
start-dfs.sh
8、启动Spark集群
- 执行命令:
$SPARK_HOME/sbin/start-all.sh
9、上传单词文件到HDFS指定目录
- 创建单词文件
words.txt
- 上传到HDFS的
/wordcount
目录
10、执行WordCount程序
(1)提交应用程序到集群中运行
- 执行命令:
spark-submit --master spark://master:7077 --class net.hw.spark.WordCount WordCount-1.0-SNAPSHOT.jar hdfs://master:9000/wordcount hdfs://master:9000/wordcount_output
(2)命令参数解析
- –master:Spark Master节点的访问路径。由于在WordCount程序中已经通过setMaster()方法指定了该路径,因此该参数可以省略。
- –class:SparkWordCount程序主类的访问全路径(包名.类名)。
- hdfs://master:9000/wordcount:单词数据的来源路径。该路径下的所有文件都将参与统计。
- hdfs://master:9000/wordcount_output:统计结果的输出路径。与MapReduce一样,该目录不应提前存在,Spark会自动创建。
(3)Spark WebUI界面查看应用程序信息
- 应用程序运行的过程中,可以访问Spark的WebUI http://master:8080/,查看正在运行的应用程序的状态信息(也可以查看已经完成的应用程序)
- 可以看到,有一个名称为Spark-WordCount的应用程序正在运行,该名称即为SparkWordCount程序中通过方法setAppName(“Spark-WordCount”)所设置的值。
- 在应用程序运行的过程中,也可以访问Spark的WebUI http://master:4040/,查看正在运行的Job(作业)的状态信息,包括作业ID、作业描述、作业已运行时长、作业已运行Stage数量、作业Stage总数、作业已运行Task任务数量等(当作业运行完毕后,该界面将不可访问)
- 单击矩形选框里的超链接,将跳转到作业详情页面,该页面显示了作业正在运行的Stage信息(Active Stages)和等待运行的Stage信息(Pending Stages),包括Stage ID、Stage描述、Stage提交时间、Stage已运行时长、Stage包括的Task任务数量、已运行的Task任务数量等
- 单击矩形选框里的超链接(DAG Visualization),可以查看本次作业的DAG可视图
- 可以看出,本次作业共划分了两个Stage。由于reduceByKey()操作会产生宽依赖,因此在执行reduceByKey()操作之前进行划分。
11、查看程序执行结果
- 执行命令:
hdfs dfs -ls /wordcount_output
,查看生成的结果文件
- 可以看到,与MapReduce一样,Spark会在结果目录中生成多个文件。_SUCCESS为执行状态文件,结果数据则存储在文件part-00000和part-00001中。
- 执行命令:
hdfs dfs -cat /wordcount_output/*
,查看结果文件里的数据
- 至此,使用Scala语言编写的Spark版WordCount程序运行成功。