Spark Core Source Code Study Notes
This series records a review of the Spark source code. The goal is to untangle the mechanism and flow by which Spark distributes and runs programs, tracing the key source paths so that we understand not just the what but the why. Peripheral code is only described in words, without drilling deeper, to avoid muddying the main thread.
Starting with this article we enter the core of Spark, which we will unfold step by step.
SparkContext: The Cornerstone
SparkContext plays an important role throughout the life of a Spark application, and many key components are initialized inside it, so it can fairly be called the cornerstone of Spark. Starting from the getOrCreate method seen in the previous article, let's step into SparkContext:
def getOrCreate(): SparkSession = synchronized {
val sparkContext = userSuppliedContext.getOrElse {
// Instantiate SparkConf and apply the property values; options holds the parameters set on the builder earlier
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
// set a random app name if not given.
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(java.util.UUID.randomUUID().toString)
}
// Actually instantiate SparkContext, a core component discussed in detail below. Note: once the SparkContext is created, the SparkConf can no longer be changed
SparkContext.getOrCreate(sparkConf)
}
}
The first step is initializing SparkConf. The source here is simple, mostly assignment and storage operations. One thing worth noting is that the no-argument constructor delegates to SparkConf(true), meaning the default configuration is loaded (// Load any spark.* system properties), which is why SparkConf(false) is commonly used when writing tests.
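To make the difference concrete, here is a small hedged sketch (the property value is made up purely for illustration):
import org.apache.spark.SparkConf

// Pretend the JVM was started with -Dspark.app.name=FromSystemProps (hypothetical value)
System.setProperty("spark.app.name", "FromSystemProps")

val withDefaults = new SparkConf()         // same as new SparkConf(true): loads spark.* system properties
val withoutDefaults = new SparkConf(false) // ignores system properties, handy for isolated unit tests

println(withDefaults.contains("spark.app.name"))    // true, picked up from the system property
println(withoutDefaults.contains("spark.app.name")) // false, nothing was loaded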
With SparkConf ready, we move on to the initialization of SparkContext itself:
def getOrCreate(config: SparkConf): SparkContext = {
// Synchronize to ensure that multiple create requests don't trigger an exception
// from assertNoOtherContextIsRunning within setActiveContext
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
// If no active context exists, create one and record it via setActiveContext
setActiveContext(new SparkContext(config))
} else {
// If one already exists, the newly supplied SparkConf will not take effect, so log a warning
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
}
}
activeContext.get()
}
}
Next we step into new SparkContext(config). As usual, let's start with the class comment:
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
* In other words, SparkContext is the main entry point to Spark functionality and the connection point to the cluster, through which RDDs, accumulators and broadcast variables are created.
* @note Only one `SparkContext` should be active per JVM. You must `stop()` the
* active `SparkContext` before creating a new one.
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
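The @note above is worth a quick illustration before diving in. A minimal usage sketch (the master URL and app names are arbitrary):
import org.apache.spark.{SparkConf, SparkContext}

val first = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("first"))
// Creating a second SparkContext here would fail: only one may be active per JVM
first.stop() // stop the active context before creating another
val second = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("second"))
second.stop()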
The code block below is fairly long; the important parts are annotated. Keep in mind throughout that all of this runs on the driver, regardless of whether the deploy mode is client or cluster:
class SparkContext(config: SparkConf) extends Logging {
private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
/* ------------------------------------------------------------------------------------- *
| Private variables. These variables keep the internal state of the context, and are |
| not accessible by the outside world. They're mutable since we want to initialize all |
| of them to some neutral value ahead of time, so that calling "stop()" while the |
| constructor is still running is safe. |
* ------------------------------------------------------------------------------------- */
private var _conf: SparkConf = _
private var _env: SparkEnv = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
@volatile private var _dagScheduler: DAGScheduler = _
... // many more private fields are declared here; omitted
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
| context. |
* ------------------------------------------------------------------------------------- */
... // Scala-style getters and setters for some of the fields above; omitted
// _listenerBus and _statusStore are used to monitor application status
_listenerBus = new LiveListenerBus(_conf)
// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
val appStatusSource = AppStatusSource.createSource(conf)
_statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
listenerBus.addToStatusQueue(_statusStore.listener.get)
// Create the Spark execution environment (cache, map output tracker, etc)
// createSparkEnv internally calls SparkEnv.createDriverEnv, which eventually calls RpcEnv.create(...) to build the driver's SparkEnv
// See the earlier RpcEnv article for the details of that process
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// As shown here, the default executorMemory is 1 GB
_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
// Instantiate the driver-side heartbeat endpoint, which receives heartbeat messages from executors
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// From here on is the most important part: three schedulers are initialized, and each deserves its own walkthrough
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
// Send the TaskSchedulerIsSet message
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
/* When heartbeatReceiver receives this message, it simply binds SparkContext's taskScheduler and replies true:
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
context.reply(true)
*/
// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(...)
_heartbeater.start()
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()
// What follows is the initialization of application-level information, such as obtaining the Spark app id
// In YARN mode dynamic resource allocation is also supported; in that mode an ExecutorAllocationManager is built, which adds or removes executors based on cluster resources
// Finally there are some cleaner hooks and runtime status monitoring; that code is omitted here to keep the main thread readable
}
Now we can focus on how _schedulerBackend, _taskScheduler and _dagScheduler are instantiated. The first two are created by SparkContext.createTaskScheduler(this, master, deployMode); _dagScheduler is created by new DAGScheduler(this).
SchedulerBackend and TaskScheduler
Let's start by looking inside createTaskScheduler:
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
/* A few regular expressions are used for matching:
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r local[N] and local[*]
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r local[N, maxRetries]
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r [N, cores, memory]
val SPARK_REGEX = """spark://(.*)""".r
*/
import SparkMasterRegex._
// Match the appropriate mode based on the master URL passed in
master match {
case "local" =>
case LOCAL_N_REGEX(threads) => ...
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => ...
// Here we take the Spark standalone (deploy cluster) mode as the example
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => ...
case masterUrl => ...
}
}
Next, the instantiation of TaskSchedulerImpl:
private[spark] class TaskSchedulerImpl(...) extends TaskScheduler with Logging {
// Will record the mapping between TaskSets and stage ids
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
// Maps task ids to executor ids
val taskIdToExecutorId = new HashMap[Long, String]
// Task execution bookkeeping
@volatile private var hasReceivedTask = false
@volatile private var hasLaunchedTask = false
val nextTaskId = new AtomicLong(0)
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
// Maps hosts to executors, including rack locality information
protected val hostToExecutors = new HashMap[String, HashSet[String]]
protected val hostsByRack = new HashMap[String, HashSet[String]]
protected val executorIdToHost = new HashMap[String, String]
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
... // many more methods follow; not expanded here
}
Then the instantiation of StandaloneSchedulerBackend:
/* A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.*/
private[spark] class StandaloneSchedulerBackend(...)
// The key part is actually the parent class initialization; we will come back to it at the end
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with StandaloneAppClientListener
with Logging {
private var client: StandaloneAppClient = null
@volatile private var appId: String = _
private val maxCores = conf.get(config.CORES_MAX)
...
}
Apart from some methods there are not many member variables, so the interesting part seems to be the final scheduler.initialize(backend). Let's look into initialize:
def initialize(backend: SchedulerBackend) {
// Store the backend in TaskSchedulerImpl's own member variable
this.backend = backend
// By default a FIFOSchedulableBuilder scheduling policy is instantiated
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
// buildPools on a FIFOSchedulableBuilder does nothing at all...
schedulableBuilder.buildPools()
}
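As an aside, the schedulingMode matched above comes from the spark.scheduler.mode configuration (FIFO by default). A hedged sketch of opting into FAIR scheduling (master URL and app name are arbitrary):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR") // default FIFO builds a FIFOSchedulableBuilder; FAIR builds a FairSchedulableBuilder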
Somewhat anticlimactically, initialize does nothing beyond binding the backend and setting the scheduling policy, so in the end createTaskScheduler just returns two bare instances (backend, scheduler). The real work happens later; before that, let's follow the flow into the instantiation of _dagScheduler = new DAGScheduler(this):
// Note that this overloaded constructor also passes in the taskScheduler we just instantiated
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
// DAGScheduler is one of Spark's most central components; it splits a job into an optimal set of stages and submits all tasks to the TaskScheduler
private[spark] class DAGScheduler(...) extends Logging {
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Instantiate a DAGSchedulerEventProcessLoop, which extends EventLoop
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
// Bind this DAGScheduler to the taskScheduler
taskScheduler.setDAGScheduler(this)
... // many method bodies omitted
// This is the last line of DAGScheduler's initialization: start the DAGSchedulerEventProcessLoop, which calls the parent EventLoop's start method
// and in turn starts the parent's eventThread. Its source is shown below: it takes events off eventQueue and hands them to the subclass's onReceive(event)
eventProcessLoop.start()
}
private[spark] val eventThread = new Thread(name) {
// Runs as a daemon thread
setDaemon(true)
override def run(): Unit = {
try {
// Loop, processing events
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event)
}}}}
}
Now let's see what the onReceive method actually does:
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = {
doOnReceive(event)
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
... // many more cases to match; omitted
}
This processing logic is much like that of the Dispatcher inside RpcEnv: a looping thread drains messages from a queue. In the earlier article the queue held EndpointData; here it holds DAGSchedulerEvent, and that is essentially the only difference. In what follows, keep an eye on how the eventQueue changes state.
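To make that shared pattern concrete, here is a stripped-down sketch (my own simplification, not Spark's actual EventLoop source): a blocking queue drained by a single daemon thread, with subclasses supplying onReceive:
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Simplified version of the event-loop pattern used by both Dispatcher and DAGSchedulerEventProcessLoop
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          onReceive(eventQueue.take()) // block until an event arrives, then dispatch it
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocked take(); just exit
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event) // producers enqueue events here

  protected def onReceive(event: E): Unit
}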
With DAGScheduler instantiated, nothing dramatic seems to have happened yet. Only one line of code remains:
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()
Next we look at the start method of TaskSchedulerImpl, the TaskScheduler implementation:
override def start() {
// Directly call the SchedulerBackend's start method
backend.start()
// Periodically run checkSpeculatableTasks; not the focus here
if (!isLocal && conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
Since we chose the Spark standalone cluster mode, the SchedulerBackend implementation is StandaloneSchedulerBackend. Let's look at its start method:
override def start() {
// Call the parent CoarseGrainedSchedulerBackend's start method; covered further below
super.start()
// The endpoint for executors to talk to us
// Build our own RpcEndpointAddress; driverUrl is later passed as an argument to the executor launcher so that executors can talk back to the driver
val driverUrl = RpcEndpointAddress(
sc.conf.get(config.DRIVER_HOST_ADDRESS),
sc.conf.get(config.DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl, //这里将刚才的 driverUrl放入参数元组
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
// The following are parameter settings for launching executors
val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
// Command is a simple case class used as a parameter container
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val webUrl = sc.ui.map(_.webUrl).getOrElse("")
// coresPerExecutor comes from spark.executor.cores (None when unset); as discussed in the earlier article, it determines how many executors can be launched on each Worker
val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
// If we're using dynamic allocation, set our initial executor limit to 0 for now. (As mentioned earlier, dynamic allocation applies in YARN mode.)
// ExecutorAllocationManager will send the real initial limit to the Master later.
val initialExecutorLimit =
if (Utils.isDynamicAllocationEnabled(conf)) {
Some(0)
} else {
None
}
// Another simple case class
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// What follows is the application registration process, where we will see some things mentioned in earlier articles
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
// Update the state and wait for registration
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
Now let's follow the instantiation and start of StandaloneAppClient; tracing the parent class, i.e. super.start(), is saved for last.
private[spark] class StandaloneAppClient(...) extends Logging {
// Declare some member variables to be filled in later
private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
private val endpoint = new AtomicReference[RpcEndpointRef]
private val appId = new AtomicReference[String]
private val registered = new AtomicBoolean(false)
// the start method
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
// Register an RpcEndpoint named AppClient and keep a reference to it
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
}
The rpcEnv.setupEndpoint method was analyzed in an earlier article; in short, it ends up invoking the onStart() method of the RpcEndpoint passed in. So let's look at how ClientEndpoint is initialized and what its onStart does:
// An inner class of StandaloneAppClient, hence the private modifier
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
with Logging {
private var master: Option[RpcEndpointRef] = None
private val registerMasterFutures = new AtomicReference[Array[JFuture[_]]]
// Initialize a thread pool whose size equals the number of master URLs, so registration with all masters can happen at once
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"appclient-register-master-threadpool",
masterRpcAddresses.length // Make sure we can register with all masters at the same time
)
// Thread used for registration retries
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
// here is onStart
override def onStart(): Unit = {
try {
// The key method; its body is shown below
registerWithMaster(1)
} catch {...}
}
private def registerWithMaster(nthRetry: Int) {
// Call tryRegisterAllMasters()
registerMasterFutures.set(tryRegisterAllMasters())
// The retry thread is scheduled; details below
registrationRetryTimer.set(registrationRetryThread.schedule(...)
}
First, the tryRegisterAllMasters method:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
// Note the yield keyword: the result of each iteration is collected into the returned Array[JFuture[_]]
for (masterAddress <- masterRpcAddresses) yield {
// For every masterAddress, submit the following code to the thread pool
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
// Flag indicating registration already succeeded; since multiple threads register concurrently, another thread may have finished first
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
// setupEndpointRef is a blocking call that obtains a reference to the remote endpoint; its internals are not expanded here
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// Send a RegisterApplication message to the Master; note that it carries a reference to this endpoint (self)
masterRef.send(RegisterApplication(appDescription, self))
} catch {...}
})
}
}
Now let's switch to the Master side and see how it handles a RegisterApplication message:
// The second parameter of the message is the reference to the driver
case RegisterApplication(description, driver) =>
logInfo("Registering app " + description.name)
// createApplication builds an ApplicationInfo object from the parameters passed in; its contents are shown below
val app = createApplication(description, driver)
// Register the application's bookkeeping info; details below
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
// Reply to the driver with a RegisteredApplication message
driver.send(RegisteredApplication(app.id, self))
// Call schedule() to perform resource scheduling
schedule()
private[spark] class ApplicationInfo(...) extends Serializable {
@transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
@transient var coresGranted: Int = _
init()
private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorDesc]
// This field should look familiar from the earlier executor-launch discussion; it is used when computing granted cores versus remaining available cores
coresGranted = 0
}
}
private def registerApplication(app: ApplicationInfo): Unit = {
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
// Remember this collection? In the schedule() flow, apps are taken from here, which finally ties back to the earlier article
waitingApps += app
}
Back on the driver side, the RegisteredApplication message is handled:
case RegisteredApplication(appId_, masterRef) =>
appId.set(appId_)
// Registration succeeded; the other registration threads can simply return
registered.set(true)
// Record the master that accepted the registration
master = Some(masterRef)
listener.connected(appId.get)
Then back in registerWithMaster:
private def registerWithMaster(nthRetry: Int) {
// Call tryRegisterAllMasters(), covered above
registerMasterFutures.set(tryRegisterAllMasters())
// The retry thread is scheduled; its body is shown below
registrationRetryTimer.set(registrationRetryThread.schedule(...)
}
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) { // registration already succeeded
// Walk the returned futures and cancel every registration thread via JFuture.cancel
registerMasterFutures.get.foreach(_.cancel(true))
// Shut down the registration thread pool
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
// This branch means retries are exhausted and registration still failed
markDead("All masters are unresponsive! Giving up.")
} else {
// Retries remain, so cancel all the threads first and then try again
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
Now let's go back to the super.start() call. Remember we said earlier that the key point is actually the parent class initialization? Let's step into the parent class CoarseGrainedSchedulerBackend:
private[spark] class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
// Total number of executors that are currently registered
protected val totalRegisteredExecutors = new AtomicInteger(0)
// The token manager used to create security tokens.
// The start method mainly initializes delegationTokenManager
private var delegationTokenManager: Option[HadoopDelegationTokenManager] = None
// Initialize a scheduling thread for use below
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
// Important inner class: the Driver endpoint lives here
class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {
// Tracks executors
protected val addressToExecutorId = new HashMap[RpcAddress, String]
// Called automatically when the endpoint is registered
override def onStart() {
// Scheduling interval
val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
// The thread above periodically sends ReviveOffers messages to itself; how they are handled is covered separately below
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
...
}
// Instantiate and register the DriverEndpoint
val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
override def start() {
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager()
delegationTokenManager.foreach {...}
}
}
protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
...
}
When the Driver endpoint receives a ReviveOffers message, it calls the makeOffers method:
case ReviveOffers =>
makeOffers()
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
// A simple case class describing an executor's state
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
// Mark each slave as alive and remember its hostname
// Maintains hostToExecutors, executorIdToHost, hostsByRack and executorIdToRunningTaskIds, and returns a list of TaskDescriptions for the given workOffers; details are left to the later article on task dispatch
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
// Launch tasks from the TaskDescriptions; details are left to the later article on task dispatch
launchTasks(taskDescs)
}
}
In effect, the Driver keeps calling makeOffers periodically in a loop to maintain these bindings and, more importantly, to produce TaskDescriptions from the scheduling pool (schedulableQueue) and launch tasks. At this point the schedulableQueue is still empty; for now it is enough to know that a thread keeps looping, waiting to process whatever lands in it. The details are covered in later articles.