Spark架构及运行机制(5) - RDD有向无环图拆分

    一个Spark应用程序会被分解成多个作业(Job)提交给Spark集群进行处理。然而作业并不是应用程序被拆分的最小计算单元,Spark在收到作业后将作业进行2次切分和规划。第一步是将作业按照RDD转换操作切分为最小处理单元。即任务(Task);第二步是对任务进行规划,生成包含多个任务的阶段(Stage)。这两步都是由DAGScheduler实例进行,输入“RDD有向无环图”,输出“一系列任务”,这个步骤成为“RDD有向无环图拆分”。
    每一个数据单元RDD可以被切分成更小的数据块在不同的计算节点中处理,这样的数据块就是分区(patition)。在RDD DAG中,子RDD与父RDD存在依赖关系,由于父子RDD间进行转换的分区对应关系不同,RDD间的依赖分为两个类型:窄依赖和宽依赖。
    下图矩形表示一个RDD,椭圆块表示一个分区:

  1. 窄依赖:父RDD的一个分区,在RDD转换过程中,最多只会被一个子RDD的分区使用,比如(a)中的map、union
  2. 宽依赖:父RDD的一个分区,在RDD转换过程中,被多个子RDD的分区使用,比如(b)中的groupByKey


窄依赖与宽依赖的区别
窄依赖:分区间是一对一的转换,可以在一个计算节点内进行。如果有多个这样的窄依赖,可以在一个节点内流水线般操作。如果分区间有多对一(多个父RDD分区对应一个子RDD分区)的窄依赖转换,可以在多个节点间并行执行,相互间没有影响。窄依赖的子RDD计算过程中某个分区出错了,只需要获得父RDD对应的分区即可恢复子RDD的这个分区;

宽依赖:宽依赖要依赖父RDD的所有分区数据,才能计算得到子RDD,这就必然会带来网络开销、中间结果存储等一系列开销较大的问题。对于计算出错的回复,代价远大于窄依赖;

    由于窄依赖和宽依赖的差异,Spark将应用程序拆分为任务后,并不是直接将一个一个的转换操作对应的任务直接进行分配,而是加入了一个对任务进行规划的过程,将适合放在一起的任务合并到一个阶段中。
    这一过程由DAGScheduler实例完成, 原则是:
    如果子RDD到父RDD是窄依赖,就将多个算子操作一起处理,最后再进行一次统一的同步操作,既减少了大量的全局同步,又无需存储很多中间结果。
    如果是宽依赖,则尽量切分到不同的阶段中,以避免过大的网络传输和计算开销。

    为了达到这一目的,应用程序向Spark提交Job作业后,DAGScheduler会遍历RDD DAG。在遍历过程中,如果需要连续窄依赖RDD转换,则尽量多地放入同一个阶段中。如果遇到宽依赖,则生成一个新的阶段。
    下图,A转换为B是一个宽依赖,生成了一个stage1,C转换为D,D、E转换为F都是窄依赖,同被放入stage2,B、F转换为G是宽依赖,又生成一个stage3.


    通过这样的一个过程,DAGScheduler实现了将依赖链进行分割的操作。整个依赖链被划分为多个Stage阶段,每个Stage内都是一组相互关联,但是彼此之间没有shuffle依赖关系的任务集合,成为“任务集”(TaskSet)。每个TaskSet包含多个任务。
    DAGScheduler会根据分区的个数,来具体确定会发生多少个任务。一个分区对应一个任务,同一个阶段的任务的执行是并行执行的。作业(Job)、阶段(Stage)、任务集(TaskSet)和任务(Task)之间的关系如下图所示:




DAGScheduler还维护了3个集合用以存储阶段的执行状态:

1.waitingStages集合:同RDD父子依赖关系一样,在一串相互依赖的Stage中,后续的Stage被称为子Stage,其依赖的Stage被称为父Stage。如果一个Stage的父Stage尚未完成,则waitingStages集合负责记录该子Stage;

2.runningStage集合:为了方式Stage的重复提交,runningStage集合中保存正在执行的Stage;

3.failedStage集合:保存了执行失败的Stage;


    DAGScheduler会根据Stage的运行状态合理调度所有Stage提交到集群。DAGScheduler为每一个Stage分配一个StageID,用来表示Stage的优先级,StageID越小优先级越高。
    DAGScheduler是反向遍历RDD依赖链的,因此,最后一个RDD生成的FinalStage的StageID最小,应该首先被提交。
    以WordCount为例,Spark首先从RDD依赖链的最后一个RDD进行判断,当遍历到reduceByKey产生一个ShuffleRDD时,Spark对整个依赖链进行了一次分割,由于WordCount中只有一个宽依赖,因此最终DAGScheduler将WordCount分割成2个Stage。


猜你喜欢

转载自margaret0071.iteye.com/blog/2387160