小李:小胖,12月06日是我爸的生日,我怕那一天忘了,应该怎么办啊,你到时能提醒我一下吗?非常感谢。
小胖:我也可能忘记啊,你再想想其它办法吧!
小李:那怎么办啊,这么重要的日子,我可不能忘记啊。
小胖:咦,你不是已经有手机了吗?我听说现在手机有日历功能,有定时提醒功能,可以试试看。
小李:对啊,我们要物尽其用啊。
五分钟后......
小李:好了,我刚刚测试了一下,果然可行,谢谢你啊,小胖,还是你聪明。
Scheduler
假如我们想在某个时刻或者某段时间处理某个任务,Akka中应该怎样处理呢?既然这个问题我们都已经想到了,那么Akka设计人员肯定想到了。别担心,Akka已经提供了Scheduler对象帮助我们实现定时调度功能。
Scheduler对象不是通过new出来的,而是需要依赖ActorSystem得到,它在整个ActorSystem中是单例的,是唯一的。Scheduler提供两种定时调度,用于不同的需求,总结如下:
调度API |
说明 |
scheduleOnce |
表示延迟一段时间后执行 |
scheduler |
表示延迟后定时重复执行,返回Cancellable对象,可用于取消该定时调度 |
参数分析:
Duration delay,延迟调度时间,可选值有day、hour、minute、second、millisecond等
ActorRef receiver,消息处理类
Object message,定时发送的消息体
ExecutionContext executor,线程调度
ActorRef sender,接受receiver消息返回值
关于这两种不同的调度方式,我们使用代码来区分它们。
scheduleOnce
首先我们创建一个定时消息处理类HandlerActor,
public class HandlerActor extends AbstractActor {
private final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(),this);
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, logger::info)
.matchAny((Object other) -> logger.info("其它未知消息:" + other))
.build();
}
}
现在,我们模拟100毫秒之后,给HandlerActor发送消息,如下:
ActorSystem system = ActorSystem.create("system");
ActorRef handlerActor = system.actorOf(Props.create(HandlerActor.class), "handlerActor");
//延迟100ms后,给handlerActor发送当前时间消息
system.scheduler().scheduleOnce(Duration.ofMillis(100),handlerActor, LocalDateTime.now(),system.dispatcher(),ActorRef.noSender());
当然,我们也可以这样写:
system.scheduler().scheduleOnce(Duration.ofMillis(100), new Runnable() {
@Override
public void run() {
handlerActor.tell(LocalDateTime.now(),ActorRef.noSender());
}
},system.dispatcher());
执行结果都一样,如下:
[INFO] [11/24/2018 11:04:02.685] [system-akka.actor.default-dispatcher-5] [akka://system/user/handlerActor] 其它未知消息:2018-11-24T11:04:02.683
scheduleOnce延迟100s后,只发送一次当前时间给handlerActor,后续不会再执行。
scheduler
我们还是沿用上述HandlerActor继续处理消息,现在我们想每隔1s后打印当前时间,修改如下:
public Receive createReceive() {
return receiveBuilder().matchEquals("time", s -> System.out.println(LocalDateTime.now())).build();
}
执行调度:
system.scheduler().schedule(Duration.ZERO, Duration.ofSeconds(1), handlerActor, "time", system.dispatcher(), ActorRef.noSender());
执行结果:
2018-11-24T11:37:00.864
2018-11-24T11:37:01.869
2018-11-24T11:37:02.879
2018-11-24T11:37:03.878
2018-11-24T11:37:04.878
2018-11-24T11:37:05.883
2018-11-24T11:37:06.861
大家可以看到,每隔1s就打印出了当前时间,当我们不在需要该输出的时候,可以使用cancel方法停止该定时调度,cancel不会停止当前正在调度的任务,如下:
schedule.cancel();
大家可以使用sleep()方法模拟调度时长,自行调试。
在Actor系统启动后,会读取akka.scheduler.implementation这个配置项,默认采用LightArrayRevolverScheduler,源配置(reference.conf中)如下:
# This setting selects the timer implementation which shall be loaded at
# system start-up.
# The class given here must implement the akka.actor.Scheduler interface
# and offer a public constructor which takes three arguments:
# 1) com.typesafe.config.Config
# 2) akka.event.LoggingAdapter
# 3) java.util.concurrent.ThreadFactory
implementation = akka.actor.LightArrayRevolverScheduler
这代表我们可以实现自己的定时调度,通过extends AbstractScheduler 就可以快速实现。