在akka官网中关于远程actor交互,介绍了两种方法,一种是通过actorSelection查询,另一种是通过actorOf在远程节点创建一个actor。actorSelection我们之前的博客中已经介绍过,创建远程actor也有过简要说明,但其原理并没有做过多分析。下面就来分析一下如何在远程节点部署actor。
官网介绍了两种部署远程actor的方法,一种是通过akka.actor.deployment配置,另一种是编程,都比较简单。考虑到代码完整性,下面摘录akka官方样例库中的代码做分析。
def startRemoteCreationSystem(): Unit = { val system = ActorSystem("CreationSystem", ConfigFactory.load("remotecreation")) val actor = system.actorOf(Props[CreationActor], name = "creationActor") println("Started CreationSystem") import system.dispatcher system.scheduler.schedule(1.second, 1.second) { if (Random.nextInt(100) % 2 == 0) actor ! Multiply(Random.nextInt(20), Random.nextInt(20)) else actor ! Divide(Random.nextInt(10000), (Random.nextInt(99) + 1)) } }
在sample.remote.calculator.CreationApplication中有以上函数,是用来远程部署actor的,当然是以配置的方式实现。
include "common" akka { actor { deployment { "/creationActor/*" { remote = "akka.tcp://[email protected]:2552" } } } remote.netty.tcp.port = 2554 }
配置文件如上。细心的读者一定发现远程部署actor跟正常actor的创建基本没有区别,代码可以完全一致,只需要修改配置。那是如何做到在远程节点部署actor的呢?
val ref = system.actorOf(Props[SampleActor]. withDeploy(Deploy(scope = RemoteScope(address))))
上面代码是通过编程的方式来创建远程actor,可以发现跟创建普通的actor还是多少有点区别,那就是有withDeploy代码块。其实不管是配置,还是withDeploy都是在给创建actor时配置相关参数。那这些参数具体是在哪里生效,生效后又是如何处理的呢?既然最终都会影响Props的deploy配置,那就从Props入手。
/** * Returns a new Props with the specified deployment configuration. */ def withDeploy(d: Deploy): Props = copy(deploy = d withFallback deploy)
withDeploy比较简单,就是把最新的Deploy与Props原来的deploy变量进行合并,Deploy就是配置的一个类,不再过多分析,但有一点需要注意scope的值默认是NoScopeGiven,routerConfig默认是NoRouter。既然远程创建actor与Props有关,那就要找到在actorOf调用中涉及到Props使用的地方。
前面的博客已经分析过ActorSystem.actorOf的调用流程,为了便于理解,这里再贴出相关代码。
def actorOf(props: Props, name: String): ActorRef = if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false) else throw new UnsupportedOperationException( s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
guardian是provider的一个字段,而remote模式下provider是RemoteActorRefProvider,而RemoteActorRefProvider中guardian定义如下。
override val deployer: Deployer = createDeployer /** * Factory method to make it possible to override deployer in subclass * Creates a new instance every time */ protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) private val local = new LocalActorRefProvider(systemName, settings, eventStream, dynamicAccess, deployer, Some(deadLettersPath ⇒ new RemoteDeadLetterActorRef(this, deadLettersPath, eventStream))) override def guardian: LocalActorRef = local.guardian
其实还是在调用local的guardian相关的函数。其实还是在调用local的attachChild函数,注意调用过程中参数的值,其中async是true,systemService是false。
private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef = makeChild(this, props, checkName(name), async = true, systemService = systemService)
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) { val oldInfo = Serialization.currentTransportInformation.value try { val ser = SerializationExtension(cell.system) if (oldInfo eq null) Serialization.currentTransportInformation.value = system.provider.serializationInformation props.args forall (arg ⇒ arg == null || arg.isInstanceOf[NoSerializationVerificationNeeded] || { val o = arg.asInstanceOf[AnyRef] val serializer = ser.findSerializerFor(o) val bytes = serializer.toBinary(o) val ms = Serializers.manifestFor(serializer, o) ser.deserialize(bytes, serializer.identifier, ms).get != null }) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) } finally Serialization.currentTransportInformation.value = oldInfo } /* * in case we are currently terminating, fail external attachChild requests * (internal calls cannot happen anyway because we are suspended) */ if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") else { reserveChild(name) // this name will either be unreserved or overwritten with a real child below val actor = try { val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid()) cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath, systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { case e: InterruptedException ⇒ unreserveChild(name) Thread.interrupted() // clear interrupted flag before throwing according to java convention throw e case NonFatal(e) ⇒ unreserveChild(name) throw e } // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend() initChild(actor) actor.start() actor } }
而且默认配置下cell.system.settings.SerializeAllCreators是false,所以最终会调用cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath,systemService = systemService, deploy = None, lookupDeploy = true, async = async),其中systemService是false,deploy是None,lookupDeploy是true,async是true。
RemoteActorRefProvider的actorOf代码如下
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async) else { if (!system.dispatchers.hasDispatcher(props.dispatcher)) throw new ConfigurationException(s"Dispatcher [${props.dispatcher}] not configured for path $path") /* * This needs to deal with “mangled” paths, which are created by remote * deployment, also in this method. The scheme is the following: * * Whenever a remote deployment is found, create a path on that remote * address below “remote”, including the current system’s identification * as “sys@host:port” (typically; it will use whatever the remote * transport uses). This means that on a path up an actor tree each node * change introduces one layer or “remote/scheme/sys@host:port/” within the URI. * * Example: * * akka.tcp://sys@home:1234/remote/akka/sys@remote:6667/remote/akka/sys@other:3333/user/a/b/c * * means that the logical parent originates from “akka.tcp://sys@other:3333” with * one child (may be “a” or “b”) being deployed on “akka.tcp://sys@remote:6667” and * finally either “b” or “c” being created on “akka.tcp://sys@home:1234”, where * this whole thing actually resides. Thus, the logical path is * “/user/a/b/c” and the physical path contains all remote placement * information. * * Deployments are always looked up using the logical path, which is the * purpose of the lookupRemotes internal method. */ @scala.annotation.tailrec def lookupRemotes(p: Iterable[String]): Option[Deploy] = { p.headOption match { case None ⇒ None case Some("remote") ⇒ lookupRemotes(p.drop(3)) case Some("user") ⇒ deployer.lookup(p.drop(1)) case Some(_) ⇒ None } } val elems = path.elements val lookup = if (lookupDeploy) elems.head match { case "user" | "system" ⇒ deployer.lookup(elems.drop(1)) case "remote" ⇒ lookupRemotes(elems) case _ ⇒ None } else None val deployment = { deploy.toList ::: lookup.toList match { case Nil ⇒ Nil case l ⇒ List(l reduce ((a, b) ⇒ b withFallback a)) } } Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { case d @ Deploy(_, _, _, RemoteScope(address), _, _) ⇒ if (hasAddress(address)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else if (props.deploy.scope == LocalScope) { throw new ConfigurationException(s"configuration requested remote deployment for local-only Props at [$path]") } else try { try { // for consistency we check configuration of dispatcher and mailbox locally val dispatcher = system.dispatchers.lookup(props.dispatcher) system.mailboxes.getMailboxType(props, dispatcher.configurator.config) } catch { case NonFatal(e) ⇒ throw new ConfigurationException( s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e) } val localAddress = transport.localAddressForRemote(address) val rpath = (RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). withUid(path.uid) new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"remote deployment failed for [$path]", e) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) } }
分析lookupRemotes就会知道,这个函数其实就是在分析路径中/user后面的路径,根据对应的路径去deployer查找对应的配置。那么deployer具体是什么值呢?lookup又是做什么呢?在RemoteActorRefProvider定义中可以看到deployer是一个RemoteDeployer,RemoteDeployer的其他字段和方法不再具体分析,但覆盖的parseConfig很重要,简单来说它就是在读取配置文件的相关配置,然后给deploy传入RemoteScope或RemoteRouterConfig,都是在给scope赋值。
override def parseConfig(path: String, config: Config): Option[Deploy] = { super.parseConfig(path, config) match { case d @ Some(deploy) ⇒ deploy.config.getString("remote") match { case AddressFromURIString(r) ⇒ Some(deploy.copy(scope = RemoteScope(r))) case str if !str.isEmpty ⇒ throw new ConfigurationException(s"unparseable remote node name [${str}]") case _ ⇒ val nodes = immutableSeq(deploy.config.getStringList("target.nodes")).map(AddressFromURIString(_)) if (nodes.isEmpty || deploy.routerConfig == NoRouter) d else deploy.routerConfig match { case r: Pool ⇒ Some(deploy.copy(routerConfig = RemoteRouterConfig(r, nodes))) case _ ⇒ d } } case None ⇒ None } }
从这里可以看出,配置文件的优先级要高于编程传入的参数。配置读取并设置完成后,会执行以下Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a),其实就是将配置文件和编程指定的参数进行合并,可以看出配置文件的优先级更高。但无论哪个配置生效对应deploy的scope都是RemoteScope。由于我们指定的address不是在本地,所以hasAddress(address)返回false,这样最终就创建了一个new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))实例。
其中rpath是类似这样的字符串akka.tcp://[email protected]:2553/remote/akka.tcp/[email protected]:2552/user/remote#-888292812。第一个地址是远程actor所在的地址,第二个地址是本地地址(也就是远程actor在本地的代理)。至此actorOf执行结束,好像也没有看到在远程部署actor的相关代码啊?惊不惊喜意不意外?哈哈,有点晕啊。
别着急,还记得makeChild函数么,它最后还调用了actor.start函数啊,那么RemoteActorRef是如何start的呢?
def start(): Unit = if (props.isDefined && deploy.isDefined) remote.provider.useActorOnNode(this, props.get, deploy.get, getParent)
很显然props和deploy都是优质的,会去调用remote.provider.useActorOnNode函数,很显然这个函数是属于RemoteActorRefProvider的。
/** * Using (checking out) actor on a specific node. */ def useActorOnNode(ref: ActorRef, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, ref.path) // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor // actorSelection can't be used here because then it is not guaranteed that the actor is created // before someone can send messages to it resolveActorRef(RootActorPath(ref.path.address) / "remote") ! DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor) remoteDeploymentWatcher ! RemoteDeploymentWatcher.WatchRemote(ref, supervisor) }
这个函数也比较简单,它给两个actor发送了分别发送了一条消息。我们先来看第一个actor具体是什么。
def resolveActorRef(path: ActorPath): ActorRef = { if (hasAddress(path.address)) local.resolveActorRef(rootGuardian, path.elements) else try { new RemoteActorRef(transport, transport.localAddressForRemote(path.address), path, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ⇒ log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) new EmptyLocalActorRef(this, path, eventStream) } }
它通过RootActorPath(ref.path.address) / "remote"这个路径创建了又一个RemoteActorRef,然后给他发送了一个DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor)消息。那么这个RootActorPath(ref.path.address) / "remote"路径对应的actor应该是什么呢?其实这应该是一个位于目标actor所在节点且名字是remote的actor,那么这个actor什么时候创建的呢?
其实远程部署有一个前提,就是目标节点的ActorSystem已经启动,那么就意味着RootActorPath(ref.path.address) / "remote"这个actor在远程节点已经启动,我们只需要找到是如何启动、在哪里启动、启动了哪个Actor,就知道是如何处理DaemonMsgCreate消息了。还记得RemoteActorRefProvider的init函数吗,其中有一段代码如下。
val internals = Internals( remoteDaemon = { val d = new RemoteSystemDaemon( system, local.rootPath / "remote", rootGuardian, remotingTerminator, _log, untrustedMode = remoteSettings.UntrustedMode) local.registerExtraNames(Map(("remote", d))) d }, transport = if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match { case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this) case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = false) case ArterySettings.TlsTcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = true) } else new Remoting(system, this))
这段代码中创建了一个RemoteSystemDaemon,这个daemon第二个参数是local.rootPath / "remote",这个值其实是与上文中的RootActorPath(ref.path.address) / "remote"相对应的。
/** * INTERNAL API */ @SerialVersionUID(1L) private[akka] final case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg /** * INTERNAL API * * Internal system "daemon" actor for remote internal communication. * * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. */ private[akka] class RemoteSystemDaemon( system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, terminator: ActorRef, _log: MarkerLoggingAdapter, val untrustedMode: Boolean) extends VirtualPathContainer(system.provider, _path, _parent, _log)
RemoteSystemDaemon定义的地方我们同时也找到了DaemonMsgCreate消息的定义,很显然RemoteSystemDaemon是可以处理DaemonMsgCreate消息的。从RemoteSystemDaemon的定义来看这是一个ActorRef。
其实在RemoteSystemDaemon创建之后还调用了local.registerExtraNames(Map(("remote", d))),这段代码又是用来做什么的呢?
/** * Higher-level providers (or extensions) might want to register new synthetic * top-level paths for doing special stuff. This is the way to do just that. * Just be careful to complete all this before ActorSystem.start() finishes, * or before you start your own auto-spawned actors. */ def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
从官方注释来看,这是用来更高级别的provider或者extensions用来注册合成的、顶级路径,以便做特殊的工作。这必须在ActorSystem启动结束或者自动生成的actor自动之前完成成。这个函数很简单,就是把以name和对应的AcotRef注册到extraNames中去。也就是说我们把remote和其对应的RemoteSystemDaemon注册到了extraNames中,那extraNames有什么用呢?或者说什么时候用呢?
还记得在网络位置透明这篇博客中是如何介绍根据ActorPathString找到对应的actor的吗?序列化类最终调用了RemoteActorRefProvider的internalResolveActorRef把path转化成了对应的ActorRef。
/** * INTERNAL API: This is used by the `ActorRefResolveCache` via the * public `resolveActorRef(path: String)`. */ private[akka] def internalResolveActorRef(path: String): ActorRef = path match { case ActorPathExtractor(address, elems) ⇒ if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems) else { val rootPath = RootActorPath(address) / elems try { new RemoteActorRef(transport, transport.localAddressForRemote(address), rootPath, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ⇒ log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) new EmptyLocalActorRef(this, rootPath, eventStream) } } case _ ⇒ log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path) deadLetters }
很显然,远程ActorSystem收到的path中的address是属于该远程节点的,也就是说最终会调用local.resolveActorRef(rootGuardian, elems)
private[akka] def resolveActorRef(ref: InternalActorRef, pathElements: Iterable[String]): InternalActorRef = if (pathElements.isEmpty) { log.debug("Resolve (deserialization) of empty path doesn't match an active actor, using deadLetters.") deadLetters } else ref.getChild(pathElements.iterator) match { case Nobody ⇒ if (log.isDebugEnabled) log.debug( "Resolve (deserialization) of path [{}] doesn't match an active actor. " + "It has probably been stopped, using deadLetters.", pathElements.mkString("/")) new EmptyLocalActorRef(system.provider, ref.path / pathElements, eventStream) case x ⇒ x }
其实也就是调用ref.getChild查找ref的子actor,也就是在rootGuardian现在查找elems对应的actor,根据上下文这里的elems应该就是"remote",我们来看看rootGuardian是如何查找remote的。
override lazy val rootGuardian: LocalActorRef = new LocalActorRef( system, Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy), defaultDispatcher, defaultMailbox, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer case "deadLetters" ⇒ deadLetters case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other)) } }
首先需要知道rootGuardian是什么,它是一个LocalActorRef,并且重写了getParent、getSingleChild两个函数。那LocalActorRef.getChild又是如何实现的呢
override def getChild(names: Iterator[String]): InternalActorRef = { /* * The idea is to recursively descend as far as possible with LocalActor * Refs and hand over to that “foreign” child when we encounter it. */ @tailrec def rec(ref: InternalActorRef, name: Iterator[String]): InternalActorRef = ref match { case l: LocalActorRef ⇒ val next = name.next() match { case ".." ⇒ l.getParent case "" ⇒ l case any ⇒ l.getSingleChild(any) } if (next == Nobody || name.isEmpty) next else rec(next, name) case _ ⇒ ref.getChild(name) } if (names.isEmpty) this else rec(this, names) }
names不为空,而且就是“remote”,会去调用rec,rec函数中ref就是this,也就是LocalActorRef,所以会根据name依次查找,由于name就是“remote”,所以会调用getSingleChild,而getSingleChild的实现已经被覆盖。由于name是“remote”,所以会命中覆盖后的getSingleChild函数中case other分支。这个分支的逻辑就是,先去extraNames取值,如果没有查找到则调用super.getSingleChild。很明显在extraNames有"remote"对应的InternalActorRef,那就是RemoteSystemDaemon。
所以,本地actor给远程RootActorPath(ref.path.address) / "remote"路径下的actor发送的DaemonMsgCreate消息,就会被远程的RemoteSystemDaemon处理。
override def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = try msg match { case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ log.debug("does not accept deployments (untrusted) for [{}]", path) // TODO add security marker? case DaemonMsgCreate(props, deploy, path, supervisor) if whitelistEnabled ⇒ val name = props.clazz.getCanonicalName if (remoteDeploymentWhitelist.contains(name)) doCreateActor(message, props, deploy, path, supervisor) else { val ex = new NotWhitelistedClassRemoteDeploymentAttemptException(props.actorClass, remoteDeploymentWhitelist) log.error(LogMarker.Security, ex, "Received command to create remote Actor, but class [{}] is not white-listed! " + "Target path: [{}]", props.actorClass, path) } case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ doCreateActor(message, props, deploy, path, supervisor) } case sel: ActorSelectionMessage ⇒ val (concatenatedChildNames, m) = { val iter = sel.elements.iterator // find child elements, and the message to send, which is a remaining ActorSelectionMessage // in case of SelectChildPattern, otherwise the actual message of the selection @tailrec def rec(acc: List[String]): (List[String], Any) = if (iter.isEmpty) (acc.reverse, sel.msg) else { iter.next() match { case SelectChildName(name) ⇒ rec(name :: acc) case SelectParent if acc.isEmpty ⇒ rec(acc) case SelectParent ⇒ rec(acc.tail) case pat: SelectChildPattern ⇒ (acc.reverse, sel.copy(elements = pat +: iter.toVector)) } } rec(Nil) } getChild(concatenatedChildNames.iterator) match { case Nobody ⇒ val emptyRef = new EmptyLocalActorRef(system.provider, path / sel.elements.map(_.toString), system.eventStream) emptyRef.tell(sel, sender) case child ⇒ child.tell(m, sender) } case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(this)) case TerminationHook ⇒ terminating.switchOn { terminationHookDoneWhenNoChildren() foreachChild { system.stop } } case AddressTerminated(address) ⇒ foreachChild { case a: InternalActorRef if a.getParent.path.address == address ⇒ system.stop(a) case _ ⇒ // skip, this child doesn't belong to the terminated address } case unknown ⇒ log.warning(LogMarker.Security, "Unknown message [{}] received by [{}]", unknown, this) } catch { case NonFatal(e) ⇒ log.error(e, "exception while processing remote command [{}] from [{}]", msg, sender) }
RemoteSystemDaemon是如何处理DaemonMsgCreate消息的呢?上面是源码,默认配置下whitelistEnabled是false,所以会调用doCreateActor。
private def doCreateActor(message: DaemonMsg, props: Props, deploy: Deploy, path: String, supervisor: ActorRef) = { path match { case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ // TODO RK currently the extracted “address” is just ignored, is that okay? // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val p = this.path / subpath val childName = { val s = subpath.mkString("/") val i = s.indexOf('#') if (i < 0) s else s.substring(0, i) } val isTerminating = !terminating.whileOff { val parent = supervisor.asInstanceOf[InternalActorRef] val actor = system.provider.actorOf(system, props, parent, p, systemService = false, Some(deploy), lookupDeploy = true, async = false) addChild(childName, actor) actor.sendSystemMessage(Watch(actor, this)) actor.start() if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this)) } if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) case _ ⇒ log.debug("remote path does not match path from message [{}]", message) } }
抛开其他代码,我们会发现,其实最终调用了system.provider.actorOf创建了对应的actor。
DaemonMsgCreate(props, deploy, ref.path.toSerializationFormat, supervisor)
我们回过头看看DaemonMsgCreate是如何构造的,在结合上下文,就会发现,本地ActorSystem把props,deploy,actorpath,supervisor序列化发送给了远程节点,远程节点收到消息后,反序列化,然后调用provider.actorOf创建了对应的actor。但需要注意其中参数的值,props是远程节点发过来的,systemService是false,lookupDeploy是true,async是false。actor创建的具体过程不在分析,应该就是调用LocalActorRefProvider在本地创建了一个actor,然后把当前actor添加到RemoteSystemDaemon的监控列表中,最后调用actor.start,启动actor。
至此我们就在远程的ActorSystem创建了对应的actor,怎么样,涉及的逻辑是不是也比较清晰呢?简单来说就是把props、deploy等信息序列化后发送给远程ActorSystem下特定的actor(RemoteSystemDaemon),由该actor负责创建props对应的actor。当然了还需要在本地创建一个远程actor的代理(RemoteActorRef),以便与远程actor收发消息。