ActiveMQ顺序消费消息+消息分组

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qincidong/article/details/89156732

简介

Queue中的消息是按照顺序发送给Consumers的。然而,当你有多个Consumer同时从相同的Queue提取消息时,顺序将不能得到保证。因为这些消息时被多个线程并发的处理。但是,有时候保证消息的顺序是很重要的。例如,你可能不希望插入订单操作结束之前执行更新订单的操作。那么我们可以通过Exclusive Consumer和Message Groups来实现这一目的。

独有消费者

从ActiveMQ4.X版本开始支持ExclusiveConsumer(或者说是Exclusive Queues)。Broker会从多个Consumer中挑选一个Consumer来处理所有的消息,从而保证消息的有序处理。如果这个Consumer失效,那么Broker会自动切换到其他的Consumer。

可以通过Destination的Option来创建一个Exclusive Consumer,如下:

queue = new ActiveMQQueue("Test.Queue?consumer.exclusive=true");
consumer = session.createConsumer(queue);

另外,还可以给Consumer设置优先级,以便针对网络情况进行优化。如下:

queue = new ActiveMQQueue("Test.Queue?consumer.exclusive=true&consumer.priority=10");

消息分组

从Apache官方文档的话说,是Exclusive Consumer功能的增强。逻辑上,可以看成是一种并发的Exclusive Consumer。JMS消息属性JMXGroupID被用来区分Message Group。Message Groups特性保证所有具有相同JMSGroupID的消息会被分发到相同的Consumer(只要这个Consumer保持Active)。另一方面,Message Groups也是一种负载均衡的机制。

在一个消息被分发到Consumer前,Broker会检查消息的JMSGroupID属性。如果存在,那么broker会检查是否有某个Consumer拥有这个Message Group。如果没有,那么broker会选择一个Consumer,并将它关联到这个Message Group。此后,这个Consumer会接收这个Message Group的所有消息。直到:

  • Consumer被关闭。
  • Message Group被关闭。通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1.

从4.1版本开始,ActiveMQ支持一个布尔字段JMSXGroupFirstForConsumer 。当某个message group的第一个消息被发送到consumer的时候,这个字段被设置。如果客户使用failover transport连接到broker。在由于网络问题等造成客户重新连接到broker的时候,相同message group的消息可能会被分发到不同与之前的consumer,因此JMSXGroupFirstForConsumer字段也会被重新设置。

创建一个Message Groups

创建一个Message Groups,只需要在Message对象上设置属性即可。如下:

Message message = session.createTextMessage("hello,world");
message.setStringProperty("JMSXGroupID","GroupA");
...
producer.send(message);

关闭一个Message Groups

关闭一个Message Group,也只需要在Message对象上设置相应的属性即可。如下:

message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq", -1);

猜你喜欢

转载自blog.csdn.net/qincidong/article/details/89156732