如果一个Actor在执行过程中发生意外,比如没有处理某些异常,导致出错,那么这个时候应该怎么办呢?
在Akka框架内,父Actor对子Actor进行监督,监控子Actor的行为是否有异常。大体上,监控策略分为两种:
1. OneForOneStrategy策略:父Actor只会对出问题的子Actor进行处理。比如重启或停止。Akka的默认策略,推荐使用。
2. AllForOneStrategy策略:父Actor会对出问题的子Actor以及它所有的兄弟节点都进行处理。只适用于各个Actor联系非常紧密的场景,如果多个Actor只要有一个失败,则宣布整个任务失败的情况。
在指定的策略中,我们可以对Actor的失败情况进行相应的处理。如我们可以无视这个错误,或重启这个Actor,或可以让这个Actor彻底停止工作。要指定这些监督行为,只需要指定一个自定义的监督策略即可。
实例
package com.bzb.java8.akka;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.japi.Function;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
/**
* @author bzb
* @Description: 父Actor,作为所有子Actor的监督者
* @date 2018/9/14 15:17
*/
public class Supervisor extends UntypedActor {
// 指定监督策略, 运行Actor遇到错误后,则会执行如下指令。在1分钟内重试3次,如果超过这个频率,则会杀死这个Actor。
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create(1, TimeUnit.MINUTES),
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) throws Exception {
if (t instanceof ArithmeticException) { // 算术异常,如除数为0,
System.out.println("meet ArithmeticException, just resume");
return SupervisorStrategy.resume(); // 继续指定这个Actor,不做任何处理
} else if (t instanceof NullPointerException) { // 空指针异常
return SupervisorStrategy.restart(); // 重启这个Actor
} else if (t instanceof IllegalArgumentException) { // 非法参数异常
return SupervisorStrategy.stop(); // 直接停止这个Actor
} else {
// 其他异常,直接向上抛出,有更顶层的Actor处理
return SupervisorStrategy.escalate();
}
}
});
// 覆盖父类的supervisorStrategy(),设置自定义的监督策略
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof Props) {
// 新建一个名为restartActor的子Actor,这个Actor就有当前的Supervisor进行监督。
// 当Supervisor接受到一个Props对象时,就会根据这个Props配置生成一个restart。
getContext().actorOf((Props) msg,"restartActor");
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.UntypedActor;
import scala.Option;
/**
* @author bzb
* @Description:
* @date 2018/9/14 15:39
*/
public class RestartActor extends UntypedActor {
public enum Msg {
DONE, RESTART;
}
// 以下都是生命周期的回调接口。目的是更好的观察Actor的活动情况。
@Override
public void preStart() throws Exception {
System.out.println("preStart hashcode:" + this.hashCode());
}
@Override
public void postStop() throws Exception {
System.out.println("postStop hashcode:" + this.hashCode());
}
// 重启后,在新的实例上调用postRestart钩子方法。新的Actor实例代替老的实例进行工作。preStart()方法是在postRestart()方法中调用的
@Override
public void postRestart(Throwable reason) throws Exception {
super.postRestart(reason);
System.out.println("postRestart hashcode:" + this.hashCode());
}
// 重启之前,在老的实例上会回调preRestart方法。老的实例会被回收。
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
System.out.println("preRestart hashcode:" + this.hashCode());
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg == Msg.DONE) {
// 停止当前Actor
getContext().stop(getSelf());
} else if (msg == Msg.RESTART) {
System.out.println(((Object) null).toString()); // 模拟空指针异常
double a = 0 / 0; // 算术异常
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.ConfigFactory;
/**
* @author bzb
* @Description:
* @date 2018/9/14 15:52
*/
public class CustomStrategy {
public static void customStrategy(ActorSystem system) {
// 创建监督者Actor
ActorRef supervisor = system.actorOf(Props.create(Supervisor.class), "Supervisor");
// 向supervisor发送要给RestartActor的Props,这个消息会是的supervisor创建RestartActor。
supervisor.tell(Props.create(RestartActor.class), ActorRef.noSender());
// 选中RestartActor实例
ActorSelection sel = system.actorSelection("akka://lifecycle/user/Supervisor/restartActor");
// 向RestartActor发送100个RESTART消息,这会是的RestartActor抛出空指针异常。
for (int i = 0; i < 100; i++) {
sel.tell(RestartActor.Msg.RESTART, ActorRef.noSender());
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("lifecycle", ConfigFactory.load("samplehello.conf"));
customStrategy(system);
}
}
选择Actor
在在一个ActorSystem中,可能存在大量的Actor。如何才能有效地对大量Actor进行批量的管理和通信呢?Akka为我们提供了一个ActorSelection类,用来批量进行消息发送。
for(int i = 0;i < WORDER_COUNT; i++){
workers.add(system.actorOf(Props.create(MyWorker.class,i), "worker_"+i));
}
// 通过通配符,选择所有满足条件的Actor
ActorSelection selection = getContext().actorSelection("/user/worker_*");
selection.tell(5, getSelf()); // 通过这selection实例,可以批量的发送消息。