Spark任务提交全流程的源码的类调用时序图
本篇博客主要是Spark任务提交到执行的全流程中的第一部分:从spark-submit.sh脚本调用到Executor被启动起来并注册到Driver的源码解析。
1、spark-submit.sh的脚本中
在spark-submit.sh的脚本中可以看到来启动SparkSubmit对象。
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" 可以看到是调用的spark-class.sh
阅读spark-class.sh脚本中其中可以看到以下代码,从中可以看到是通过java -cp的方式执行类Main的main方法。
RUNNER="${JAVA_HOME}/bin/java"
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
}
阅读org.apache.spark.launcher.Main类的代码:可以发现在这个类里面将org.apache.spark.deploy.SparkSubmit通过字符串拼接好要执行的shell命令最后返回spark-class.sh脚本中的CMD变量,接下来
exec "${CMD[@]}" 执行该shell命令启动了org.apache.spark.deploy.SparkSubmit类。
2、SparkSubmit类中
这里面主要是解析Spark-submit.sh中传入的参数并且启动用户的jar。
1、在main()方法中
// TODO 先实例化了SparkSubmit对象
val submit = new SparkSubmit()
// TODO 解析SparkSubmit脚本外部传入的参数
new SparkSubmitArguments(args)
// TODO 接下来调用doSubmit方法
submit.doSubmit(args)
2、在doSubmit()方法中
// TODO 这里开始解析参数
val appArgs = parseArguments(args)
appArgs.action match {
// TODO 当是任务提交的时候执行 submit(appArgs, uninitLog) 方法
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
4、 在submit()方法中
// TODO 首先,我们通过设置来准备启动环境的适当的类路径、系统属性和应用程序参数基于集群管理器和部署模式运行子主类。
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// TODO 这里掉用doRunMain(), 上面只是定义了
doRunMain()
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
5、 在runMain()方法中
// TODO 设置线程的类加载器
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// TODO 添加外部依赖的jar到classpath中
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
// TODO 反射加载用户任务的class类
mainClass = Utils.classForName(childMainClass)
// TODO 实例化反射的类对象
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass){
start(childArgs.toArray, sparkConf){
// TODO 这里获取main()方法
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
// TODO 开始执行main()方法, 通过反射获取到类对象然后执行main方法,到这里就把用户自定义的jar给启动起来了
mainMethod.invoke(null, args)
}
}
// TODO 调用用户任务自定义类的main()方法,从这以后开始实例化SparkContext()来执行用户的自定义任务
app.start(childArgs.toArray, sparkConf)
3、SparkContext类
这里面主要代码主要内容是:
1、创建了SparkEnv,并且在里面创建了名为sparkDriver的RpcEnv(注意后面很多地方都有用到以及后面的CoarseGrainedSchedulerBackend中的DriverEndpoint和AppClient中都是使用的该RpcEnv)、BroadcastManager、MapOutputTrackerMaster、ShuffleManager、MemoryManager、BlockManagerMaster、BlockManager、OutputCommitCoordinator等组件。
2、实例化SchedulerBackend、TaskScheduler、DAGScheduler。
3、封装申请要启动的Executor的类路径以及classpath等信息为Command对象,以及将申请的Executor内存和Core等信息封装到ApplicationDescription对象中。并且向Master注册该Application。
/** 在SparkContext的构造方法中 **/
// TODO 在这里创建SparkEnv, 这里主要是创建Driver的RpcEndpoint
_env = createSparkEnv(_conf, isLocal, listenerBus)
{
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf)){
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
){
// TODO 为Driver或者Executot创建一个RpcEnv用于Rpc通信, 当该Task的代码序列化分发到Worker上执行的时候会判断是Driverh还是Executor
val systemName = if (isDriver) driverSystemName else executorSystemName
// 这里创建Driver的RpcEndPoint
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,securityManager, numUsableCores, !isDriver)
// TODO 实例化BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
// TODO 实例化MapOutputTracker, 如果是Driver则是Master,否则则是Worker
// TODO 这里主要记录Map任务或者Reducer任务计算好的结果的输出位置
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
// TODO 初始化后必须分配Tacker的RpcEndPoint,因为MapOutputTracker的EndPOint需要MapOutputTracker本身
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
// TODO 这里通过反射实例化ShufflerManager,默认的是SortShuffleManager
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
// TODO 在这里实例化MemoryManager, 分为1.6之前的StaticMemoryManager和之后的UnifiedMemoryManager
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
// TODO 所有的APP的关于内存分配的代码在这里面
UnifiedMemoryManager(conf, numUsableCores)
}
// TODO 在这里实例化NettyBlockTransferService, 这个用来向远程的BlockManager请求Block
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,blockManagerPort, numUsableCores)、
// TODO 在Driver上实例化BlockManagerMaster, 并且实例化BlockManagerMaster的RpcEndpoint
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),conf, isDriver)
// TODO 根据传入的executorId实例化Executor上的BlockManager, 并且只有在initialize()调用之后才生效
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
// TODO 这个OutputCommitCoordinator主要确定任务是否可以把输出提到到HFDS的管理者。 使用先提交者胜的策略。
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
// TODO 这里初始化OutputCommitCoordinator的RpcEndpoint
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
// TODO 在这里面调用SparkEnv的构造器实例化SparkEnv
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
}
}
SparkEnv.set(_env)
// TODO 在createTaskScheduler之前注册心跳接受器
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// TODO 实例化SchedulerBackend和TaskScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode){
master match {
case SPARK_REGEX(sparkUrl) =>
// TODO 实例化TaskSchedulerImpl类
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// TODO 实例化StandaloneSchedulerBackendl类
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// TODO 调用initialize()方法实例化FIFOSchedulableBuilder
scheduler.initialize(backend){
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
(backend, scheduler)
// TODO 当为Yarn, K8s, Mesos这些任务调度器的时候
case masterUrl =>
// TODO 获取ClusterManager
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
// TODO 创建TaskScheduler
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// TODO 这里创建SchedulerBackend
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
_schedulerBackend = sched
_taskScheduler = ts
// TODO 实例化DAGScheduler
_dagScheduler = new DAGScheduler(this){
/*
DAGScheduler中的重要概念:
实现面向Stage的调度的高级调度层。 它为每个作业计算Stage的DAG,跟踪实现了哪些RDD和Stage输出,并找到运行该作业的最小计划。
然后,它将Stage作为TaskSet提交给在群集上运行它们的基础TaskScheduler实现。 TaskSet包含完全独立的任务,这些任务可以基于集群中已经存在的数据
(例如,先前Stage的映射输出文件)立即运行,尽管如果此数据不可用可能会失败。
通过在Shuffle Stage处划分RDD Graph来创建Spark Stage。
具有“窄”依赖关系的RDD操作(例如map()和filter())在每个Stage都通过管道传递到一组任务中,
但是具有Shuffle依赖关系的操作需要多个Stage(一个Stage编写一组Map输出文件,另一个Stage在barrier后读取这些文件)。
最后,每个Stage将只有对其他Stage进行shuffle依赖性,并且可以在其中计算多个操作。 这些操作的实际pipelining发生在各种RDD的RDD.compute()函数中
除了提供Stage的DAG之外,DAGScheduler还根据当前缓存状态确定运行每个任务的首选位置,并将这些位置传递给低级TaskScheduler。
此外,它可以处理由于Shuffle输出文件丢失而导致的故障,在这种情况下,可能需要重新提交旧Stage。
TaskScheduler处理*Stage内*不是由随机文件丢失引起的故障,TaskScheduler会在取消整个Stage之前重试每个任务几次。
以下有一些重要概念:
1、 Job(由[[ActiveJob]]表示)是提交给调度程序的顶级工作项。 例如,当用户调用诸如count()之类的动作时,将通过SubmitJob提交作业。
每个作业可能需要执行多个Stage才能构建中间数据。
2、 Stage([[Stage]])是计算作业中中间结果的任务集,其中每个任务在相同RDD的分区上计算相同功能。
Stage在Shuffle边界处分开,这会引入barrier(在这里我们必须等待上一个Stage完成才能获取输出)。
有两种类型的Stage:[[ResultStage]],用于执行动作的最后Stage,以及[[ShuffleMapStage]],用于编写Map的输出文件。
如果这些作业重复使用相同的RDD,则Stage通常在多个作业之间共享。
3、 Task是各个工作单元,每个Task都发送到一台机器。
4、 缓存跟踪:DAGScheduler会计算出缓存了哪些RDD以避免重新计算它们,并且同样记住哪些ShuffleMapStage已经生成了输出文件,以避免
重做shuffle的Map端的输出文件。
5、 Preferred locations:DAGScheduler还会根据其基础RDD的首选位置,或缓存或混洗数据的位置,计算Stage中每个任务的运行位置。
6、 Cleanup:所有数据结构(取决于它们的正在运行的作业)完成后都会被清除,以防止长时间运行的应用程序中的内存泄漏。
为了从故障中恢复,同一Stage可能需要运行多次,这称为“尝试”。 如果TaskScheduler报告由于丢失了上一个Stage的Map输出文件而导致任务失败,
则DAGScheduler将重新提交该失去的Stage。 这是通过带有FetchFailed的CompletionEvent或ExecutorLost事件检测到的。
DAGScheduler将等待一小段时间,以查看其他节点或任务是否失败,然后针对计算丢失任务的任何丢失Stage重新提交TaskSet。
作为此过程的一部分,我们可能还必须为旧的(完成的)Stage创建Stage对象,在此之前我们已经清理了Stage对象。
由于来自Stage的旧尝试的任务仍然可以运行,因此必须注意映射在正确的Stage对象中接收到的所有事件。
这是在对此类进行更改或检查更改时要使用的清单:
1、 当涉及它们的作业结束时,应清除所有数据结构,以避免在长时间运行的程序中无限期累积状态。
2、 添加新数据结构时,请更新DAGSchedulerSuite.assertDataStructuresEmpty以包括新结构。 这将有助于捕获内存泄漏。
*/
}
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// TODO 在这启动了TaskScheduler, 主要是调用SchedulerBackend.start()启动一个Client RpcEnv
_taskScheduler.start() {
// TODO 在这里面调用了SchedulerBackend的start()
backend.start(){
1、 当backend为ExternalClusterManager的时候执行这里的操作,这里执行其他的任务调度的处理步骤
2、 当backend为StandaloneSchedulerBackend的时候执行这里的操作
super.start() // StandaloneSchedulerBackend的父类是CoarseGrainedSchedulerBackend {
// TODO 在这里面创建了DriverEndPoint
// TODO 这里创建了CoarseGrainedSchedulerRpcEndpoint
driverEndpoint = createDriverEndpointRef(properties){
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
new DriverEndpoint(rpcEnv, properties) {
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
}
}
}
// TODO 将CoarseGrainedExecutorBackend的类路径封装到Command消息中
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
// TODO 任务提交时指定的spark.executor.cores为每个Executor的Core数量
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
// TODO 将前面封装Executor的类路径和其他的启动参数封装到ApplicationDescription中
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// TODO 将ApplicationDescription封装到StandaloneAppClient中
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// TODO 调用StandaloneAppClient的start()方法
client.start(){
// TODO 只是启动一个名为AppClient的rpcEndpoint即可【注意:这里传入的rpcEnv还是Driver的RpcEnv, 只是又做了一层封装而已】
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv){
// ClientEndpoint其实使用的还是之前创建的封装在SparkEnv中的名叫"sparkDriver"的RpcEndpoint
// TODO 由于每个RpcEndpoint初始化的时候都会带有一个InBox,在InBox实例化的时候会自带一条OnStart消息,
// TODO 当从InBox中处理OnStart消息的时候会调用RpcEndpoint对应的OnStart()方法
override def onStart(): Unit = {
try {
// TODO CLient向Master注册
registerWithMaster(1){
// TODO 调用开始注册的tryRegisterAllMasters()方法
registerMasterFutures.set(tryRegisterAllMasters(){
// TODO 向所有的Master注册(由于高可用模式有多个Master,当为StandBy时不会有任何响应)
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
// TODO 根据Master的地址和名字得到MasterRpcRef
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// TODO Driver向Master发送注册App的消息RegisterApplication
masterRef.send(RegisterApplication(appDescription, self))
}
})
}
})
// TODO 另起一个线程以固定的速率调用自己去注册
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
// TODO 如果注册超过三次则表示注册失败放弃注册
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
}))
}
}
}
/**
使用给定的appId初始化BlockManager。 在构造函数中不执行此操作,因为在BlockManager实例化时可能不知道appId(尤其对于驱动程序,
只有在TaskScheduler注册后才能获知)。 此方法初始化BlockTransferService和ShuffleClient,向BlockManagerMaster注册,
启动BlockManagerWorker端点,并向本地shuffle服务注册(如果已配置)。
**/
// TODO 这里去初始化Driver端的Blockmanager,只有这里调用了才会生效
_env.blockManager.initialize(_applicationId)
4、Master类
1、Master中主要处理接受到Driver发来的RegisterApplication消息,计算在哪些Worker调度Executor的运行。
2、当Master计算完Executor在哪些Worker上运行后,就向Worker发送LaunchExecutor消息启动Executor,并向Driver发送ExecutorAdded的消息。
Master.scala
{
override def receive: PartialFunction[Any, Unit] = {
// TODO 收到Client端发送的注册App的消息, 这里的driver其实是client
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver) {
// TODO App注册成功后将App的消息封装到ApplicationInfo中
new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
// TODO 这里将App注册到Master的内存中记录
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
// TODO 将ApplicationDescription持久化到Zookeeper中
persistenceEngine.addApplication(app)
// TODO 向Client响应一个RegisteredApplication消息,并返回appId和Master的RpcEndpointRef
driver.send(RegisteredApplication(app.id, self))
// TODO 开始调度App为App计划在Worker上的Executor的Core和Memory资源, 这里是整个Application的资源调度的核心代码
schedule() {
// TODO 先取出活着的Worker, 并且打乱顺序
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// TODO 轮询的方式为等待的Driver分配Worker,此处是为Driver分配启动的Worker机器
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
// TODO 当遍历过的Worker的数量小于存活的Worker数量,并且该Worker没有被启动过
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// TODO 当Worker的可用的内存大于等于Driver所需的内存 并且 Worker可用的Core数量大于等于Driver需要的Core数量
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// TODO 如果满足以上的条件则在该Worker上启动Driver
launchDriver(worker, driver)
// TODO 分配完之后要从等待分配Worker的Driver列表中删除掉已分配的Driver, 并更新状态为已启动Worker
waitingDrivers -= driver
launched = true
}
// TODO 将索引移动到下一个活着的Worker, 此处求模主要是房子下表越界,因为没有做大小判断
curPos = (curPos + 1) % numWorkersAlive
}
}
// TODO 开始调度Job和在Worker上启动Executor
startExecutorsOnWorkers() {
// TODO 遍历每一个等待调度的App, 依次调度App
for (app <- waitingApps) {
// TODO 取出用户提交的spark.excutor.cores指定的每个Excutor上申请的Core数量
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
// TODO 当App剩余要分配的Cores的数量大于等于每个Excutor上申请的Core数量才进行分配
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
// TODO 取出活着的Worker, 然后取出Worker可用内存和可用的Core都大于App在每个Executor上
// TODO 申请的内存和Core的数量, 最后按照可用Core的数量倒序排
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
// TODO 接下来是真正计算在哪些Worker上分配Executor和Core的方法
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) {
// 申请的每个Excutor上要分配的Core的数量
val coresPerExecutor = app.desc.coresPerExecutor
// 如果spark.executor.cores属性没有制定则默认为一个
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
// 是否每个Worker上智能启动一个Executor, 由spark.executor.instances=1指定
val oneExecutorPerWorker = coresPerExecutor.isEmpty
// 申请的每个Executor上要分配的内存大小
val memoryPerExecutor = app.desc.memoryPerExecutorMB
// 可用的worker(指满足之前以上的约束条件的)的数量
val numUsable = usableWorkers.length
// 每个Worker上要分配的Core的数量, 用数组封装
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
// 每个Worker上要分配的Executor的数量
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
// 当前App要申请的Core的数量, 当集群的所有的Core的数量小于申请的数量则集群全部的Core都给这个App, 否则只分配申请的Core的数量
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
/** Return whether the specified worker can launch an executor for this app. */
// TODO 以下每次都要减去之前分配的Core和内存来判断是因为当前的Core还没有真正的分配
def canLaunchExecutor(pos: Int): Boolean = {
// TODO 当整个集群能为App分配的Core的数量大于等于每个Executor上申请分配的Core的数量的时候才继续调度
val keepScheduling = coresToAssign >= minCoresPerExecutor
// TODO 当前Worker上可用的Core的数量减去之前在这个Worker上分配的数量的大于等于每个Executor上申请
// TODO 分配的Core的数量的时候才继续调度, 因为当没有配置spark.executor.cores的时候就是每次只分配一个Core直到分配满申请要的总的Core的数量
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
// TODO 当指定每个Worker上可以启动多个Executor的时候 或者 还没有在当前的Worker上分配Core的时候才启动新的Executor
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
// TODO 这个IF后main表示要启动新的Executor进程
if (launchingNewExecutor) {
// TODO 计算当前Worker上要分配的内存, 计算方式为: 当前Worker上分配的Executor的数量 * 申请的每个Executor的内存
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
// TODO 判断当前Worker上可用的内存减去之前已经分配的内存是否能满足新分配的Executor的内存
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
// TODO 判断当前分配的Executor的数量 + 本次App申请的Executor的数量是否小于系统规定的App的最大的Executor的限制
// PS : app.executorLimit为INT.MAX_VALUE的值,所以肯定不不会的啦 :-)
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
// TODO 根据以上调度好的返回的数组开始在Worker上启动Executor
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) {
// TODO 根据在每个Worker上分配的Core的数量除以每个Executor申请的Core的数量得到每个Worker上要启动的Executor的数量,
// TODO 当spark.executor.cores没有设置的时候, 整个Worker上只有一个Executor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
// TODO 获取每个Executor上要启动的Core的数量, 默认为coresPerExecutor, 当spark.executor.cores没有设置的时候,
// TODO 由于一个Worker上只有一个Executor则启动当前Worker上分配的所有的Core
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
// TODO 表示当前的App在那个Worker启动Executor
val exec = app.addExecutor(worker, coresToAssign)
// TODO 此处真正开始启动Executor
launchExecutor(worker, exec){
// TODO 标记当前的Executor在那个Worker上启动
worker.addExecutor(exec)
// TODO Master向Worker发送LaunchExecutor的消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// TODO Master向Driver发送ExecutorAdded的消息
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
app.state = ApplicationState.RUNNING
}
}
}
}
}
}
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}
}
}
5、Worker类
1、接受Master发送的启动Executor的LaunchExecutor消息,然后Worker将从服务端接受到ApplicationDesccription封装成ExecutorRunner然后另外启动一个线程异步的去启动Executor.
2、当Executor被成功启动之后,Worker会向Master发送ExecutorStateChanged消息表示Executor启动成功,当Master收到该消息后会向Driver发送ExecutorUpdated消息表示申请的Executor已经启动起来了。并且Master这时候会去调用schedule()方法去调度任务执行。
// TODO spark中Worker进程
Worker.scala {
override def receive: PartialFunction[Any, Unit] = synchronized {
// TODO Worker收到Master发送的启动Executor的消息
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
// TODO 为Executor创建本地目录
val appLocalDirs = appDirectories.getOrElse(appId, {
val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
val dirs = localRootDirs.flatMap { dir =>
try {
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
Some(appDir.getAbsolutePath())
} catch {
case e: IOException =>
logWarning(s"${e.getMessage}. Ignoring this directory.")
None
}
}.toSeq
if (dirs.isEmpty) {
throw new IOException("No subfolder can be created in " +
s"${localRootDirs.mkString(",")}.")
}
dirs
})
appDirectories(appId) = appLocalDirs
// TODO 实例化一个ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
// TODO 调用Executor.start()方法启动Executor进程
manager.start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
// TODO 这里启动了一个线程调用fetchAndRunExecutor()启动Executor,不阻塞主线程
override def run() { fetchAndRunExecutor() {
// Launch the process
// TODO 以下主要是根据Spark-Submit中指定的Executor的core, memory等参数,去组装Java进程启动的参数
// TODO 启动的时候的类是CoarseGrainedExecutorBackend,其中有main方法,到时候java去启动该类的时候去SPARK_HOME下面找
val subsOpts = appDesc.command.javaOpts.map {
Utils.substituteAppNExecIds(_, appId, execId.toString)
}
val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
// TODO 此处真正的启动Executor进程
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Redirect its stdout and stderr to files
// TODO 将程序的标准输出和错误输出到之前创建的Executor的工作目录中
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
// TODO Executor进程启动起来后,向Worker发送ExecutorStateChanged消息,表示Executor启动起来了
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
if (state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
}
// TODO 将当前所消费的Core和Memory的数量记录记录在已使用里面
coresUsed += cores_
memoryUsed += memory_
// TODO Worker向Master发送ExecutorStateChanged表示Executor已经启动起来
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
}
}
}
}
6、CoarseGrainedExecutorBackend类
1、这个是Executor的进程执行的类,可以看到这里面有main()方法,可以通过java命令直接启动执行。
2、当Executor的main方法一执行会实例Executor的RpcEndpoint对象,又由于RpcEndpoint对象一生成就会执行生命周期方法onStart(),在onStart()方法里面会去向Driver发送RegisterExecutor消息来注册自己。
3、当收到Driver返回的RegisteredExecutor消息后,会在Executor中创建线程池,注意以后的任务提交到Executor中都会在线程池中执行的。
// TODO 这是Spark中Executor执行的进程的名字
CoarseGrainedExecutorBackend.scala {
// TODO 此处是Executor进程启动的入口方法
def main(args: Array[String]) {
// TODO 此处开始执行Executor的代码
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath){
// TODO 根据传入的--driver-url获取到Driver的RpcEndpoint地址
val driver = fetcher.setupEndpointRefByURI(driverUrl)
// TODO 此处创建Executor的RpcEnv, 用于Rpc通信
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
// TODO 将创建的Netty连接和Executor相关联, 因为Executor继承了ThreadSafeRpcEndpoint类
// TODO 并且同时实例化了CoarseGrainedExecutorBackend的对象
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
// TODO 连接到Worker的RpcEndPoint,如果连接中断,则终止JVM。提供worker及Executor之间的存活性通知。
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
// TODO 这里阻塞主线程等待RpcEndpoint执行完成
env.rpcEnv.awaitTermination()
}
}
// TODO 这是Executor的生命周期方法
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
// TODO Executor的RpcEnv启动之后将会调用这个方法, 这里表示Executor启动成功之后像Driver注册自己
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
// TODO Executor在启动起来之后向Driver发送RegisterExecutor消息去注册自己
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
override def receive: PartialFunction[Any, Unit] = {
// TODO 当收到Driver返回的注册Executor成功的消息
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// TODO 这里很重要,主要是创建了一个线程池用来执行Task的地方
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)
}
}
7、CoarseGrainedSchedulerBackend类
1、这个类很重要他是在SparkContext.createTaskScheduler()方法创建的,它里面包含了DriverEndpoint类,不过这里面的Driver的RpcEnv还是引用的SparkEnv里面创建的名为sparkDriver的RpcEnv。
2、这里DriverEndpoint一被实例化就会执行生命周期方法onstart()。这里DriverEndpoint一启动就向自己发送ReviveOffers消息,并去调用调度Task任务。
3、当收到Executor发送的RegisterExecutor消息来注册自己,这时会将Executor保存到CoarseGrainedSchedulerBackend的executorDataMap中,并且会向Executor响应一个RegisteredExecutor消息。接下来调用makeOffers()开始调度任务执行。
CoarseGrainedSchedulerBackend.scala {
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging {
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
// TODO 这里是按制定的速率定时给自己发送ReviveOffers消息
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
// TODO 这里接受消息为什么不在Revieve方法中,是因为这个是通过RpcEndpoint.ask()方法发送来的
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// TODO 此处是Driver运行的类, 收到Executor发来的注册Executor的消息
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
// TODO 当要注册的Executor已经向Driver注册过了, 返回一个RegisterExecutorFailed消息
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
// TODO 当要注册的Executor在Driver的黑名单中, 返回一个RegisterExecutorFailed消息
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
// TODO 以下开始注册Executor
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
// TODO 将注册的Executor信息封装成ExecutorData对象进行注册
val data = new ExecutorData(executorRef, executorAddress, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// TODO 当注册成功的话向Executor响应一个RegisteredExecutor的消息
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
// TODO 这个方法很重要, 其中开始调度Task
makeOffers()
}
// TODO 调用makeOffers方法, 向所有Executor提供虚假资源
case ReviveOffers =>
makeOffers()
}
}
}