先来回顾一下Spark的程序运行架构:
对于任何一个Spark程序,有且仅有一个
SparkContext
,其实一个SparkContext
就对应了一个Driver
;一个
Driver
就是一个进城,运行在一个节点上,程序的main函数就运行在Driver
上;main函数通过分析程序,将程序转化成一些列
Task
,然后分发到各个节点的Executor
上去执行;一个节点可以运行一个或多个Executor
;然后一个Executor
可以同时跑若干个Task
;每个节点有多少个Executor,每个Executor上有多少个Task,都是可以由用户指定的计算资源)
(分布式计算:主要就是需要分布式地调度计算资源和计算任务)
Job执行过程:作业、阶段与任务
Job逻辑执行图
Job的实际执行流程比用户头脑中的要复杂,需要先建立逻辑执行图(或者叫数据依赖图),然后划分逻辑执行图生成DAG型的物理执行图,然后生成具体Task
执行。
如何产生RDD,产生哪些RDD
一些典型的transformation()及其创建的RDD:
iterator(split) 的意思是 foreach record in the partition
RDD的依赖关系
NarrowDependency
完全依赖ShuffleDependency
部分依赖
前三个是完全依赖,RDD x 中的 partition 与 parent RDD 中的 partition/partitions 完全相关。最后一个是部分依赖,RDD x 中的 partition 只与 parent RDD 中的 partition 一部分数据相关,另一部分数据与 RDD x 中的其他 partition 相关。
在 Spark 中,完全依赖被称为 NarrowDependency,部分依赖被称为 ShuffleDependency。其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同(mapper 将其 output 进行 partition,然后每个 reducer 会将所有 mapper 输出中属 于自己的 partition 通过 HTTP fetch 得到)。
Job物理执行图
主要的问题是:给定job的逻辑执行图,如何生成物理执行图(也就是 stages 和 tasks)?
逻辑执行计划到物理执行计划的转化需要执行:
- (1) 划分
Stage
- (2) 生成
Task
Spark Task的类型只有两种:ShuffleMapTask
和ResultTask
问:每个Stage的Task数目?
- First Stage: 由hdfs block或hbase regioin 数目决定
- Other Stages: 由用户设置,默认与第一个阶段相等
Stage划分算法
从后往前推算,遇到ShuffleDependency就断开,遇到NarrowDependency就将其加入该stage。 每个stage里面task的数目由该stage最后一个 RDD 中的partition个数决定
Spark 资源调度和任务调度
从TaskScheduler开始
TaskScheduler的主要作用就是获得需要处理的任务集合,并将其发送到集群进行处理。并且还有汇报任务运行状态的作用。 所以其是在Master端。具体有以下4个作用:
接收来自Executor的心跳信息,使Master知道该Executer的BlockManager还“活着”
对于失败的任务进行重试
对于stragglers(拖后腿的任务)放到其他的节点执行
向集群提交任务集,交给集群运行