activemq是如何实现消息分组的

版权声明:本文为博主原创文章,支持转载,但转载时请务必在明显位置,给出原文连接。 https://blog.csdn.net/john1337/article/details/81912376

activemq的消息分组是一个很有用的特性,首先需要说明的是该特性是针对queue的,对topic无感!

(1)入题

activemq的消息分组实现的功能就是使得同一个消息生产者产生的消息被同一个消费者消费,这样可以保证消费消息的顺序与生产消息的顺序一致,在这个功能上,有人可能会说使用consumer的exclusive特性以及消息selector都可以实现这个功能,是的如果没有其他不同的话那这个特性也就没有存在的必要了,下面进入讲述一下这三个特性的不同点:

1.消息过滤特性selector最大的不足在于如果该消费者down掉了,那么将没有消费者来消费这些消息(只有重新启动该消费者)

2.exclusive特性也可以实现只有一个消费者来消费某个queue上的消息,但是处理细度不足,无法处理消息生产者生产多种JMSXGroupID的消息

3.最后就是消息分组特性了,这是activemq提供的一种细粒度筛选消息的方式

(2)实现原理

最后activemq消息分组是通过JMSXGroupID、JMSXGroupSeq两个消息属性来完成的,同一个JMSXGroupID的消息会被发送给同一个consumer,除非该consumer挂掉,特别需要注意的是JMSXGroupSeq为-1时将会导致broker重新进行负载均衡,具体实现原理看下org.apache.activemq.broker.region.Queue的doActualDispatch方法相关代码:

                if (!fullConsumers.contains(s)) {
                    if (!s.isFull()) {
                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
                            // Dispatch it.
                            s.add(node);
                            target = s;
                            break;
                        }
                    } else {
                        // no further dispatch of list to a full consumer to
                        // avoid out of order message receipt
                        fullConsumers.add(s);
                        LOG.trace("Subscription full {}", s);
                    }
                }

    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
        boolean result = true;
        // Keep message groups together.
        String groupId = node.getGroupID();
        int sequence = node.getGroupSequence();
        if (groupId != null) {

            MessageGroupMap messageGroupOwners = getMessageGroupOwners();
            // If we can own the first, then no-one else should own the
            // rest.
            if (sequence == 1) {
                assignGroup(subscription, messageGroupOwners, node, groupId);
            } else {

                // Make sure that the previous owner is still valid, we may
                // need to become the new owner.
                ConsumerId groupOwner;

                groupOwner = messageGroupOwners.get(groupId);
                if (groupOwner == null) {
                    assignGroup(subscription, messageGroupOwners, node, groupId);
                } else {
                    if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
                        // A group sequence < 1 is an end of group signal.
                        if (sequence < 0) {
                            messageGroupOwners.removeGroup(groupId);
                            subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        }

        return result;

    }

打完收工~~

猜你喜欢

转载自blog.csdn.net/john1337/article/details/81912376