Background
When start-slaves.sh is used to start Worker instances, what actually starts is an instance of Worker.scala. Once started, the Worker registers itself with the Master. Note that an Executor does not register with the Master when it starts; the reason is covered in an earlier post (link). The Master registration process is walked through below.
Files involved:
(1) Worker.scala
(2) Master.scala

Main content
0.Worker.scala
As shown below, Worker extends ThreadSafeRpcEndpoint, which means the Worker is a message loop: any component holding a reference to this endpoint can send it messages. The same is true of the Master, and this is the main way the Master and the Worker communicate.
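The message-loop idea behind ThreadSafeRpcEndpoint can be sketched in plain Scala. This is a simplified illustration, not Spark's actual RpcEndpoint API: a single consumer thread drains a queue, so the handler never runs concurrently, and holding a reference to the endpoint is all that is needed to send it messages.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Poison pill used to stop the loop (illustrative, not a Spark message)
case object Stop

// Sketch of a thread-safe message-loop endpoint: one thread drains the
// inbox, so the handler is never invoked concurrently.
class ToyEndpoint(handler: Any => Unit) {
  private val inbox = new LinkedBlockingQueue[Any]()

  private val loop = new Thread(() => {
    var running = true
    while (running) {
      inbox.take() match {
        case Stop => running = false
        case msg  => handler(msg)
      }
    }
  })
  loop.setDaemon(true)
  loop.start()

  // Anyone holding a reference can send messages, which is how
  // Master and Worker talk to each other in Spark.
  def send(msg: Any): Unit = inbox.put(msg)
  def stop(): Unit = inbox.put(Stop)
}
```

The queue serializes all message handling onto one thread, which is what makes the endpoint "thread-safe" without locks inside the handler.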
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging
1.Worker.main()
The main method lives in Worker's companion object, and it constructs a new Worker object. The full code of main:
def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  // Parse the startup arguments
  val args = new WorkerArguments(argStrings, conf)
  // Start the endpoint; this call instantiates the Worker object,
  // since the Worker itself is an endpoint
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  rpcEnv.awaitTermination()
}

The key call in main is startRpcEnvAndEndpoint(): it first creates an RpcEnv from the host, port, and conf, and then calls rpcEnv.setupEndpoint() to register a newly constructed Worker with that RpcEnv. The code:
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  // Create the RpcEnv
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  // Instantiate the Worker and register it as an endpoint
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}
2.Worker.onStart()
Here Worker no longer refers to the companion object but to the instance created in the previous step. Because it extends ThreadSafeRpcEndpoint, its onStart() method is invoked after registration with the RpcEnv. The code is shown below; the core call is registerWithMaster():
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  // Create the worker directory (SPARK_HOME/work by default); it can be
  // overridden with --work-dir at startup or via an environment variable
  createWorkDir()
  // Start the shuffle service if it is enabled
  shuffleService.startIfEnabled()
  // Create the worker's own web UI
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  // Core method: register with the Master
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  // This adds the worker's metrics pages to the web UI
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
3.Worker.registerWithMaster()
registerWithMaster() calls tryRegisterAllMasters() to register with every Master:
private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      // Core call: register with all the Masters
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
4.Worker.tryRegisterAllMasters()
Since there can be multiple Masters, a thread pool is used here:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    // There may be multiple masters, so a thread pool is used
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Register via the Master endpoint reference
          sendRegisterMessageToMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}

In Spark 2.2, sendRegisterMessageToMaster() only fires a one-way message at the Master; no other registration work happens here. The Master replies through its reference to the Worker endpoint, and if the attempt fails, the retry timer set up in registerWithMaster() re-attempts registration.
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.send(RegisterWorker(
    workerId,
    host,
    port,
    self,
    cores,
    memory,
    workerWebUiUrl,
    masterEndpoint.address))
}
5.Master.receive()
The RegisterWorker message sent above through the Master endpoint reference is handled in the Master's receive method:

case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // A standby Master does not perform registration
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // Already registered
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Perform the actual registration
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      // Send the registration response back to the worker
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // A new worker has joined, so reschedule
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
registerWorker() is where the Master finally carries out the registration: it checks the worker's current state and adds it to the Master's bookkeeping structures. Let's take a look:
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }
  // Handle a worker re-registering from the UNKNOWN state
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Record the worker's information
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}
After registerWorker() succeeds, the Master also calls persistenceEngine.addWorker(worker), which persists the worker's information, for example into ZooKeeper.
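The persistence idea can be illustrated with a minimal in-memory variant. This is only a sketch of the abstraction; the type and method names below are simplified stand-ins, while Spark's real engines persist to ZooKeeper or the filesystem so a standby Master can recover state after failover.

```scala
import scala.collection.mutable

// Simplified stand-in for the worker metadata the Master persists
case class ToyWorkerInfo(id: String, host: String, port: Int)

// Sketch of the persistence-engine abstraction: record every registered
// worker so a recovering Master can read them back.
trait ToyPersistenceEngine {
  def addWorker(w: ToyWorkerInfo): Unit
  def removeWorker(w: ToyWorkerInfo): Unit
  def readWorkers(): Seq[ToyWorkerInfo]
}

// In-memory stand-in; a ZooKeeper-backed engine would serialize each
// worker under a znode instead of keeping it in a local map.
class InMemoryPersistenceEngine extends ToyPersistenceEngine {
  private val store = mutable.LinkedHashMap[String, ToyWorkerInfo]()
  def addWorker(w: ToyWorkerInfo): Unit = store(w.id) = w
  def removeWorker(w: ToyWorkerInfo): Unit = store.remove(w.id)
  def readWorkers(): Seq[ToyWorkerInfo] = store.values.toSeq
}
```

The key design point is that registration state lives outside the Master process, which is what makes Master failover possible in standalone mode.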
6.Worker.receive()
Finally, the Master uses its reference to the Worker endpoint to send back the registration response, which is matched in the Worker's receive method by this case:
case msg: RegisterWorkerResponse =>
  handleRegisterResponse(msg)

handleRegisterResponse() is where the Worker completes a successful registration. It mainly does two things:
a. set the worker's registered flag to true
b. start the heartbeat thread
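Those two steps can be sketched as follows. The class and method names here are illustrative, not Spark's; in real Spark the scheduled task sends a Heartbeat message to the Master endpoint, whereas this sketch just counts ticks.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the two steps after a successful registration:
// 1) mark the worker as registered, 2) start periodic heartbeats
// so the Master knows this worker is still alive.
class ToyWorker(heartbeatIntervalMs: Long) {
  @volatile var registered = false
  val heartbeatsSent = new AtomicInteger(0)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def handleRegisterResponse(success: Boolean): Unit = {
    if (success) {
      registered = true
      // In Spark this task would send Heartbeat(workerId) to the Master
      // endpoint; here we only count the ticks for illustration.
      scheduler.scheduleAtFixedRate(
        () => heartbeatsSent.incrementAndGet(),
        0, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
    }
  }

  def shutdown(): Unit = scheduler.shutdownNow()
}
```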
Summary:
A worker is usually started manually by an administrator via the startup scripts under sbin/ (such as start-slaves.sh). The script invokes the main method of Worker's companion object, which instantiates a Worker object. Because Worker is a ThreadSafeRpcEndpoint, its onStart() method runs, and onStart() initiates the registration; all the interaction in between is carried out by sending messages through the Master's and the Worker's endpoint references.
Afterwards, the Worker receives the Master's requests to launch executors and creates the executor backend; from that point on, communication happens between the ExecutorBackend and the Driver.
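The whole registration handshake described above can be condensed into a small synchronous simulation. The class names and the direct method call are illustrative; the message names loosely follow the real protocol (RegisterWorker, RegisteredWorker, RegisterWorkerFailed), which in Spark travels asynchronously over the RPC environment.

```scala
// Condensed simulation of the handshake: the Worker sends RegisterWorker,
// the Master validates it and replies, and the Worker reacts to the response.
sealed trait Msg
case class RegisterWorker(id: String) extends Msg
case class RegisteredWorker(masterUrl: String) extends Msg
case class RegisterWorkerFailed(reason: String) extends Msg

class SimMaster(masterUrl: String) {
  private var registeredIds = Set.empty[String]

  // Mirrors the duplicate-ID check in Master.receive
  def receive(msg: RegisterWorker): Msg =
    if (registeredIds.contains(msg.id)) {
      RegisterWorkerFailed("Duplicate worker ID")
    } else {
      registeredIds += msg.id
      RegisteredWorker(masterUrl)
    }
}

class SimWorker(id: String) {
  var registered = false

  def registerWithMaster(master: SimMaster): Unit =
    master.receive(RegisterWorker(id)) match {
      case RegisteredWorker(_)     => registered = true  // then start heartbeats
      case RegisterWorkerFailed(_) => registered = false // real Spark would retry
      case _                       =>
    }
}
```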