1、 术语定义
1、Application:Spark应用程序
指的是用户编写的Spark应用程序,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码。
Spark应用程序,由一个或多个作业JOB组成,如下图所示:
2、Driver:驱动程序
Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常SparkContext代表Driver,如下图所示:
3、Cluster Manager:资源管理器
指的是在集群上获取资源的外部服务。目前有三种类型,目前有三种类型:
- Standalon : spark原生的资源管理,由Master负责资源的分配
- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
如下图所示:
4、Executor:执行器
某个Application运行在worker节点上的一个进程, 该进程负责运行某些Task, 并且负责将数据存到内存或磁盘上,每个Application都有各自独立的一批Executor, 在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象, 负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task, 这个每一个oarseGrainedExecutor Backend能并行运行Task的数量取决与分配给它的cpu个数 如下图所示:
5、Worker:计算节点
集群中任何可以运行Application代码的节点,类似于Yarn中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点,在Spark on Messos模式中指的就是Messos Slave节点,如下图所示:
6、RDD:弹性分布式数据集
Spark的基本计算单元,可以通过一系列算子进行操作(主要有Transformation和Action操作),如下图所示:
7、窄依赖
父RDD每一个分区最多被一个子RDD的分区所用;表现为一个父RDD的分区对应于一个子RDD的分区,或两个父RDD的分区对应于一个子RDD 的分区。如图所示:
常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖)。
8、宽依赖
父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区。如图所示:
常见的宽依赖有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是宽依赖)。
9、DAG:有向无环图
Directed Acycle graph,反应RDD之间的依赖关系,如图所示:
10、DAGScheduler:有向无环图调度器
基于DAG划分Stage 并以TaskSet的形势提交Stage给TaskScheduler;负责将作业拆分成不同阶段的具有依赖关系的多批任务;最重要的任务之一就是:计算作业和任务的依赖关系,制定调度逻辑。在SparkContext初始化的过程中被实例化,一个SparkContext对应创建一个DAGScheduler。
11、TaskScheduler:任务调度器
将Taskset提交给worker(集群)运行并回报结果;负责每个具体任务的实际物理调度。如图所示:
12、Job:作业
由一个或多个调度阶段所组成的一次计算作业;包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation。如图所示:
13、Stage:调度阶段
一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。如图所示:
14、TaskSet:任务集
由一组关联的,但相互之间没有Shuffle依赖关系的任务所组成的任务集。如图所示:
提示:
- 一个Stage创建一个TaskSet;
- 为Stage的每个Rdd分区创建一个Task,多个Task封装成TaskSet
15、Task:任务
被送到某个Executor上的工作任务;单个分区数据集上的最小处理流程单元。如图所示:
2、Spark内核架构深度剖析
执行的流程如下:
- 首先是提交打包的应用程序,使用Spark submit或者spark shell工具执行。
- 提交应用程序后后台会在后台启动Driver进程(注意:这里的Driver是在Client上启动,如果使用cluster模式提交任务,Driver进程会在Worker节点启动)。
- 开始构建Spark应用上下文。一般的一个Spark应用程序都会先创建一个SparkConf,然后来创建SparkContext。如下代码所示:
val conf=new SparkConf() val sc=new SparkContext(conf)
。在创建SparkContext对象时有两个重要的对象,DAGScheduler和TaskScheduler。 - 构建好TaskScheduler后,它对应着一个后台进程,接着它会去连接Master集群,向Master集群注册Application。
- Master节点接收到应用程序之后,会向该Application分配资源,启动一个或者多个Worker节点。
- 每一个Worker节点会为该应用启动一个Executor进程来执行该应用程序。
- 向Master节点注册应用之后,Master为应用分配了节点资源,在Worker启动Executor完成之后,此时,Executor会向TaskScheduler反向注册,以让它知道Master为应用程序分配了哪几台Worker节点和Executor进程来执行任务。到此时为止,整个SparkContext创建完成。
- 创建好SparkContext之后,继续执行我们的应用程序,每执行一个action操作就创建为一个job,将job交给DAGScheduler执行,然后DAGScheduler会将多个job划分为stage(stage划分算法)。然后每一个stage创建一个TaskSet。
- 实际上TaskScheduler有自己的后台进程会处理创建好的TaskSet。
- 然后就会将TaskSet中的每一个task提交到Executor上去执行。(task分配算法)
- Executor会创建一个线程池,当executor接收到一个任务时就从线程池中拿出来一个线程将Task封装为一个TaskRunner。
- 在TaskRunner中会将我们程序的拷贝,反序列化等操作,然后执行每一个Task。对于这个Task一般有两种,ShufflerMapTask和ResultTask,只有最后一个stage的task是ResultTask,其它的都是ShufflerMapTask。
- 最后会执行完所有的应用程序,将stage的每一个task分批次提交到executor中去执行,每一个Task针对一个RDD的partition,执行我们定义的算子和函数,直到全部执行完成。