The official Spark website has a diagram showing how the components of a cluster interact.
After deploying the compiled Spark package, we run ./sbin/start-all.sh. That script calls a chain of further scripts (we will skip those details); ultimately a JVM is launched with org.apache.spark.deploy.master.Master as its main class.
Master's main function:
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf // instantiate the Spark configuration
    val args = new MasterArguments(argStrings, conf)
    // the key method on the Master startup path
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
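The last two lines use the ask pattern: the caller sends BoundPortsRequest and blocks until the Master replies with BoundPortsResponse from its receiveAndReply handler. Here is a conceptual sketch of that round trip; note that RpcEndpoint and RpcCallContext are private[spark], so this only compiles inside Spark's own packages, and the "ping" endpoint is made up for illustration:

// Hypothetical endpoint showing the ask/reply round trip.
class PingEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case "ping" => context.reply("pong") // Master replies with BoundPortsResponse the same way
  }
}

val ref = rpcEnv.setupEndpoint("ping", new PingEndpoint(rpcEnv))
val answer = ref.askWithRetry[String]("ping") // blocks until "pong" comes back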
The RpcEnv.create method creates the Master's RPC runtime environment. Stepping into it:
private[spark] object RpcEnv {

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    create(name, host, host, port, conf, securityManager, clientMode)
  }

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    new NettyRpcEnvFactory().create(config)
  }
}
NettyRpcEnvFactory.create then builds a NettyRpcEnv and, unless running in client mode, starts its server on the requested port:

def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}
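Utils.startServiceOnPort is the helper that retries binding on successive ports (bounded by spark.port.maxRetries) when the requested one is taken. A simplified sketch of that retry loop, not Spark's actual implementation:

import java.net.BindException

// Simplified port-retry loop in the spirit of Utils.startServiceOnPort:
// startService attempts to bind and returns (service, boundPort).
def startOnPort[T](startPort: Int,
                   startService: Int => (T, Int),
                   maxRetries: Int = 16): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    val tryPort = if (startPort == 0) 0 else startPort + offset // 0 lets the OS pick
    try {
      return startService(tryPort)
    } catch {
      case _: BindException if offset < maxRetries => // port busy, try the next one
    }
  }
  throw new BindException(s"Could not bind a service starting from port $startPort")
}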
Inside NettyRpcEnv.startServer, the transport server is created and a verifier endpoint is registered with the dispatcher:

def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

Having followed the calls to this point, we can see that Spark ultimately delegates to Java code (TransportContext.createServer) to create the Netty server.
Back in Master, continue reading past the RpcEnv.create call:
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

This instantiates the Master and registers its endpoint reference with the RPC dispatcher.
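Conceptually, the Dispatcher keeps a name-to-endpoint table and routes each inbound message to the matching endpoint. A loose toy model (not Spark's actual Dispatcher, which queues messages into per-endpoint inboxes processed by a thread pool):

import java.util.concurrent.ConcurrentHashMap

// Toy model: register handlers by name; route messages by endpoint name.
class TinyDispatcher {
  private val endpoints = new ConcurrentHashMap[String, Any => Unit]()

  def registerRpcEndpoint(name: String, handler: Any => Unit): Unit =
    endpoints.put(name, handler)

  def postMessage(name: String, message: Any): Unit = {
    val handler = endpoints.get(name)
    if (handler != null) handler(message) // the real Dispatcher enqueues into an Inbox first
  }
}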
Now we can turn to the Master class itself and read it from its first lines.
private val forwardMessageThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

// For application IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)

val workers = new HashSet[WorkerInfo]
val idToApp = new HashMap[String, ApplicationInfo]
private val waitingApps = new ArrayBuffer[ApplicationInfo]
val apps = new HashSet[ApplicationInfo]
......

These private and public member variables hold the Master's bookkeeping state: registered workers, applications, drivers, timeouts, and recovery configuration.
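Since these defaults all come from SparkConf, they can be overridden in spark-defaults.conf or programmatically. A small illustrative example (e.g. in spark-shell; the values are arbitrary, not recommendations):

import org.apache.spark.SparkConf

// Override two of the Master's tunables shown above.
val conf = new SparkConf()
  .set("spark.worker.timeout", "120")             // seconds before a silent worker is considered dead
  .set("spark.deploy.retainedApplications", "50") // completed applications kept for the UI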
The most important code is the onStart() function:
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort) // instantiate the Master's web UI for inspecting the cluster
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  if (reverseProxy) {
    masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
    logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
      s"Applications UIs are available at $masterWebUiUrl")
  }
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
In onStart(), the Master creates and binds its web UI, optionally starts the standalone REST server (port 6066 by default), starts the metrics systems for the Master itself and for applications, and selects a persistence engine and leader-election agent according to spark.deploy.recoveryMode.
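For the ZOOKEEPER branch of the match above, standalone HA is enabled with three properties. A hedged example (the ZooKeeper quorum address and directory are placeholders):

// Selects ZooKeeperRecoveryModeFactory in the RECOVERY_MODE match above.
val haConf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181") // placeholder quorum
  .set("spark.deploy.zookeeper.dir", "/spark")                     // znode for recovery state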
It also schedules the recurring task that checks whether workers have timed out: a worker that sends no heartbeat for 1 minute (spark.worker.timeout, 60 seconds by default) is treated as dead.
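When CheckForWorkerTimeOut fires, the handler does roughly the following. This is a simplified sketch with made-up types standing in for WorkerInfo and the Master's internals, not Spark's actual code:

import scala.collection.mutable

// Simplified stand-in for WorkerInfo (made-up type for illustration).
case class Worker(id: String, lastHeartbeat: Long)

// Drop every worker whose last heartbeat is older than the timeout.
def timeOutDeadWorkers(workers: mutable.HashSet[Worker], timeoutMs: Long): Unit = {
  val now = System.currentTimeMillis()
  val dead = workers.filter(_.lastHeartbeat < now - timeoutMs) // filter returns a new set
  dead.foreach { w =>
    println(s"Removing worker ${w.id}: no heartbeat in ${timeoutMs / 1000}s")
    workers -= w // the real Master also re-schedules the worker's executors and drivers
  }
}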
Most of the remaining code in Master handles interaction with workers, drivers, and applications: worker registration, driver registration, telling workers to launch executors, and so on, as the sketch below illustrates.
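Reduced to a self-contained sketch (the message types are simplified stand-ins for Spark's DeployMessages), that message handling has this shape:

// Simplified message types echoing RegisterWorker / Heartbeat / CheckForWorkerTimeOut.
sealed trait DeployMsg
case class RegisterWorkerMsg(workerId: String) extends DeployMsg
case class HeartbeatMsg(workerId: String) extends DeployMsg
case object CheckTimeoutMsg extends DeployMsg

// One case per message, just like the PartialFunction returned by Master.receive.
def handle(msg: DeployMsg): Unit = msg match {
  case RegisterWorkerMsg(id) => println(s"registered worker $id")     // Master replies RegisteredWorker
  case HeartbeatMsg(id)      => println(s"heartbeat from worker $id") // refreshes lastHeartbeat
  case CheckTimeoutMsg       => println("checking for dead workers")  // calls timeOutDeadWorkers()
}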