In Akka, a parent actor can define a supervisorStrategy that decides how exceptions thrown by its child actors are handled.
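import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// Defined inside the parent actor class
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
  case _: ArithmeticException      => Resume
  case _: NullPointerException     => Restart
  case _: IllegalArgumentException => Stop
  case _: Exception                => Escalate
}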
The above is a concrete supervision strategy. The available directives are Resume, Restart, Stop, and Escalate.
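To make the mapping concrete, here is a minimal sketch that models the decider as a plain partial function from exception to directive. It does not use Akka: the Directive trait and the DeciderSketch object are stand-ins invented for illustration, and only the directive names are borrowed from the strategy above. Cases are tried top to bottom, so the catch-all Exception case has to come last.

```scala
// Toy model, not Akka's classes: the names only mirror Akka's directive names.
object DeciderSketch {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // A decider maps a failure cause to a directive.
  // The first matching case wins, so the most general case goes last.
  val decider: PartialFunction[Throwable, Directive] = {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }
}
```

Note that subclasses follow their parent's case: a NumberFormatException is an IllegalArgumentException, so this decider maps it to Stop.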
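When a parent actor creates a child actor, the child records the parent in its parent field, and failure information is passed up through that reference.
The concrete supervision implementation follows.
In the child actor's invoke() function, which processes messages, any exception thrown from receive() is caught and passed to handleInvokeFailure().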
final def invoke(messageHandle: Envelope): Unit = {
  val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
  try {
    currentMessage = messageHandle
    if (influenceReceiveTimeout)
      cancelReceiveTimeout()
    messageHandle.message match {
      case _: AutoReceivedMessage => autoReceiveMessage(messageHandle)
      case msg                    => receiveMessage(msg)
    }
    currentMessage = null // reset current message after successful invocation
  } catch handleNonFatalOrInterruptedException { e =>
    handleInvokeFailure(Nil, e)
  } finally {
    // Schedule or reschedule receive timeout
    checkReceiveTimeout(reschedule = influenceReceiveTimeout)
  }
}
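handleInvokeFailure() wraps the error in a Failed system message and delivers it to the system message queue of the parent actor.
The parent actor learns of the child's exception when it reads that message from its system message queue.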
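The hand-off described so far can be sketched without Akka. The following is a toy model under stated assumptions: Cell, Failed, and systemQueue are hypothetical stand-ins for Akka's ActorCell, its Failed system message, and the parent's system message queue, and the real handleInvokeFailure() does more than this.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Toy model, not Akka's classes.
object FailureReportSketch {
  // Hypothetical stand-in for the Failed system message: who failed, and why.
  final case class Failed(child: String, cause: Throwable)

  class Cell(val name: String, val parent: Option[Cell], receive: Any => Unit) {
    // Stand-in for this actor's system message queue.
    val systemQueue: mutable.Queue[Failed] = mutable.Queue.empty

    // Same shape as invoke(): run user code and catch whatever it throws.
    def invoke(message: Any): Unit =
      try receive(message)
      catch { case NonFatal(e) => handleInvokeFailure(e) }

    // Wrap the cause and hand it to the parent instead of handling it locally.
    private def handleInvokeFailure(e: Throwable): Unit =
      parent.foreach(_.systemQueue.enqueue(Failed(name, e)))
  }
}
```

The child never decides what to do about its own failure; it only reports it, and the decision is left to whoever reads the parent's queue.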
The actual handling happens in invokeAll(), inside systemInvoke.
@tailrec
def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
  val rest = messages.tail
  val message = messages.head
  message.unlink()
  try {
    message match {
      case message: SystemMessage if shouldStash(message, currentState) => stash(message)
      case f: Failed                                                    => handleFailure(f)
      case DeathWatchNotification(a, ec, at)                            => watchedActorTerminated(a, ec, at)
      case Create(failure)                                              => create(failure)
      case Watch(watchee, watcher)                                      => addWatcher(watchee, watcher)
      case Unwatch(watchee, watcher)                                    => remWatcher(watchee, watcher)
      case Recreate(cause)                                              => faultRecreate(cause)
      case Suspend()                                                    => faultSuspend()
      case Resume(inRespToFailure)                                      => faultResume(inRespToFailure)
      case Terminate()                                                  => terminate()
      case Supervise(child, async)                                      => supervise(child, async)
      case NoMessage                                                    => // only here to suppress warning
    }
  } catch handleNonFatalOrInterruptedException { e =>
    handleInvokeFailure(Nil, e)
  }
  val newState = calculateState
  // As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
  // chain
  val todo = if (newState < currentState) rest.reversePrepend(unstashAll()) else rest
  if (isTerminated) sendAllToDeadLetters(todo)
  else if (todo.nonEmpty) invokeAll(todo, newState)
}
Here, the Failed message created earlier is dispatched to handleFailure().
handleFailure() applies the supervision strategy defined earlier to the exception carried by that message.
final protected def handleFailure(f: Failed): Unit = {
  currentMessage = Envelope(f, f.child, system)
  getChildByRef(f.child) match {
    /*
     * only act upon the failure, if it comes from a currently known child;
     * the UID protects against reception of a Failed from a child which was
     * killed in preRestart and re-created in postRestart
     */
    case Some(stats) if stats.uid == f.uid =>
      if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
    case Some(stats) =>
      publish(
        Debug(
          self.path.toString,
          clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
    case None =>
      publish(
        Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
  }
}
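When the configured directive is Escalate, the strategy's handleFailure() returns false, so the parent rethrows the exception (throw f.cause). The parent then fails with the same cause, which passes the exception one level further up the hierarchy for handling.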
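The escalation path can be illustrated with another small model. This is a sketch under assumptions, not Akka's implementation: the Supervisor class, strategyHandles, and the applied list are invented for illustration, and the direct call to the parent is a shortcut for the real path, in which the rethrown cause is caught by the catch block in invokeAll() and sent upward as a new Failed message.

```scala
// Toy model, not Akka's classes.
object EscalationSketch {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  class Supervisor(val name: String,
                   val parent: Option[Supervisor],
                   decider: PartialFunction[Throwable, Directive]) {
    // Directives this supervisor applied itself, most recent first.
    var applied: List[Directive] = Nil

    // Returns true if handled locally, false when the directive is Escalate.
    // Causes the decider does not cover are treated as Escalate in this model.
    private def strategyHandles(cause: Throwable): Boolean = {
      val directive: Directive = decider.applyOrElse(cause, (_: Throwable) => Escalate)
      directive match {
        case Escalate => false
        case other =>
          applied = other :: applied
          true
      }
    }

    // A false result makes this supervisor fail with the same cause,
    // so the cause moves one level further up.
    def handleFailure(cause: Throwable): Unit =
      if (!strategyHandles(cause)) parent match {
        case Some(p) => p.handleFailure(cause) // shortcut for "send Failed to my parent"
        case None    => throw cause            // nobody left to escalate to in this model
      }
  }
}
```

With a middle supervisor that escalates generic exceptions and a top supervisor that restarts on them, an IllegalStateException reported to the middle one ends up recorded as Restart on the top one, while the middle one records nothing for it.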