以 standalone - cluster
模式为例
从 SparkSubmit提交任务 mainMethod
开始执行:
object Client {
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
conf.set("spark.rpc.askTimeout", "10")
Logger.getRootLogger.setLevel(driverArgs.logLevel
//初始化 rpcEnv
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
//获得和Master通信的RPCEndpointRef,实际上是一个Seq[RpcEndpointRef]
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
//注册ClientEndpoint
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
}
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
:
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL). map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
:
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
:
ClientEndpoint
不是个RpcEndpoint
嘛,封装了很多处理消息的方法
注意这里的
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
和receiver.offer(data)
,val data = endpoints.get(name)
将获取与name
对应的先前通过new EndpointData(name, endpoint, endpointRef)
创建好的EndpointData
,endpointRefs.put(data.endpoint, data.ref)
添加两者映射关系,然后receiver.offer(data)
将data
添加到receivers队列
,并且会触发Dispatcher
内部的MessageLoop
线程去消费,还有这里的endpointRef
是属于ClientEndpoint
的,显而易见嘛,传参的时候就传的ClientEndpoint
随后就会通过ClientEndpoint
的endpointRef
来调用inbox
中的OnStart
方法,这是它的本质,为什么这个信息类型是OnStart
:因为初始化EndpointData
的inbox
时,自动放置了一个OnStart
消息,所以空的Endpoint
对象对应于OnStart
:
这个写过详细解释,看这里:Spark2.0.2源码分析——RPC 通信机制(消息处理)
最终会在client
中,也就是ClientEndpoint
中调用OnStart
:
先前不是得到了Master
端的EndpointRef
嘛,通过RequestSubmitDriver
向Master
异步发送提交请求,当然,这也是后话了~