文章目录
Spark WordCount运行原理
一.RDD的依赖关系
为什么需要划分Stage
- 数据本地化
- 移动计算,而不是移动数据
- 保证一个Stage内不会发生数据移动
Lineage:血统、遗传
- RDD最重要的特性之一,保存了RDD的依赖关系
- RDD实现了基于Lineage的容错机制
依赖关系
- 1)宽依赖
- 2)窄依赖
宽依赖对比窄依赖
- 宽依赖对应shuffle操作,需要在运行时将同一个父RDD的分区传入到不同的子RDD分区中,不同的分区可能位于不同的节点,就可能涉及多个节点间数据传输
- 当RDD分区丢失时,Spark会对数据进行重新计算,对于窄依赖只需重新计算一次子RDD的父RDD分区
- 结论:相比于宽依赖,窄依赖对优化更有利
DAG工作原理
- 根据RDD之间的依赖关系,形成一个DAG(有向无环图)
- DAGScheduler将DAG划分为多个Stage
- 划分依据:是否发生宽依赖(Shuffle)
- 划分规则:从后往前,遇到宽依赖切割为新的Stage
- 每个Stage由一组并行的Task组成
Spark Shuffle过程
- 在分区之间重新分配数据
- 父RDD中同一分区中的数据按照算子要求重新进入子RDD的不同分区中
- 中间结果写入磁盘
- 由子RDD拉取数据,而不是由父RDD推送
- 默认情况下,Shuffle不会改变分区数量
二.RDD优化
1.RDD持久化
RDD缓存机制:缓存数据至内存/磁盘,可大幅度提升Spark应用性能
- cache=persist(MEMORY)
- persist
缓存策略StorageLevel
- MEMORY_ONLY(默认)
- MEMORY_AND_DISK
- DISK_ONLY
val u1 = sc.textFile("file:///root/data/users.txt").cache
u1.collect//删除users.txt,再试试
u1.unpersist()
缓存应用场景
- 从文件加载数据之后,因为重新获取文件成本较高
- 经过较多的算子变换之后,重新计算成本较高
- 单个非常消耗资源的算子之后
- 使用注意事项
- 1)
cache()或persist()后不能再有其他算子,否则会重新开始计算缓存机制无效
- 2)
cache()或persist()遇到Action算子完成后才生效
检查点:类似于快照
sc.setCheckpointDir("hdfs:/checkpoint0918")
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.checkpoint
rdd.collect //生成快照
rdd.isCheckpointed
rdd.getCheckpointFile
检查点与缓存的区别
- 检查点会删除RDD lineage,而缓存不会
- SparkContext被销毁后,检查点数据不会被删除
2.共享遍量
- 广播变量:允许开发者将一个只读变量(Driver端)缓存到每个节点(Executor)上,而不是每个任务传递一个副本
val broadcastVar=sc.broadcast(Array(1,2,3)) //定义广播变量
broadcastVar.value //访问方式
/*注意事项:
1、Driver端变量在每个Executor每个Task保存一个变量副本
2、Driver端广播变量在每个Executor只保存一个变量副本
*/
- 累加器:只允许added操作,常用于实现计数
//初始值是0
val accum = sc.accumulator(0,"My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value
3.RDD分区设计
分区大小限制为2GB
分区太少
- 不利于并发
- 更容易受数据倾斜影响
- groupBy, reduceByKey, sortByKey等内存压力增大
分区过多
- Shuffle开销越大
- 创建任务开销越大
经验
- 每个分区大约128MB
- 如果分区小于但接近2000,则设置为大于2000
4.数据倾斜
指分区中的数据分配不均匀,数据集中在少数分区中
- 严重影响性能
- 通常发生在groupBy,join等之后
解决方案
- 使用新的Hash值(如对key加盐)重新分区
三.装载数据
1.装载CSV数据源
- 文件预览
- 使用SparkContext
val lines = sc.textFile("file:///home/kgc/data/users.csv")
val fields = lines.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).map(l => l.split(","))
val fields = lines.filter(l=>l.startsWith("user_id")==false).map(l=>l.split(",")) //移除首行,效果与上一行相同
- 使用SparkSession
val df = spark.read.format("csv").option("header", "true").load("file:///home/kgc/data/users.csv")
2.装载JSON数据源
- 使用SparkContext
val lines = sc.textFile("file:///home/kgc/data/users.json")
//scala内置的JSON库
import scala.util.parsing.json.JSON
val result=lines.map(l=>JSON.parseFull(l))
- 使用SparkSession
val df = spark.read.format("json").load("file:///home/kgc/data/users.json")
四.基于RDD的Spark应用程序开发
- 典型开发步骤
开发环境
- IDEA+MAVEN+Scala
- pom.xml
词频计数
- 需求:统计HDFS上的某个文件的词频
- 数据格式:制表符作为分隔符
实现思路
- 首先需要将文本文件中的每一行转化成单词数组
- 其次是对每一个出现的单词进行计数
- 最后把所有相同单词的计数相加
spark-submit
--class com.kgc.bigdata.spark.core.WordCount
--master spark://hadoop000:7077
/home/hadoop/lib/spark-1.0.SNAPSHOT.jar /data/wordcount