版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34993631/article/details/87099315
TaskScheduler的大目标总共有两个。将DAGScheduler生成的TaskSet中的Task分配给Executor,然后通知Executor去执行启动Task。
以下是几个核心步骤:
- 每一个TaskScheduler会为每一个TaskSet分配一个TaskManager。这个TaskManager会负责这个TaskSet的监视与管理。然后这个TaskManager会进入调度池等待调度(也就是一个队列)。
- 主要的工作是由SparkDeploySchedulerBackend的父类CoarseGraindSchedulerBackend来完成。主要有两个核心的方法。
分配
- resourceOffers()将task分配到executor上去
这里面的一个重要步骤就是资源的分配。在分配的过程中首先要对根据调度策略对资源TaskScheduler进行调度。然后对于一个Executor来说,每从TaskSet中拿出一个task来就会去按照一定的本地话级别去判断能否在当前的Executor上面分配。如果不能则降低本地化级别的要求。
这几种本地化级别分别是:
- PROCESS_LOCAL,进程本地化,RDD的partition和task在一个Executor中效率是最高的。
- NODE_LOCAL, RDD的partition和task不在同一个Executor中但是在一个Worker上。
- NO_PREF,没有所谓的本地化级别。
- RANK_LOCAL,机架本地化,至少RDD的partition和task在同一个机架上。
- ANY,任意的本地化级别
通知启动
- LaunchTask()发送LauntchTask消息到对应的executor启动并执行task。
具体的,首先将每个executor要执行的task进行序列化。然后找到LauntchTask,在Executor上面启动Task。
Executor层面的响应
在Executor反向注册到Driver的时候会发送一个RegisteredExecutor消息。这时候CoarseGrainedExecutorBackend会创建一个Executor对象,作为执行对象。对于每一个Task来说Executor对象会首先将它反序列化然后使用TaskRunner去封装这个Task。然后将这个TaskRunner放到一个ConcurrentHashMap中(也就是放入内存中),然后使用线程池中取出一个线程去执行对应的Task。