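ActiveMQ Source Code Notes (Part 2)

ActiveMQ Source Code Analysis (Part 2): How the Consumer Receives Messages
First, here is the complete ActiveMQ flow chart, now that this post has covered the consumer side.
Figure: complete ActiveMQ flow
The previous post, ActiveMQ Source Code Notes (Part 1), analyzed how a producer sends messages, but it never reached anything related to ack, because the ack submission logic lives mainly on the consumer side. This post continues by tracing the source code a consumer runs when it consumes messages.
Let's start with the code.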

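        // 1. Create the connection factory; the broker's IP and port must be specified
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        // 2. Use the connection factory to create a connection
        Connection connection = connectionFactory.createConnection();
        // 3. Start the connection
        connection.start();
        // 4. Use the connection to create a session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5. Use the session to create the destination, a queue or a topic (one-to-one or one-to-many)
        Queue queue = session.createQueue("test-queue");
        // Use the session to create a consumer for the queue
        MessageConsumer consumer = session.createConsumer(queue);
        // Option A: register a message handler on the consumer (asynchronous, the broker pushes)
        consumer.setMessageListener((message)->{
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        // Option B: synchronous, the consumer pulls. Use this instead of Option A, not together with it:
        // receive() first calls checkMessageListener(), which throws IllegalStateException when a listener is set.
        // Message msg = consumer.receive();
        session.close();
        connection.close();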

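Everything up to creating the queue was traced in the previous post. Next comes createConsumer, which builds a ConsumerId object from the session's id and an internal consumer sequence generator (an auto-incrementing long).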

    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        return this.createConsumer(destination, (String)null);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        return this.createConsumer(destination, messageSelector, false);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        return this.createConsumer(destination, messageSelector, noLocal, (MessageListener)null);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        this.checkClosed();
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createConsumer(this, messageSelector, noLocal);
        } else {
            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
            int prefetch;
            if (destination instanceof Topic) {
                prefetch = prefetchPolicy.getTopicPrefetch();
            } else {
                prefetch = prefetchPolicy.getQueuePrefetch();
            }

            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
            return new ActiveMQMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener);
        }
    }
    
    protected ConsumerId getNextConsumerId() {
        return new ConsumerId(this.info.getSessionId(), this.consumerIdGenerator.getNextSequenceId());
    }
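
The id scheme described above can be tried in isolation. The sketch below pairs a session id with an auto-incrementing long. The class name ConsumerIdSketch, the method nextConsumerId, and the string format of the id are all hypothetical; ActiveMQ uses its own ConsumerId and sequence generator classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a session that hands out consumer ids.
final class ConsumerIdSketch {
    private final String sessionId;                           // assumed format, e.g. "conn-1:1"
    private final AtomicLong consumerSeq = new AtomicLong();  // auto-incrementing long

    ConsumerIdSketch(String sessionId) {
        this.sessionId = sessionId;
    }

    // Each call yields a new id that is unique within this session.
    String nextConsumerId() {
        return sessionId + ":" + consumerSeq.incrementAndGet();
    }
}
```

Because the sequence is scoped to one session, two sessions can hand out the same sequence number without their consumer ids colliding.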

In the end it constructs an ActiveMQMessageConsumer object.

	//ActiveMQMessageConsumer.class
	 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException {
			// some exception-handling code omitted

            this.session = session;
            this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
            if (this.redeliveryPolicy == null) {
                this.redeliveryPolicy = new RedeliveryPolicy();
            }
			// code that is not relevant here omitted
            this.info = new ConsumerInfo(consumerId);
            this.info.setClientId(this.session.connection.getClientID());
            this.info.setDestination(dest);
            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();
            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
            if (messageListener != null) {
                this.setMessageListener(messageListener);
            }
            this.session.addConsumer(this);
            this.session.syncSendPacket(this.info);
            if (session.connection.isStarted()) {
                this.start();
            }
    }
	//ActiveMQSession.class
    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.add(consumer);
        if (consumer.isDurableSubscriber()) {
            this.stats.onCreateDurableSubscriber();
        }

        this.connection.addDispatcher(consumer.getConsumerId(), this);
    }

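As you can see, this is mostly parameter binding. The ActiveMQSession is aggregated into the ActiveMQMessageConsumer, and the consumer is in turn added to the ActiveMQSession's consumer collection (one more complaint: circular references again). The addConsumer method also registers the mapping from this consumer's id to the current session as a dispatcher in the ActiveMQConnection's dispatchers map, which means ActiveMQSession is itself an ActiveMQDispatcher.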

	//ActiveMQConnection.class
    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
        this.dispatchers.put(consumerId, dispatcher);
    }
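
To make the registration concrete, here is a minimal sketch of a connection-level registry that maps consumer ids to dispatchers, with the session acting as the dispatcher. All names (DispatcherSketch, ConnectionSketch, SessionSketch, route) are hypothetical, and plain strings stand in for ConsumerId and MessageDispatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ActiveMQDispatcher.
interface DispatcherSketch {
    void dispatch(String consumerId, String payload);
}

// Hypothetical stand-in for the connection-level registry.
final class ConnectionSketch {
    private final Map<String, DispatcherSketch> dispatchers = new ConcurrentHashMap<>();

    void addDispatcher(String consumerId, DispatcherSketch dispatcher) {
        dispatchers.put(consumerId, dispatcher);
    }

    // Returns true if a dispatcher was registered for the consumer id.
    boolean route(String consumerId, String payload) {
        DispatcherSketch d = dispatchers.get(consumerId);
        if (d == null) {
            return false; // nothing registered for this consumer id
        }
        d.dispatch(consumerId, payload);
        return true;
    }
}

// Hypothetical session: it is itself the dispatcher for all of its consumers.
final class SessionSketch implements DispatcherSketch {
    final List<String> received = new ArrayList<>();

    @Override
    public void dispatch(String consumerId, String payload) {
        received.add(consumerId + "=" + payload);
    }
}
```

Keying the map by consumer id lets the connection find the owning session for an incoming message with a single lookup.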

Finally, if the connection is already started, the constructor calls ActiveMQMessageConsumer's start method.

    public void start() throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            this.started.set(true);
            this.unconsumedMessages.start();
            this.session.executor.wakeup();
        }
    }

start first calls unconsumedMessages.start() and then wakeup() on the session's executor. unconsumedMessages does not start any thread, so look at the executor first.

	public void wakeup() {
        if (!this.dispatchedBySessionPool) {
            if (this.session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized(this) {
                            if (this.taskRunner == null) {
                                if (!this.isRunning()) {
                                    return;
                                }

                                this.taskRunner = this.session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + this.session.getSessionId());
                            }

                            taskRunner = this.taskRunner;
                        }
                    }

                    taskRunner.wakeup();
                } catch (InterruptedException var5) {
                    Thread.currentThread().interrupt();
                }
            } else {
                while (this.iterate()) {
                    // keep iterating on the current thread until there is no more work
                }
            }
        }

    }

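When the session dispatches asynchronously, the executor's wakeup checks whether a TaskRunner has already been created; if not, it creates one, and then it calls the runner's wakeup method. Otherwise it runs iterate in a loop on the current thread. Now look at how the TaskRunner is created.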

	public TaskRunner createTaskRunner(Task task, String name) {
        this.init();
        ExecutorService executor = (ExecutorService)this.executorRef.get();
        return (TaskRunner)(executor != null ? new PooledTaskRunner(executor, task, this.maxIterationsPerRun) : new DedicatedTaskRunner(task, name, this.priority, this.daemon));
    }

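The ActiveMQSessionExecutor passed in is used as the Task to create a TaskRunner: a PooledTaskRunner if an ExecutorService is available, otherwise a DedicatedTaskRunner.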

	//DedicatedTaskRunner.class
	public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
        this.task = task;
        this.thread = new Thread(name) {
            public void run() {
                try {
                    DedicatedTaskRunner.this.runTask();
                } finally {
                    DedicatedTaskRunner.LOG.trace("Run task done: {}", task);
                }

            }
        };
        this.thread.setDaemon(daemon);
        this.thread.setName(name);
        this.thread.setPriority(priority);
        this.thread.start();
    }

The DedicatedTaskRunner has a thread member that is started in the constructor. The methods to watch are runTask and wakeup.

    public void wakeup() throws InterruptedException {
        synchronized(this.mutex) {
            if (!this.shutdown) {
                this.pending = true;
                this.mutex.notifyAll();
            }
        }
    }
	final void runTask() {
			// some handling code (InterruptedException handling, shutdown cleanup) omitted for brevity
	        while (true) {
	            synchronized (this.mutex) {
	                this.pending = false;
	                if (this.shutdown) {
	                    return;
	                }
	            }
	            if (!this.task.iterate()) {
	                // no more work: wait until wakeup() sets pending
	                synchronized (this.mutex) {
	                    if (this.shutdown) {
	                        return;
	                    }
	                    while (!this.pending) {
	                        this.mutex.wait();
	                    }
	                }
	            }
	        }
	}

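The main logic is a loop that calls the task's iterate method. If iterate returns true (there may be more work), the loop continues. If it returns false, the thread blocks until the pending flag is set (that is, until wakeup is called) and then starts the next round of calling iterate. So the real business logic should be in iterate. As mentioned above, the task is the ActiveMQSessionExecutor, so look at its source.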
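
Before reading iterate, the wait/notify handshake between wakeup and runTask can be tried in isolation. The following is a minimal sketch, not ActiveMQ's DedicatedTaskRunner: the class RunnerSketch is hypothetical, and a BooleanSupplier stands in for Task.iterate().

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified runner; not ActiveMQ's DedicatedTaskRunner.
final class RunnerSketch {
    private final Object mutex = new Object();
    private final BooleanSupplier task; // returns true if more work may remain
    private boolean pending;
    private boolean shutdown;

    RunnerSketch(BooleanSupplier task) {
        this.task = task;
        Thread thread = new Thread(this::runTask, "runner-sketch");
        thread.setDaemon(true);
        thread.start();
    }

    // Signals that new work may be available.
    void wakeup() {
        synchronized (mutex) {
            if (!shutdown) {
                pending = true;
                mutex.notifyAll();
            }
        }
    }

    void shutdown() {
        synchronized (mutex) {
            shutdown = true;
            mutex.notifyAll();
        }
    }

    private void runTask() {
        try {
            while (true) {
                synchronized (mutex) {
                    pending = false;
                    if (shutdown) {
                        return;
                    }
                }
                if (!task.getAsBoolean()) {
                    synchronized (mutex) {
                        // Sleep until wakeup() sets pending or shutdown() is called.
                        while (!pending && !shutdown) {
                            mutex.wait();
                        }
                        if (shutdown) {
                            return;
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A caller is expected to enqueue work first and call wakeup() afterwards; with that ordering, a wakeup that arrives while the task is running is not lost, because pending is re-checked under the mutex before waiting.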

	//ActiveMQSessionExecutor.class
	public boolean iterate() {
        Iterator var1 = this.session.consumers.iterator();
        ActiveMQMessageConsumer consumer;
        do {
            if (!var1.hasNext()) {
                MessageDispatch message = this.messageQueue.dequeueNoWait();
                if (message == null) {
                    return false;
                }
                this.dispatch(message);
                return !this.messageQueue.isEmpty();
            }
            consumer = (ActiveMQMessageConsumer)var1.next();
        } while(!consumer.iterate());
        return true;
    }
    void dispatch(MessageDispatch message) {
        Iterator var2 = this.session.consumers.iterator();
        while(var2.hasNext()) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)var2.next();
            ConsumerId consumerId = message.getConsumerId();
            if (consumerId.equals(consumer.getConsumerId())) {
                consumer.dispatch(message);
                break;
            }
        }
    }
	//ActiveMQMessageConsumer.class
	public boolean iterate() {
        MessageListener listener = (MessageListener)this.messageListener.get();
        if (listener != null) {
            MessageDispatch md = this.unconsumedMessages.dequeueNoWait();
            if (md != null) {
                this.dispatch(md);
                return true;
            }
        }
        return false;
    }

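The logic is as follows. The executor first iterates over the consumers registered on the session. For each consumer that has a registered listener (asynchronous delivery) and happens to have a message in its own unconsumedMessages, the consumer's own dispatch is called to deliver that message, and iterate returns true. If no consumer handled a message, the executor dequeues from its own messageQueue. If a message is there, it calls the executor's dispatch, which iterates over the registered consumers and lets the matching consumer deliver it. If not, iterate returns false and the runner thread suspends until someone calls wakeup to set pending, after which the loop resumes. A consumer that gets a MessageDispatch consumes it through its own dispatch method; that is covered later, so it is not expanded here.
So far, all of this assumed a message was already in the queue and could simply be dequeued. So when is a message enqueued?
Recall the previous post: after the session is started, the tcpTransport thread is started to receive messages, and the callback ends up in ActiveMQConnection's onCommand method.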
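
The ordering inside iterate (consumer backlogs first, then the session-level queue) can be sketched with plain collections. The classes IterConsumerSketch and IterExecutorSketch are hypothetical, strings stand in for MessageDispatch, and the final hand-off is a simplification of the real consumer.dispatch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical consumer: a private backlog plus an optional listener.
final class IterConsumerSketch {
    final String id;
    final Deque<String> unconsumed = new ArrayDeque<>();
    Consumer<String> listener; // null means no listener is registered

    IterConsumerSketch(String id) {
        this.id = id;
    }

    // Delivers one backlog message if a listener exists.
    boolean iterate() {
        if (listener != null) {
            String m = unconsumed.poll();
            if (m != null) {
                listener.accept(m);
                return true;
            }
        }
        return false;
    }
}

// Hypothetical executor: consumer backlogs first, then the session-level queue.
final class IterExecutorSketch {
    final List<IterConsumerSketch> consumers = new ArrayList<>();
    final Deque<String[]> sessionQueue = new ArrayDeque<>(); // {consumerId, payload}

    boolean iterate() {
        for (IterConsumerSketch c : consumers) {
            if (c.iterate()) {
                return true; // one message handled; ask to be called again
            }
        }
        String[] md = sessionQueue.poll();
        if (md == null) {
            return false; // nothing to do; the runner can go back to waiting
        }
        for (IterConsumerSketch c : consumers) {
            if (c.id.equals(md[0])) {
                // Simplified hand-off: deliver to the listener, or park in the backlog.
                if (c.listener != null) {
                    c.listener.accept(md[1]);
                } else {
                    c.unconsumed.add(md[1]);
                }
                break;
            }
        }
        return !sessionQueue.isEmpty();
    }
}
```

Returning true after handling a single message keeps each iterate call short, so the runner can check for shutdown between messages.
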

	//ActiveMQConnection.class
	public void onCommand(Object o) {
        final Command command = (Command)o;
        if (!this.closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage((MemoryUsage)null);
                                md.setMessage(msg);
                            }

                            dispatcher.dispatch(md);
                        } else {
                            ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
                        }

                        return null;
                    }
					// some code omitted
                });
            } catch (Exception var5) {
                this.onClientInternalException(var5);
            }
        }

        Iterator iter = this.transportListeners.iterator();

        while(iter.hasNext()) {
            TransportListener listener = (TransportListener)iter.next();
            listener.onCommand(command);
        }

    }

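This extends the flow chart from the previous post:
Figure: ActiveMQ send flow
Among the implementations of Command there is MessageDispatch, which represents a message to be forwarded (a message produced on a subscribed queue or topic). Its visit method does indeed call CommandVisitor's processMessageDispatch to hand the message to the consumer.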

	//MessageDispatch.class
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processMessageDispatch(this);
    }

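According to the code above, after waiting for any transport interruption processing to complete, the connection looks up the dispatcher registered for the message's consumer id in its dispatchers map, wraps the message object, and forwards it through that dispatcher.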

 	ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
	Message msg = md.getMessage();
	if (msg != null) {
	    msg = msg.copy();
	    msg.setReadOnlyBody(true);
	    msg.setReadOnlyProperties(true);
	    msg.setRedeliveryCounter(md.getRedeliveryCounter());
	    msg.setConnection(ActiveMQConnection.this);
	    msg.setMemoryUsage((MemoryUsage)null);
	    md.setMessage(msg);
	}
	dispatcher.dispatch(md);

As explained earlier, the ActiveMQDispatcher for a message is the ActiveMQSession associated with its consumer, so look at the session's dispatch source.

	//ActiveMQSession.class
	 public void dispatch(MessageDispatch messageDispatch) {
	        try {
	            this.executor.execute(messageDispatch);
	        } catch (InterruptedException var3) {
	            Thread.currentThread().interrupt();
	            this.connection.onClientInternalException(var3);
	        }
	    }

ActiveMQSession calls its executor's execute method to process the message.

	//ActiveMQSessionExecutor.class
    void execute(MessageDispatch message) throws InterruptedException {
   		// some code handling the connection-not-started case omitted
           if (!this.session.isSessionAsyncDispatch() && !this.dispatchedBySessionPool) {
               this.dispatch(message);
           } else {
               this.messageQueue.enqueue(message);
               this.wakeup();
           }
    }

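What happens here depends on whether messages are configured to be dispatched synchronously or asynchronously. In the synchronous case dispatch is called directly. In the asynchronous case the message is enqueued first and wakeup is called; as described above, wakeup sets pending, so the runner resumes, calls iterate to take the message from the local queue, and then calls dispatch to process it.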
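
The two paths can be compared side by side in a small sketch. ExecuteSketch is a hypothetical class: strings stand in for MessageDispatch, wakeup only counts calls instead of waking a thread, and iterate is what the session thread would run after being woken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical executor showing the synchronous and asynchronous paths.
final class ExecuteSketch {
    private final boolean asyncDispatch;
    final Deque<String> messageQueue = new ArrayDeque<>();
    final List<String> delivered = new ArrayList<>();
    int wakeups;

    ExecuteSketch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }

    void execute(String message) {
        if (!asyncDispatch) {
            dispatch(message);          // delivered on the caller's thread
        } else {
            messageQueue.add(message);  // parked for the session thread
            wakeup();
        }
    }

    // Stand-in for waking the task runner; here it only counts calls.
    void wakeup() {
        wakeups++;
    }

    // What the session thread would do after being woken.
    boolean iterate() {
        String m = messageQueue.poll();
        if (m == null) {
            return false;
        }
        dispatch(m);
        return !messageQueue.isEmpty();
    }

    private void dispatch(String message) {
        delivered.add(message);
    }
}
```

In the asynchronous path the thread that calls execute returns as soon as the message is queued, so a slow listener does not hold up the thread that reads from the transport.
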
In the executor's dispatch method, the executor finds the matching consumer in the session and calls the consumer's own dispatch method to process the message.

	//ActiveMQMessageConsumer.class
	public void dispatch(MessageDispatch md) {
        MessageListener listener = (MessageListener)this.messageListener.get();
        try {
            this.clearMessagesInProgress();
            this.clearDeliveredList();
            synchronized(this.unconsumedMessages.getMutex()) {
                if (!this.unconsumedMessages.isClosed()) {
                    if (!this.info.isBrowser() && this.session.connection.isDuplicate(this, md.getMessage())) {
                        if (this.redeliveryExpectedInCurrentTransaction(md, true)) {
                            LOG.debug("{} tracking transacted redelivery {}", this.getConsumerId(), md.getMessage());
                            if (this.transactedIndividualAck) {
                                this.immediateIndividualTransactedAck(md);
                            } else {
                                this.session.sendAck(new MessageAck(md, (byte)0, 1));
                            }
                        } else {
                            ConsumerId consumerWithPendingTransaction;
                            if ((consumerWithPendingTransaction = this.redeliveryPendingInCompetingTransaction(md)) != null) {
                                LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", new Object[]{this.getConsumerId(), md.getMessage(), consumerWithPendingTransaction});
                                this.session.getConnection().rollbackDuplicate(this, md.getMessage());
                                this.dispatch(md);
                            } else {
                                LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", this.getConsumerId(), md);
                                this.posionAck(md, "Suppressing duplicate delivery on connection, consumer " + this.getConsumerId());
                            }
                        }
                    } else if (listener != null && this.unconsumedMessages.isRunning()) {
                        if (this.redeliveryExceeded(md)) {
                            this.posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + this.getConsumerId() + " exceeds redelivery policy limit:" + this.redeliveryPolicy);
                            return;
                        }

                        ActiveMQMessage message = this.createActiveMQMessage(md);
                        this.beforeMessageIsConsumed(md);

                        try {
                            boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
                            if (!expired) {
                                listener.onMessage(message);
                            }

                            this.afterMessageIsConsumed(md, expired);
                        } catch (RuntimeException var7) {
                            LOG.error("{} Exception while processing message: {}", new Object[]{this.getConsumerId(), md.getMessage().getMessageId(), var7});
                            md.setRollbackCause(var7);
                            if (!this.isAutoAcknowledgeBatch() && !this.isAutoAcknowledgeEach() && !this.session.isIndividualAcknowledge()) {
                                this.afterMessageIsConsumed(md, false);
                            } else {
                                this.rollback();
                            }
                        }
                    } else {
                        if (!this.unconsumedMessages.isRunning()) {
                            this.session.connection.rollbackDuplicate(this, md.getMessage());
                        }

                        if (md.getMessage() == null) {
                            this.unconsumedMessages.enqueue(md);
                        } else if (!this.consumeExpiredMessage(md)) {
                            this.unconsumedMessages.enqueue(md);
                            if (this.availableListener != null) {
                                this.availableListener.onMessageAvailable(this);
                            }
                        } else {
                            this.beforeMessageIsConsumed(md);
                            this.afterMessageIsConsumed(md, true);
                            if (this.info.getCurrentPrefetchSize() == 0) {
                                this.unconsumedMessages.enqueue((MessageDispatch)null);
                            }
                        }
                    }
                }
            }

            if (++this.dispatchedCount % 1000 == 0) {
                this.dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception var9) {
            this.session.connection.onClientInternalException(var9);
        }

    }

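The call this.session.connection.isDuplicate(this, md.getMessage()) at the top checks whether this is a redelivery of a message that has already been consumed. If it is, the consumer acks it when the redelivery is expected in the current transaction; otherwise it either rolls back the duplicate record and dispatches again (when a competing transaction is pending) or sends a poison ack to suppress the duplicate. Duplicate detection relies on an internally maintained BitArrayBin per producer, which records the producerSequenceId of messages already seen.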

	//ActiveMQConnection.class
    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
        return this.checkForDuplicates && this.connectionAudit.isDuplicate(dispatcher, message);
    }
	//ConnectionAudit.class
 	synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
        if (this.checkForDuplicates && message != null) {
            ActiveMQDestination destination = message.getDestination();
            if (destination != null) {
                ActiveMQMessageAudit audit;
                boolean result;
                if (destination.isQueue()) {
                    audit = (ActiveMQMessageAudit)this.destinations.get(destination);
                    if (audit == null) {
                        audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
                        this.destinations.put(destination, audit);
                    }

                    result = audit.isDuplicate(message);
                    return result;
                }

                audit = (ActiveMQMessageAudit)this.dispatchers.get(dispatcher);
                if (audit == null) {
                    audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
                    this.dispatchers.put(dispatcher, audit);
                }

                result = audit.isDuplicate(message);
                return result;
            }
        }

        return false;
    }
	//ActiveMQMessageAuditNotSync.class
	public boolean isDuplicate(MessageReference message) {
        MessageId id = message.getMessageId();
        return this.isDuplicate(id);
    }
    public boolean isDuplicate(MessageId id) {
        boolean answer = false;
        if (id != null) {
            ProducerId pid = id.getProducerId();
            if (pid != null) {
                BitArrayBin bab = (BitArrayBin)this.map.get(pid.toString());
                if (bab == null) {
                    bab = new BitArrayBin(this.auditDepth);
                    this.map.put(pid.toString(), bab);
                    this.modified = true;
                }

                answer = bab.setBit(id.getProducerSequenceId(), true);
            }
        }

        return answer;
    }
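
The per-producer bookkeeping can be illustrated with java.util.BitSet. This is a simplified sketch under stated assumptions: AuditSketch is a hypothetical class, sequence ids are small non-negative ints, and the bit set grows without bound, whereas the auditDepth parameter in the code above suggests ActiveMQ's BitArrayBin is bounded.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified audit: one BitSet per producer id.
final class AuditSketch {
    private final Map<String, BitSet> seenByProducer = new HashMap<>();

    // Marks the sequence id as seen and returns true if it had been seen before.
    synchronized boolean isDuplicate(String producerId, int producerSequenceId) {
        BitSet bits = seenByProducer.computeIfAbsent(producerId, k -> new BitSet());
        boolean alreadySeen = bits.get(producerSequenceId);
        bits.set(producerSequenceId);
        return alreadySeen;
    }
}
```

Keeping one bit set per producer means that two producers using the same sequence number are not mistaken for duplicates of each other.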

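If the message is not a duplicate, the next check is whether a MessageListener is registered locally and unconsumedMessages is running. If so, the following code runs: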

	this.beforeMessageIsConsumed(md);
    boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
    if (!expired) {
        listener.onMessage(message);
    }
    this.afterMessageIsConsumed(md, expired);

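The first step calls beforeMessageIsConsumed for pre-processing. Mainly, unless the consumer is in batch auto-acknowledge mode (isAutoAcknowledgeBatch), the message is added to deliveredMessages (the list of messages that have been delivered and still need to be acknowledged to the broker).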

	private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(this.session.getNextDeliveryId());
        this.lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!this.isAutoAcknowledgeBatch()) {
            synchronized(this.deliveredMessages) {
                this.deliveredMessages.addFirst(md);
            }

            if (this.session.getTransacted()) {
                if (this.transactedIndividualAck) {
                    this.immediateIndividualTransactedAck(md);
                } else {
                    this.ackLater(md, (byte)0);
                }
            }
        }

    }

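Next it checks whether expiry checking is enabled and whether the message has expired. If expiry checking is off or the message has not expired, the registered MessageListener's onMessage is called. Then comes post-processing in afterMessageIsConsumed, which builds an ack covering the messages in deliveredMessages and sends it to tell the broker they have been consumed.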

	private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            if (messageExpired) {
                this.acknowledge(md, (byte)6);
                this.stats.getExpiredMessageCount().increment();
            } else {
                this.stats.onMessage();
                if (!this.session.getTransacted()) {
                    if (this.isAutoAcknowledgeEach()) {
                        if (this.deliveryingAcknowledgements.compareAndSet(false, true)) {
                            synchronized(this.deliveredMessages) {
                                if (!this.deliveredMessages.isEmpty()) {
                                    MessageAck ack;
                                    if (this.optimizeAcknowledge) {
                                        ++this.ackCounter;
                                        if ((double)(this.ackCounter + this.deliveredCounter) >= (double)this.info.getPrefetchSize() * 0.65D || this.optimizeAcknowledgeTimeOut > 0L && System.currentTimeMillis() >= this.optimizeAckTimestamp + this.optimizeAcknowledgeTimeOut) {
                                            ack = this.makeAckForAllDeliveredMessages((byte)2);
                                            if (ack != null) {
                                                this.deliveredMessages.clear();
                                                this.ackCounter = 0;
                                                this.session.sendAck(ack);
                                                this.optimizeAckTimestamp = System.currentTimeMillis();
                                            }

                                            if (this.pendingAck != null && this.deliveredCounter > 0) {
                                                this.session.sendAck(this.pendingAck);
                                                this.pendingAck = null;
                                                this.deliveredCounter = 0;
                                            }
                                        }
                                    } else {
                                        ack = this.makeAckForAllDeliveredMessages((byte)2);
                                        if (ack != null) {
                                            this.deliveredMessages.clear();
                                            this.session.sendAck(ack);
                                        }
                                    }
                                }
                            }

                            this.deliveryingAcknowledgements.set(false);
                        }
                    } else if (this.isAutoAcknowledgeBatch()) {
                        this.ackLater(md, (byte)2);
                    } else {
                        if (!this.session.isClientAcknowledge() && !this.session.isIndividualAcknowledge()) {
                            throw new IllegalStateException("Invalid session state.");
                        }

                        boolean messageUnackedByConsumer = false;
                        synchronized(this.deliveredMessages) {
                            messageUnackedByConsumer = this.deliveredMessages.contains(md);
                        }

                        if (messageUnackedByConsumer) {
                            this.ackLater(md, (byte)0);
                        }
                    }
                }
            }

        }
    }
    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
        if (this.session.getTransacted()) {
            this.registerSync();
        }

        ++this.deliveredCounter;
        synchronized(this.deliveredMessages) {
            MessageAck oldPendingAck = this.pendingAck;
            this.pendingAck = new MessageAck(md, ackType, this.deliveredCounter);
            this.pendingAck.setTransactionId(this.session.getTransactionContext().getTransactionId());
            if (oldPendingAck == null) {
                this.pendingAck.setFirstMessageId(this.pendingAck.getLastMessageId());
            } else if (oldPendingAck.getAckType() == this.pendingAck.getAckType()) {
                this.pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
            } else if (!oldPendingAck.isDeliveredAck()) {
                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
                this.session.sendAck(oldPendingAck);
            } else {
                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
            }

            if (0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)) {
                LOG.debug("ackLater: sending: {}", this.pendingAck);
                this.session.sendAck(this.pendingAck);
                this.pendingAck = null;
                this.deliveredCounter = 0;
                this.additionalWindowSize = 0;
            }
        }
    }

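It first checks that the session is not transacted (a transaction has to be committed manually). Then, if isAutoAcknowledgeEach holds (auto acknowledge, where a single ack covers all delivered messages), it sends the corresponding ack and clears deliveredMessages. If isAutoAcknowledgeBatch holds (batched auto acknowledge), it calls ackLater to delay sending. ackLater works through the pendingAck field: as the code above shows, in the optimizeAcknowledge path of the isAutoAcknowledgeEach branch a non-null pendingAck is eventually sent, and ackLater itself also sends it when the following condition holds: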

0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)

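That is, when the number of consumed but not yet acknowledged messages plus ackCounter, minus the additional window size, is at least half of prefetchSize, ackLater sends pendingAck and acknowledges all accumulated consumed messages. ackCounter is only incremented when optimizeAcknowledge is enabled; otherwise it stays 0 and, ignoring the additional window, the condition becomes: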

number of consumed but not yet acknowledged messages >= half of prefetchSize
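
As a worked example of this threshold, the helper below evaluates the same comparison that ackLater uses. The class AckThresholdSketch and the method shouldSendPendingAck are hypothetical, and the prefetch size of 1000 used in the note after the code is only an illustrative value.

```java
// Hypothetical helper that evaluates the ackLater threshold.
final class AckThresholdSketch {
    // Same comparison as in ackLater():
    // 0.5 * prefetchSize <= deliveredCounter + ackCounter - additionalWindowSize
    static boolean shouldSendPendingAck(int prefetchSize, int deliveredCounter,
                                        int ackCounter, int additionalWindowSize) {
        return 0.5D * prefetchSize
                <= (double) (deliveredCounter + ackCounter - additionalWindowSize);
    }
}
```

With a prefetch size of 1000 and both ackCounter and additionalWindowSize at 0, the pending ack is held back at 499 delivered messages and sent at 500. An additional window of 100 moves the trigger point to 600 delivered messages.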

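The optimizeAcknowledge parameter enables ack optimization by accumulating acks. It was set when the consumer was created, from the connection's isOptimizeAcknowledge, the session's isAutoAcknowledge (only auto-acknowledge mode has any need to optimize acks), and the browser flag (false by default):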

this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();

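That covers the asynchronous mode, where the consumer waits for the broker to push. Next is a brief look at the receive method, which is the pull mode.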

	//ActiveMQMessageConsumer.class
	protected void sendPullCommand(long timeout) throws JMSException {
        this.clearDeliveredList();
        if (this.info.getCurrentPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(this.info);
            messagePull.setTimeout(timeout);
            this.session.asyncSendPacket(messagePull);
        }
    }
	public Message receive() throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        this.sendPullCommand(0L);
        MessageDispatch md = this.dequeue(-1L);
        if (md == null) {
            return null;
        } else {
            this.beforeMessageIsConsumed(md);
            this.afterMessageIsConsumed(md, false);
            return this.createActiveMQMessage(md);
        }
    }

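The main logic is to asynchronously send a pull command (only when the current prefetch size is 0 and unconsumedMessages is empty) and then call dequeue, which blocks (when no timeout is set) until a message is obtained.
So, building on the previous post, ActiveMQConnection's onCommand method is where the transport side connects to the consumer-side processing. The final flow chart is:
Figure: final ActiveMQ flow chart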
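
The blocking behavior of receive can be sketched with a BlockingQueue. PullConsumerSketch is a hypothetical class: strings stand in for messages, onDispatch plays the role of the transport thread delivering a message, and the pull command is reduced to a counter instead of a packet sent to the broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical pull-mode consumer.
final class PullConsumerSketch {
    private final BlockingQueue<String> unconsumed = new LinkedBlockingQueue<>();
    private final int prefetchSize;
    int pullsSent; // counts simulated pull commands

    PullConsumerSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    // Called by the transport side when the broker delivers a message.
    void onDispatch(String message) {
        unconsumed.add(message);
    }

    private void sendPullCommand() {
        // Only a zero-prefetch consumer with an empty backlog asks the broker explicitly.
        if (prefetchSize == 0 && unconsumed.isEmpty()) {
            pullsSent++;
        }
    }

    // Blocks until a message is available.
    String receive() throws InterruptedException {
        sendPullCommand();
        return unconsumed.take();
    }
}
```

With a non-zero prefetch the consumer never sends a pull in this sketch; it simply waits for messages that arrive through onDispatch.
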

Reposted from blog.csdn.net/weixin_44627989/article/details/105019077