目录
刚接触Spark那会,还是Spark1.3版本,那时觉得Spark好厉害,但由于能力和工作的原因,没有沉淀下来仔细研究这门技术,只是肤浅地使用,并没有深入了解追溯其本源。接下来,我要仔细研究Spark内核,也算是给自己一个交代吧。
这里我就不写Spark环境搭建的过程了,这个真的是太多太多了。基于Spark2.1.0,从Spark Shell入手,慢慢走向Spark内核深处。
启动Spark Shell
输入spark-shell,启动成功会打印如下日志信息:
日志信息中有几点需要关注一下:
- 默认的日志配置文件是"org/apache/spark/log4j-defaults.properties",日志级别是"WARN";
- 我们可以通过“sc.setLogLevel(newLevel)”方法指定日志级别;
- Spark Context Web UI 地址是"http://192.168.31.115:4040","192.168.31.115"是本机IP地址,4040是端口号;
- Spark Shell部署模式是“master = local[*]”,当前应用ID是“local-153....”;
- Spark Sehll默认创建了SparkContext对象,名叫sc;创建了SparkSession对象,名叫spark。
日志级别的设置
在使用spark-shell编写word count程序之前,我们先设置输出日志的级别,修改为INFO级别,因为WARN级别输出的东西有点少。方法有二,其一:在spark的repl中使用sc.setLogLevel()方法直接设置日志等级。
其二:进入spark的conf目录中,找到“log4j.properties.template”,并改名为“log4j.properties”。
cp log4j.properties.template log4j.properties
编辑“log4j.properties”文件,将“log4j.logger.org.apache.spark.repl.Main=WARN”改成“log4j.logger.org.apache.spark.repl.Main=INFO”。
这里我采用的是第一种方法,下面是完整的编写过程。
解析word count程序
第0步:设置日志级别(“可选”)
第1步:读取文件
逐行读取文件,并创建了一个MapPartitionsRDD,MapPartitionsRDD继承了“org.apache.spark.rdd.RDD”这个抽象类
第2步:将每行的内容根据空格进行拆分成单词
第3步:设置每一个单词的计数为1
虽然此时依旧生成了MapPartitionsRDD,但是它的泛型改变成了“(String,Int)”
第4步:单词根据Key进行计数值累加聚合
把相同单词的计数1,进行累加,生成了一个ShuffledRDD。(这一步非常重要,是spark作业中最容易出现性能问题的一个过程--shuffle过程)
第5步:输出结果与分析
1-4步骤属于transformation操作,最后一个步骤处于action操作,只有在action操作,spark作业才真正的执行。
执行的日志如下:
sc提交的job的ID是0
一共产生的4个RDD,被换分成了ShuffleMapStage(ID=0,尝试号=0)和ResultStage(ID=1,尝试号=0)。
此时对应步骤1,2,3
提交了ShuffleMapStage,做了一些准备工作之后(比如创建广播变量等),开始执行任务。
“Executor: Running task 0.0 in stage 0.0 (TID 0)”,
task 0.0表示任务ID为0,尝试号为0
stage 0.0表示stageID为0也就是ShuffleMapStage,尝试号为0
执行任务时进行读取文件,这里是HadoopRDD进行读取的,也就是说第一个MapPartionsRDD的上游RDD是HadoopRDD。
上图的结尾表示,此时的task执行结束了。
第一个stage的任务运行结束,需要从任务列表中去除,然后查找、等待机新的stage任务,然后寻找到了ID为1的stage,也就是ResultStage。
此时对应步骤4,5
执行ResultStage,同ShuffledStage执行阶段一样先有个初期准备,然后执行任务
“Executor: Running task 0.0 in stage 1.0 (TID 1)”
task 0.0表示任务ID为0,尝试号为0
stage 0.0表示stageID为1也就是ResultStage,尝试号为0
结尾还是表示执行结束,汇报一下工作。
最后从任务列表中去除ResultStage的任务,并宣布ID为0的job也就是整个word count任务结束。