Actor之间发送消息,并不是直接通过API的方式直接实现,而是依赖于自己的邮箱(mailbox)。这就好比在旧社会中,大家进行信件交流,发件人把自己的信件放到自己的邮箱中,送件人每天早上从你的邮箱中取出信件,在这个过程中,邮箱就起到了中间存储作用,所有的信件都会放在里面。在Actor系统中,邮箱是一个队列结构,默认遵循FIFO,可以根据我们的需要自定义。
Actor可以接受其它多个Actor的消息,默认的消息顺序没有规律,无法保证某个消息一定排在另外一个消息的前面,但是对于同一个Actor发送的多个消息,一定是按照先后顺序接受。例如:actorA给ActorB发送message1和message2,那么ActorB接受消息的顺序一定是message1->message2,否则,执行任务岂不是乱套了。
邮箱的分类
在Akka中,邮箱主要分为两种类型:无界和有界。
类型 |
描述 |
无界(Unbounded) |
邮箱的容量没有上限,大部分无界邮箱都是无阻塞的。也就是说当消息的数量过于庞大时,也不会阻塞消息处理。 |
有界(Bounded) |
邮箱的容量有限制,当消息数量达到邮箱数量时,会根据mailbox-push-timeout-time(消息入队超时时间)的配置进行阻塞(或非阻塞),负数则表示无限超时(慎用)。 |
邮箱的配置
Akka系统中,提供了邮箱配置项,我们可以根据自己的需要进行配置,系统给我们提供了默认配置,例如:
akka.actor.default-mailbox{
mailbox-type="akka.dispatch.UnboundedMailbox"
mailbox-capacity=1000
mailbox-push-timeout-time=10s
}
参数分析:
mailbox-type:邮箱类型,默认采用无界邮箱UnboundedMailbox,表示邮箱队列大小不受限制。
mailbox-capacity:邮箱容量,定义有界邮箱(BoundedMail)的大小,该值只能为正数。
mailbox-push-timeout-time:入队超时时间,也就是消息进入队列的超时时限。
UnboundedMailbox主要依赖java.util.concurrent.ConcurrentLinkedQueue队列来实现,它是一个基于链表的队列结构,遵循FIFO,该队列结构采用CAS无锁算法保证多线程的安全,性能更佳(大家都懂得,锁会导致系统性能和吞吐量的下降,这里就不做过多介绍了)。
优先级邮箱
在实际的项目中,根据业务逻辑的需要,消息处理可能存在一定的优先级,那么我们就需要使用PriorityMailbox。下面我们就来看看优先级邮箱怎样使用:
目标:历史美女按照你喜欢的规则输出
第一步:extends PriorityMailbox,创建自己规则的优先级邮箱
public class PriorityMailbox extends UnboundedStablePriorityMailbox {
public PriorityMailbox(ActorSystem.Settings settings, Config config) {
super(new PriorityGenerator() {
//返回值越小,优先级越高
@Override
public int gen(Object message) {
if ("貂蝉".equals(message)) {
return 0;
} else if ("西施".equals(message)) {
return 1;
} else if ("大乔".equals(message)) {
return 2;
} else {
return 3;
}
}
});
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));
ActorRef priorityActor = system.actorOf(Props.create(PriorityMailboxActor.class).withMailbox("priority-mailbox"), "priorityActor");
String[] messages={"李四","大乔","貂蝉","西施"};
for(String message:messages){
priorityActor.tell(message,ActorRef.noSender());
}
}
}
class PriorityMailboxActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(System.out::println).build();
}
}
在该构造方法中,我们new了一个ProrityGenerator对象,并重写gen方法,该方法就是我们定义自己优先级规则的地方,返回值越小,优先级越高。
第二步:配置邮箱,如下:
mailbox.conf
priority-mailbox{
mailbox-type="com.release.util.akka.mailbox.PriorityMailbox"
}
第三步:让Actor使用withMailbox关联该邮箱类型
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));
ActorRef priorityActor = system.actorOf(Props.create(PriorityMailboxActor.class).withMailbox("priority-mailbox"), "priorityActor");
String[] messages={"李四","大乔","貂蝉","西施"};
for(String message:messages){
priorityActor.tell(message,ActorRef.noSender());
}
结果:
貂蝉
西施
大乔
李四
通过输出结果,大家应该看到我们的优先级配置已经起了作用。
控制消息
在大部分的需求中,优先级邮箱已经可以满足,但是当我们需要某个消息每次都拥有最高的优先级时,就需要额外的邮箱支持了。Akka提供了ControAwareMailbox邮箱类型,可以让实现了ControlMessage接口的消息拥有最高优先级。
第一步:定义消息Message,实现ControlMessage
public class ControlMessageBean implements ControlMessage {
private final String msg;
public ControlMessageBean(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return this.msg;
}
}
第二步:配置ControAwareMailbox邮箱
control-mailbox{
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}
第三步:通过关联withMailBox方式关联该邮箱
public class ControlMessageActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(System.out::println).build();
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));
ActorRef controlActor = system.actorOf(Props.create(ControlMessageActor.class).withMailbox("control-mailbox"), "controlActor");
Object[] messages={"张三","李四",new ControlMessageBean("皇上"),new ControlMessageBean("皇后"),"王五"};
for(Object message:messages){
controlActor.tell(message,ActorRef.noSender());
}
}
}
输出结果:
皇上
皇后
张三
李四
王五
大家可以发现,ControlMessageBean定义的消息总会被先处理,说明控制消息已经起到作用,实现ControlMessage 消息,在ControAwareMailbox邮箱中拥有最高优先级。
邮箱使用方式
在上述示例中,我们都是通过withMailbox代码方式来关联邮箱类型,其实,邮箱使用还有许多其它的方式,例如:
- 直接配置Actor邮箱
- 将邮箱配置到dispatcher中,然后让Actor关联该dispatcher
直接配置Actor邮箱
我们可以在配置文件中,指定某个Actor的邮箱类型,如下:
akka.actor.deployment{
/priorityActor{
mailbox=priority-mailbox
}
}
上述配置中,priorityActor是创建Actor所指定的名称,,mailbox是我们自定义的优先级邮箱名称。
配置dispatcher邮箱
在消息调度时,dispatcher会依赖Actor邮箱,所以我们可以把邮箱配置dispatcher上,然后让Actor使用该dispatcher即可。
priority-dispatcher{
type=Dispatcher
mailbox-type="com.release.util.akka.mailbox.PriorityMailbox"
}
Actor使用该dispatcher,如下:
ActorRef configActor = system.actorOf(Props.create(PriorityActor.class).withDispatcher("priority-dispatcher"), "priorityActor");
RequiresMessageQueue接口
除了上述方法给某个Actor指定邮箱外,我们还可以让Actor实现RequiresMessageQueue接口,让Actor自动拥有特定类型的邮箱。该接口提供泛型,我们可以给定某个邮箱的语义。关于语义和邮箱的关系,我们可以从akka包下找到reference.conf文件,里面定义了相关映射,如下:
我们根据里面的配置,总结如下表格:
语义接口 |
邮箱类型 |
akka.dispatch.UnboundedMessageQueueSemantics |
akka.dispatch.UnboundedMailbox |
akka.dispatch.BoundedMessageQueueSemantics |
akka.dispatch.BoundedMailbox |
akka.dispatch.DequeBasedMessageQueueSemantics |
akka.dispatch.UnboundedDequeBasedMailbox |
akka.dispatch.UnboundedDequeBasedMessageQueueSemantics |
akka.dispatch.UnboundedDequeBasedMailbox |
akka.dispatch.BoundedDequeBasedMessageQueueSemantics |
akka.dispatch.BoundedDequeBasedMailbox |
akka.dispatch.MultipleConsumerSemantics |
akka.dispatch.UnboundedMailbox |
akka.dispatch.ControlAwareMessageQueueSemantics |
akka.dispatch.UnboundedControlAwareMailbox |
akka.dispatch.UnboundedControlAwareMessageQueueSemantics |
akka.dispatch.UnboundedControlAwareMailbox |
akka.dispatch.BoundedControlAwareMessageQueueSemantics |
akka.dispatch.BoundedControlAwareMailbox |
akka.event.LoggerMessageQueueSemantics |
akka.event.LoggerMailboxType |
如果我们现在想要实现某一类消息优先级发送,需求变得非常简单,示例如下:
public class RequiresMsgActor extends AbstractActor implements RequiresMessageQueue<UnboundedControlAwareMessageQueueSemantics> {
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(System.out::println).build();
}
}
如果我们给RequiresMsgActor发送消息,实现ControlMessage的消息将被优先处理。
自定义邮箱
或许,在实际项目中,大家需要做一些业务化的需求,内置邮箱不能满足业务的需要,这时,我们就可以自定义邮箱。
自定义邮箱过程:首先定义一个邮箱队列(实现MessageQueue接口),然后定义邮箱类型并指定邮箱队列,最后配置邮箱给Actor。下面我们详细说一下整个流程。
第一步:定义MailBoxQueue类,实现MessageQueue接口:
public class MailBoxQueue implements MessageQueue {
/**
* 自定义邮箱队列
*/
private Queue<Envelope> queue = new ConcurrentLinkedQueue<>();
/**
* 投递消息,消息入队
*/
@Override
public void enqueue(ActorRef actorRef, Envelope envelope) {
queue.offer(envelope);
}
/**
* 取出消息,消息出队
* @return Envelope
*/
@Override
public Envelope dequeue() {
return queue.poll();
}
/**
* 返回当前消息数量
* @return int
*/
@Override
public int numberOfMessages() {
return queue.size();
}
/**
* 当前是否存在消息
* @return true or false
*/
@Override
public boolean hasMessages() {
return !queue.isEmpty();
}
/**
* 消息传递过程中,Actor出现故障,投递的消息将进入到死信队列
* @param actorRef actor引用
* @param deadLetters 死信队列
*/
@Override
public void cleanUp(ActorRef actorRef, MessageQueue deadLetters) {
for (Envelope e : queue) {
deadLetters.enqueue(actorRef, e);
}
}
}
第二步:定义邮箱类型,实现MailboxType接口并且在create方法中指定自定义的MailBoxQueue队列。
public class MailBoxType implements MailboxType, ProducesMessageQueue<MailBoxQueue> {
/**
* 定义邮箱类型必须给出一个公开的,带有Settings、Config这两个类型的构造函数
* 否则会抛出java.lang.NoSuchMethodException: com.release.util.akka.mailbox.MailBoxType.<init>(akka.actor.ActorSystem$Settings, com.typesafe.config.Config)
*
* @param settings
* @param config
*/
public MailBoxType(ActorSystem.Settings settings, Config config) {
}
@Override
public MessageQueue create(Option<ActorRef> option, Option<ActorSystem> option1) {
return new MailBoxQueue();
}
}
第三步:配置邮箱类型
mySelf-mailbox{
mailbox-type= "com.release.util.akka.mailbox.MailBoxType"
}
现在我们就可以把该类型的邮箱指定给我们的Actor使用了,当然,在实际项目中,我们的业务可能复杂多样,并不像这里简单的定义邮箱类型,不过流程都是一样的,只是说对消息的入队、出队等处理方式复杂化。大家可以按照上述使用邮箱的方式,进行测试。
总结
邮箱分为无界和有界两种,前者表示无容量限制,后者表示有容量限制,用来存储消息。Actor都拥有一个邮箱,如果我们没有给Actor配置邮箱,将采用无界默认邮箱。Actor系统已经内置了很多邮箱类型,如果在项目中,这些邮箱不能满足需要,我们可以通过实行MailboxType和MessageQueue自定义邮箱。