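Spark-Core Source Code Study Notes
This series records my review of the Spark source code. The goal is to untangle how Spark distributes and runs programs, tracing the key source code closely enough to understand why it works the way it does. Peripheral code is only described in words, without drilling down, so that it does not obscure the main line.
This article introduces RpcEnv, RpcEndpoint, and RpcEndpointRef, which are the foundation for reading the source code covered in later articles.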
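Since Spark 2.0.0 the Akka dependency has been removed, and Netty is the only underlying RPC implementation. We start tracing the source from the create method: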
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
RpcEnv
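First comes the RpcEnv companion object, which contains two overloaded create methods. RpcEnvConfig is a case class that simply bundles the arguments, so both overloads ultimately initialize the environment through the create method of the factory class NettyRpcEnvFactory.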
/**
* A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
* so that it can be created via Reflection.
*/
private[spark] object RpcEnv {
def create(name: String,bindAddress: String,advertiseAddress: String,port: Int,conf: SparkConf,securityManager: SecurityManager,numUsableCores: Int,clientMode: Boolean): RpcEnv = {
// RpcEnvConfig is a case class that bundles the arguments
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager, numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}
}
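The overload-plus-factory shape described above can be sketched in isolation. This is a minimal illustration, not Spark code: ToyEnvConfig, ToyEnv, ToyEnvFactory, and ToyNettyEnvFactory are hypothetical stand-ins, and the "server started" flag only mimics the clientMode check made by the real factory.

```scala
object FactorySketch {
  // Hypothetical stand-in for RpcEnvConfig: a case class that only bundles arguments.
  final case class ToyEnvConfig(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      clientMode: Boolean)

  // Hypothetical stand-in for RpcEnv.
  trait ToyEnv {
    def name: String
    def serverStarted: Boolean
  }

  // Hypothetical stand-in for RpcEnvFactory.
  trait ToyEnvFactory {
    def create(config: ToyEnvConfig): ToyEnv
  }

  final class ToyNettyEnvFactory extends ToyEnvFactory {
    override def create(config: ToyEnvConfig): ToyEnv = new ToyEnv {
      override val name: String = config.name
      // A server is only "started" when the environment is not in client mode.
      override val serverStarted: Boolean = !config.clientMode
    }
  }

  // Short overload: the host doubles as bind address and advertise address.
  def create(name: String, host: String, port: Int, clientMode: Boolean = false): ToyEnv =
    create(name, host, host, port, clientMode)

  // Full overload: packs everything into the config and delegates to the factory.
  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      clientMode: Boolean): ToyEnv = {
    val config = ToyEnvConfig(name, bindAddress, advertiseAddress, port, clientMode)
    new ToyNettyEnvFactory().create(config)
  }
}
```

Both overloads end at the same factory call, so callers can choose a short or a detailed signature without changing how the environment is built.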
Here is the create method of the factory class:
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Create the serializer instance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
// Instantiate NettyRpcEnv; this is the important part
val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager, config.numUsableCores)
if (!config.clientMode) { // clientMode defaults to false
// Simplified pseudocode, for readability
nettyEnv.startServer(config.bindAddress, config.port)
}
nettyEnv
}
}
Next is part of the NettyRpcEnv construction code, which initializes a Dispatcher instance:
private[netty] class NettyRpcEnv(val conf: SparkConf,javaSerializerInstance: JavaSerializerInstance,host: String,securityManager: SecurityManager,numUsableCores: Int) extends RpcEnv(conf) with Logging {
// Create the dispatcher, the key scheduling component
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
// Create a TransportContext instance for later use when dispatching messages
private val transportContext = new TransportContext(transportConf,new NettyRpcHandler(dispatcher, this, streamManager))
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
}
Dispatcher
Moving into the Dispatcher construction code: it declares an EndpointData holder class and initializes several containers for storing data.
package org.apache.spark.rpc.netty
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
// An important data structure; inbox is the structure that holds the messages and is examined in detail below
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
// Two thread-safe hash maps: one records the mapping from name to EndpointData, the other the mapping from RpcEndpoint to RpcEndpointRef
private val endpoints: ConcurrentMap[String, EndpointData] = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
// Track the receivers whose inboxes may contain messages.
// A queue holding the EndpointData instances waiting to be processed
private val receivers = new LinkedBlockingQueue[EndpointData]
Below is the key thread pool initialized while the Dispatcher is being constructed:
/** Thread pool used for dispatching messages. */
private val threadpool: ThreadPoolExecutor = {
// Compute the pool size from the configuration and the number of available CPU cores
val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS).getOrElse(math.max(2, availableCores))
// A fixed-size daemon thread pool; every thread runs the run method of a MessageLoop instance
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
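// The dispatch threads loop continuously from the moment the Dispatcher is instantiated
while (true) {
try {
// Take one EndpointData; these are added to receivers at registration time, as the source further down shows
val data = receivers.take()
if (data == PoisonPill) { // PoisonPill marks the point where the thread should exit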
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
// Call the inbox.process method of the EndpointData
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
/** A poison endpoint that indicates MessageLoop should exit its message loop. */
private val PoisonPill = new EndpointData(null, null, null)
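The take-check-requeue pattern in MessageLoop can be tried outside Spark. The following is a minimal sketch under stated assumptions: Task is a hypothetical stand-in for EndpointData, "processing" just records the task name, and a plain fixed thread pool replaces Spark's daemon pool helper.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, LinkedBlockingQueue, TimeUnit}

object PoisonPillSketch {
  // Hypothetical stand-in for EndpointData: just a named unit of work.
  final class Task(val name: String)

  // Sentinel instance, recognized by reference, that tells a loop to exit.
  private val PoisonPill = new Task(null)

  /** Runs `numThreads` loops over a shared queue and returns the names they processed. */
  def drain(names: Seq[String], numThreads: Int): Set[String] = {
    val receivers = new LinkedBlockingQueue[Task]()
    val processed = new ConcurrentLinkedQueue[String]()
    val pool = Executors.newFixedThreadPool(numThreads)

    val loop = new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          val task = receivers.take() // blocks until something is available
          if (task eq PoisonPill) {
            // Put the pill back so the other loops can see it too, then exit.
            receivers.offer(PoisonPill)
            running = false
          } else {
            processed.add(task.name)
          }
        }
      }
    }

    (0 until numThreads).foreach(_ => pool.execute(loop))
    names.foreach(n => receivers.offer(new Task(n)))
    receivers.offer(PoisonPill) // queued after all real tasks
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)

    var result = Set.empty[String]
    val it = processed.iterator()
    while (it.hasNext) result += it.next()
    result
  }
}
```

Because the queue is FIFO and the pill is offered last, every real task is taken before any loop sees the pill; re-offering the pill is what lets a single sentinel stop all loops.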
EndpointData and Inbox
Let's look at what the inbox inside EndpointData does during initialization and in its process method:
/* An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.*/
private[netty] class Inbox(val endpointRef: NettyRpcEndpointRef, val endpoint: RpcEndpoint) extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()
/** True if the inbox (and its associated endpoint) is stopped. */
@GuardedBy("this")
private var stopped = false
// OnStart should be the first message to process
inbox.synchronized {
// private[netty] case object OnStart extends InboxMessage
// As this shows, every EndpointData adds the OnStart case object to messages when it is initialized
messages.add(OnStart)
}
Below is the body of the process method:
/* Process stored messages.*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
// Take the first element of the messages list, which is OnStart on the first call
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {return}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg => throw new SparkException(s"Unsupported message $message from ${_sender}")})
} catch {}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
As shown above, once process is called it keeps looping until every message in messages has been handled. The main message types are RpcMessage, OneWayMessage, OnStart, OnStop, RemoteProcessConnected, and so on.
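The ordering guarantee (OnStart first, then everything else in arrival order) can be seen in a stripped-down inbox. This is a sketch only: ToyInbox and OneWay are hypothetical names, the handlers just record strings, and the concurrency bookkeeping of the real Inbox (numActiveThreads, enableConcurrent) is left out.

```scala
import scala.collection.mutable

object InboxSketch {
  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  final case class OneWay(content: String) extends InboxMessage

  // Hypothetical, single-consumer simplification of an inbox.
  final class ToyInbox {
    // OnStart is queued at construction time, so it is always handled first.
    private val messages = mutable.Queue[InboxMessage](OnStart)

    def post(message: InboxMessage): Unit = synchronized {
      messages.enqueue(message)
    }

    private def poll(): Option[InboxMessage] = synchronized {
      if (messages.isEmpty) None else Some(messages.dequeue())
    }

    /** Handles every queued message and returns a log of what was called. */
    def process(): List[String] = {
      val handled = mutable.ListBuffer[String]()
      var next: Option[InboxMessage] = poll()
      while (next.isDefined) {
        next.get match {
          case OnStart         => handled += "onStart"
          case OneWay(content) => handled += s"receive($content)"
        }
        next = poll()
      }
      handled.toList
    }
  }
}
```

A message posted before the first process call is still handled after OnStart, which is the property the endpoint lifecycle relies on.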
Back to Where We Started
Returning to the start of the article, let's look at when and how these messages are added to the queues:
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
registerRpcEndpoint
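First comes rpcEnv.setupEndpoint, whose parameters are a name and an instance of an RpcEndpoint subclass. An endpoint can be thought of as an object that bundles state with a set of predefined callbacks, including an overridden onStart method. The override in NettyRpcEnv simply calls dispatcher.registerRpcEndpoint and returns a NettyRpcEndpointRef.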
override def setupEndpoint(name: String, endpoint: RpcEndpoint)
: RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
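// In Dispatcher: this method is the real entry point for registering an RPC endpoint
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
// Wrap the address and the name together
val addr = RpcEndpointAddress(nettyEnv.address, name)
// Wrap some predefined methods; mainly binds the ref to nettyEnv and addr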
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
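// Key step: create the EndpointData and bind it to name. As noted above, EndpointData can be seen as a holder that instantiates an Inbox, and the Inbox maintains a messages list to which OnStart is added at initialization.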
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
// Record the endpoint-to-ref mapping
endpointRefs.put(data.endpoint, data.ref)
// Submit the EndpointData to receivers, where it waits to be picked up by threadpool
receivers.offer(data) // for the OnStart message
}
endpointRef
}
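Through the chain of calls inside setupEndpoint shown above, Master's onStart method ends up being the first callback invoked. The details belong in the article on Master and are not expanded here.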
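The registration step relies on putIfAbsent to reject duplicate names and on the receivers queue to schedule the first message. A minimal sketch of just that part, assuming hypothetical names (ToyDispatcher, Entry) and leaving out the endpoint, the ref, and the stopped check:

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

object RegistrySketch {
  // Hypothetical stand-in for EndpointData.
  final class Entry(val name: String)

  final class ToyDispatcher {
    private val endpoints = new ConcurrentHashMap[String, Entry]()
    private val receivers = new LinkedBlockingQueue[Entry]()

    def register(name: String): Entry = {
      val entry = new Entry(name)
      // putIfAbsent returns the previous value, or null if the name was free.
      if (endpoints.putIfAbsent(name, entry) != null) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
      // Queue the entry so a message loop will pick it up and handle its first message.
      receivers.offer(entry)
      entry
    }

    /** Number of entries waiting for a message loop. */
    def pending: Int = receivers.size()
  }
}
```

A second registration under the same name fails before anything is queued, so a duplicate never reaches the message loops.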
askSync
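One line from the start of the article remains: the call to askSync with BoundPortsRequest as its argument.
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
Tracing askSync eventually leads into NettyRpcEndpointRef, the concrete RpcEndpointRef implementation.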
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* specified timeout, throw an exception if this fails.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// This ends up calling the implementation class's ask method. Note that it blocks, runs only once, and does not retry on failure.
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
Here is the ask method of NettyRpcEndpointRef:
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
// Delegates to the method of the same name on nettyEnv, passing a wrapped RequestMessage
nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
}
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
if (remoteAddr == address) {
val p = Promise[Any]()
p.future.onComplete {
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
// Local call: the dispatcher puts the message into the target EndpointData's inbox, and it is then picked up via receivers
dispatcher.postLocalMessage(message, p)
} else { // The target address is a remote host; not expanded here
val rpcMessage = RpcOutboxMessage(...)
}
}
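The relationship between ask and askSync (a Promise completed by the handler, a Future handed to the caller, and a blocking wait with a timeout) can be sketched with the standard library alone. Assumptions: AskSketch and Handler are hypothetical names, messages are plain strings, and scala.concurrent.Await stands in for RpcTimeout.awaitResult.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration

object AskSketch {
  // Hypothetical handler: receives the message and a promise to complete with the reply.
  type Handler = (String, Promise[String]) => Unit

  // Non-blocking: hands the promise to the handler and returns its future immediately.
  def ask(message: String, handler: Handler): Future[String] = {
    val p = Promise[String]()
    handler(message, p)
    p.future
  }

  // Blocking: waits for the reply once and throws a TimeoutException if none arrives in time.
  def askSync(message: String, timeoutMillis: Long, handler: Handler): String =
    Await.result(ask(message, handler), Duration(timeoutMillis, TimeUnit.MILLISECONDS))
}
```

A handler that never completes the promise makes askSync fail with a timeout instead of retrying, which matches the "blocking, one attempt" behavior noted in the comment above.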
receiveAndReply
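Master's receiveAndReply is invoked when the BoundPortsRequest message arrives; pattern matching selects the case that calls context.reply to send back a BoundPortsResponse.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Many other message types are handled here; omitted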
case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
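How a PartialFunction handler combines with applyOrElse, as in Inbox.process, can be shown in a few lines. This is a sketch under assumptions: the message types are re-declared locally with a simplified two-field response, the port values are arbitrary, and a plain callback replaces RpcCallContext.

```scala
object ReceiveSketch {
  // Local, simplified re-declarations; not the real Spark message classes.
  case object BoundPortsRequest
  final case class BoundPortsResponse(rpcPort: Int, webUiPort: Int)

  // The handler is a partial function: it is only defined for the messages it knows.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case BoundPortsRequest => reply(BoundPortsResponse(7077, 8080)) // arbitrary ports
  }

  /** Dispatches one message; Right holds the reply, Left holds the error text. */
  def dispatch(content: Any): Either[String, Any] = {
    var answer: Option[Any] = None
    try {
      // applyOrElse runs the handler if it is defined for `content`, else the fallback.
      receiveAndReply { a => answer = Some(a) }.applyOrElse[Any, Unit](content, { msg =>
        throw new IllegalArgumentException(s"Unsupported message $msg")
      })
      answer.toRight("no reply sent")
    } catch {
      case e: IllegalArgumentException => Left(e.getMessage)
    }
  }
}
```

Using applyOrElse avoids a separate isDefinedAt check: a known message reaches its case, and anything else falls through to the error path.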
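At this point, a simple flow in which Master registers an RPC endpoint and then calls itself is complete.
Finally, here is a diagram borrowed from another article; the link to that article is given at the end.
References: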