yarn-client 模式下的应用提交流程（源码走读）
入口：SparkSubmit 的 main 方法
// Call chain starting at SparkSubmit.main(): main() -> submit(appArgs).
main()
submit(appArgs)
// prepareSubmitEnvironment returns (childArgs, childClasspath, sysProps, childMainClass).
// In yarn-client mode childMainClass is the user's own application class.
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// runMain runs childMainClass's main method, i.e. the user-defined main() is executed
// inside this same spark-submit JVM.
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
// Inside runMain: reflective call of the static main method (null receiver) with childArgs.
mainMethod.invoke(null, childArgs.toArray)
/*
The user-defined main method now runs and initializes a SparkContext (new SparkContext()).
SparkContext fields of interest here: _schedulerBackend, _taskScheduler, _dagScheduler.
The relevant part of SparkContext's (long) initialization code follows.
*/
// createTaskScheduler builds both the SchedulerBackend and the TaskScheduler and
// returns them as a (backend, scheduler) pair, destructured here as (sched, ts).
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
/* SparkContext.createTaskScheduler pattern-matches on the master URL.
Our master is "yarn", which falls into the variable pattern below (matches any URL not
handled by an earlier case); the cluster manager is then looked up for that URL.
*/
case masterUrl =>
// No ClusterManager recognizes the URL -> SparkException.
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
// Create the TaskScheduler; in yarn-client mode this is a YarnScheduler.
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// The method called above, as defined in YarnClusterManager:
/** YarnClusterManager.createTaskScheduler: chooses the TaskScheduler by deploy mode.
 *  "cluster" -> YarnClusterScheduler, "client" -> YarnScheduler;
 *  any other deploy mode throws a SparkException.
 */
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
// Create the SchedulerBackend that matches the scheduler.
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
// Excerpt of YarnClusterManager.createSchedulerBackend: it matches on the deploy mode
// (the scrutinee is not shown; presumably sc.deployMode, as in createTaskScheduler).
// In yarn-client mode the result is a YarnClientSchedulerBackend.
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
// Back in SparkContext.createTaskScheduler: cm.initialize(scheduler, backend) presumably
// links the scheduler to its backend (body not shown). The pair is returned as
// (backend, scheduler), which the caller destructures as (sched, ts).
cm.initialize(scheduler, backend)
(backend, scheduler)
// Closes the try block; any catch clause is not shown in this excerpt.
}
// Back in SparkContext initialization: store the backend and scheduler returned by
// createTaskScheduler, then create the DAGScheduler for this context.
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
// Send TaskSchedulerIsSet to the heartbeat receiver (ask expecting a Boolean reply).
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// Start the task scheduler.
_taskScheduler.start()
// TaskSchedulerImpl.start(). The scheduler is used as a TaskSchedulerImpl here
// (the backend constructors cast it with asInstanceOf[TaskSchedulerImpl]).
override def start() {
// backend is the YarnClientSchedulerBackend created above (yarn-client mode).
backend.start()
// Speculative execution, NOT task splitting: only when spark.speculation is true
// (default false) and not running locally, a fixed-rate job runs
// checkSpeculatableTasks() every SPECULATION_INTERVAL_MS. Not needed to follow the
// submission flow, so it is skipped in these notes.
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
// YarnClientSchedulerBackend.start(): only the key lines are shown.
// Client here is the Client class from the yarn package (org.apache.spark.deploy.yarn),
// not Hadoop's YarnClient.
client = new Client(args, conf)
// Submit the application to YARN, then bind this backend to the result of
// submitApplication() (presumably the YARN application id — confirm in Client).
bindToYarn(client.submitApplication(), None)
// Client.submitApplication(): only the key lines are shown.
// Set up the appropriate contexts to launch our AM
// Build the AM container's launch context (environment and launch command).
// Depending on the deploy mode, this also decides which main class the AM container
// runs; see the excerpt right below.
val containerContext = createContainerLaunchContext(newAppResponse)
// Excerpt from createContainerLaunchContext: cluster mode runs ApplicationMaster,
// client mode runs "org.apache.spark.deploy.yarn.ExecutorLauncher".
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
// Submit to the ResourceManager (RM). The RM then has a NodeManager (NM) launch the
// AM container, which in client mode runs ExecutorLauncher.
yarnClient.submitApplication(appContext)
/**
ExecutorLauncher is effectively the ApplicationMaster.
The ResourceManager picks a machine to run ExecutorLauncher (EL), and EL's main is a
one-liner that just calls ApplicationMaster.main(args).
From here on, the flow is largely the same as in cluster mode.
*/
/* ApplicationMaster.main calls run() in both modes. Inside run(), the code branches
on isClusterMode: cluster mode goes to runDriver; client mode goes to runExecutorLauncher.
In client mode the driver (the user's main) is already running in the spark-submit JVM
(see SparkSubmit.runMain).
*/
if (isClusterMode) {
// cluster mode
runDriver(securityMgr)
} else {
// client mode
runExecutorLauncher(securityMgr)
}
/** Client-mode AM body (ExecutorLauncher).
 *  Steps:
 *  - create an RpcEnv named "sparkYarnAM" on spark.yarn.am.port, in client mode. The
 *    default port is 0, which presumably lets the OS pick a free port.
 *  - wait for the Spark driver and obtain a reference to it (driverRef);
 *  - register the AM and request resources (presumably executor containers — see registerAM);
 *  - block until the reporter thread finishes.
 */
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.getInt("spark.yarn.am.port", 0)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
// Register the AM and request resources.
registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
}