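ActiveMQ Source Code Analysis (2): How a Consumer Receives Messages
First, here is the complete ActiveMQ flowchart, including the consumer side analyzed in this post.
The previous post, activemq源码笔记(一), analyzed how a producer sends messages, but it never reached anything ack-related, because acks are mainly submitted on the consumer side. This post continues by tracing the source code for message consumption.
Let's start with the client code: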
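// 1. Create the connection factory; the broker IP and port must be specified
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
// 2. Use the connection factory to create a connection
Connection connection = connectionFactory.createConnection();
// 3. Start the connection
connection.start();
// 4. Use the connection to create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. Use the session to create the destination: a queue (one-to-one) or a topic (one-to-many)
Queue queue = session.createQueue("test-queue");
// Use the session to create a consumer on the queue
MessageConsumer consumer = session.createConsumer(queue);
// Register a message handler on the consumer: asynchronous, waits for the broker to push
consumer.setMessageListener((message)->{
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
// Synchronous alternative, where the consumer actively pulls. Use it instead of the listener,
// not together with it: receive() on a consumer that has a MessageListener set throws an IllegalStateException.
// Message msg = consumer.receive();
session.close();
connection.close();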
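Everything up to creating the queue was traced in the previous post, so we pick up at createConsumer. It builds a ConsumerId object from the session's id and an internal consumer sequence generator (an auto-incrementing long):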
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return this.createConsumer(destination, (String)null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
return this.createConsumer(destination, messageSelector, false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
return this.createConsumer(destination, messageSelector, noLocal, (MessageListener)null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
this.checkClosed();
if (destination instanceof CustomDestination) {
CustomDestination customDestination = (CustomDestination)destination;
return customDestination.createConsumer(this, messageSelector, noLocal);
} else {
ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
int prefetch;
if (destination instanceof Topic) {
prefetch = prefetchPolicy.getTopicPrefetch();
} else {
prefetch = prefetchPolicy.getQueuePrefetch();
}
ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
return new ActiveMQMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener);
}
}
protected ConsumerId getNextConsumerId() {
return new ConsumerId(this.info.getSessionId(), this.consumerIdGenerator.getNextSequenceId());
}
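To make the id scheme concrete, here is a minimal sketch of "session id plus auto-incrementing long". The classes SimpleConsumerId and SimpleSession are hypothetical stand-ins written for this post, not ActiveMQ classes, and the use of AtomicLong is an assumption about how such a generator could be made thread-safe.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerIdSketch {
    /** Hypothetical stand-in for ConsumerId: a session id plus a per-session sequence value. */
    public static final class SimpleConsumerId {
        public final String sessionId;
        public final long value;

        public SimpleConsumerId(String sessionId, long value) {
            this.sessionId = sessionId;
            this.value = value;
        }

        @Override
        public String toString() {
            return sessionId + ":" + value;
        }
    }

    /** Hypothetical stand-in for the session side: owns the auto-incrementing generator. */
    public static final class SimpleSession {
        private final String sessionId;
        private final AtomicLong consumerSequence = new AtomicLong();

        public SimpleSession(String sessionId) {
            this.sessionId = sessionId;
        }

        /** Each call yields a new id that is unique within this session. */
        public SimpleConsumerId nextConsumerId() {
            return new SimpleConsumerId(sessionId, consumerSequence.incrementAndGet());
        }
    }

    public static void main(String[] args) {
        SimpleSession session = new SimpleSession("session-1");
        System.out.println(session.nextConsumerId()); // session-1:1
        System.out.println(session.nextConsumerId()); // session-1:2
    }
}
```

Because the session id is part of the key, two sessions can both hand out sequence value 1 without their consumers colliding.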
In the end an ActiveMQMessageConsumer object is constructed:
//ActiveMQMessageConsumer.class
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException {
// some exception-handling code omitted
this.session = session;
this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
if (this.redeliveryPolicy == null) {
this.redeliveryPolicy = new RedeliveryPolicy();
}
// code that is not relevant here omitted
this.info = new ConsumerInfo(consumerId);
this.info.setClientId(this.session.connection.getClientID());
this.info.setDestination(dest);
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
if (messageListener != null) {
this.setMessageListener(messageListener);
}
this.session.addConsumer(this);
this.session.syncSendPacket(this.info);
if (session.connection.isStarted()) {
this.start();
}
}
//ActiveMQSession.class
protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
this.consumers.add(consumer);
if (consumer.isDurableSubscriber()) {
this.stats.onCreateDurableSubscriber();
}
this.connection.addDispatcher(consumer.getConsumerId(), this);
}
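As you can see, this is mostly parameter binding. The ActiveMQSession is stored inside the ActiveMQMessageConsumer, and the consumer is in turn added to the ActiveMQSession's consumer collection (to keep complaining: yet another circular reference). addConsumer also registers the mapping from this consumer's id to the current session in the ActiveMQConnection's dispatcher map, which means an ActiveMQSession is itself an ActiveMQDispatcher: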
//ActiveMQConnection.class
public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
this.dispatchers.put(consumerId, dispatcher);
}
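The registry idea is simple enough to sketch in isolation: the connection keeps a map from consumer id to dispatcher, and an incoming message is routed by looking up its consumer id. The sketch below is an illustration under assumptions: Dispatcher, RecordingSession and Connection are hypothetical names, ids and messages are plain strings, and a ConcurrentHashMap is assumed because registration and lookup would happen on different threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DispatcherRegistrySketch {
    /** Hypothetical stand-in for ActiveMQDispatcher. */
    public interface Dispatcher {
        void dispatch(String consumerId, String message);
    }

    /** Hypothetical stand-in for a session: it only records what it was asked to dispatch. */
    public static final class RecordingSession implements Dispatcher {
        public final List<String> received = new ArrayList<>();

        @Override
        public void dispatch(String consumerId, String message) {
            received.add(consumerId + "=" + message);
        }
    }

    /** Hypothetical stand-in for the connection-level registry. */
    public static final class Connection {
        private final Map<String, Dispatcher> dispatchers = new ConcurrentHashMap<>();

        public void addDispatcher(String consumerId, Dispatcher dispatcher) {
            dispatchers.put(consumerId, dispatcher);
        }

        /** Routes a message by consumer id; returns false when nobody is registered for it. */
        public boolean route(String consumerId, String message) {
            Dispatcher dispatcher = dispatchers.get(consumerId);
            if (dispatcher == null) {
                return false;
            }
            dispatcher.dispatch(consumerId, message);
            return true;
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        RecordingSession session = new RecordingSession();
        // Two consumers of the same session share one dispatcher.
        connection.addDispatcher("c1", session);
        connection.addDispatcher("c2", session);
        connection.route("c1", "hello");
        connection.route("c2", "world");
        System.out.println(session.received); // [c1=hello, c2=world]
    }
}
```

The connection does not need to know about consumers at all; it only needs to find the session that owns a given consumer id.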
Finally, if the connection has already been started, the constructor calls ActiveMQMessageConsumer's start method:
public void start() throws JMSException {
if (!this.unconsumedMessages.isClosed()) {
this.started.set(true);
this.unconsumedMessages.start();
this.session.executor.wakeup();
}
}
start first calls start on unconsumedMessages and then wakeup on the session's executor. unconsumedMessages does not start any thread, so let's look at the executor first:
public void wakeup() {
if (!this.dispatchedBySessionPool) {
if (this.session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized(this) {
if (this.taskRunner == null) {
if (!this.isRunning()) {
return;
}
this.taskRunner = this.session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + this.session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
} else {
// synchronous dispatch: drain on the calling thread until iterate() reports no more work
while (this.iterate()) {
}
}
}
}
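The executor's wakeup checks whether a TaskRunner has already been created. If not, it creates one, and then it calls the runner's wakeup method. Next, look at how the TaskRunner is created: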
public TaskRunner createTaskRunner(Task task, String name) {
this.init();
ExecutorService executor = (ExecutorService)this.executorRef.get();
return (TaskRunner)(executor != null ? new PooledTaskRunner(executor, task, this.maxIterationsPerRun) : new DedicatedTaskRunner(task, name, this.priority, this.daemon));
}
In the end, the ActiveMQSessionExecutor that was passed in is used as the Task for a new TaskRunner. Taking DedicatedTaskRunner as the example:
//DedicatedTaskRunner.class
public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
this.task = task;
this.thread = new Thread(name) {
public void run() {
try {
DedicatedTaskRunner.this.runTask();
} finally {
DedicatedTaskRunner.LOG.trace("Run task done: {}", task);
}
}
};
this.thread.setDaemon(daemon);
this.thread.setName(name);
this.thread.setPriority(priority);
this.thread.start();
}
The runner owns a thread, which is started in the constructor. The methods to watch are runTask and wakeup:
public void wakeup() throws InterruptedException {
synchronized(this.mutex) {
if (!this.shutdown) {
this.pending = true;
this.mutex.notifyAll();
}
}
}
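final void runTask() {
// some handling code (shutdown notification, logging) omitted for brevity
try {
while (true) {
synchronized (this.mutex) {
this.pending = false;
if (this.shutdown) {
return;
}
}
if (!this.task.iterate()) {
// no more work: wait until wakeup() sets pending
synchronized (this.mutex) {
if (this.shutdown) {
return;
}
while (!this.pending) {
this.mutex.wait();
}
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}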
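The handshake between wakeup and runTask is a classic guarded wait on a mutex with a pending flag. Below is a minimal, self-contained sketch of that pattern. It is a simplified model written for this post, not the ActiveMQ class: SimpleRunner, its shutdown method and runDemo are hypothetical names, and the task simply drains a ConcurrentLinkedQueue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TaskRunnerSketch {
    /** Returns true if more work may remain, false if the runner may go to sleep. */
    public interface Task {
        boolean iterate();
    }

    public static final class SimpleRunner {
        private final Object mutex = new Object();
        private final Task task;
        private final Thread thread;
        private boolean pending;   // guarded by mutex
        private boolean shutdown;  // guarded by mutex

        public SimpleRunner(Task task, String name) {
            this.task = task;
            this.thread = new Thread(this::runTask, name);
            this.thread.setDaemon(true);
            this.thread.start();
        }

        /** Marks work as pending and wakes the runner thread if it is waiting. */
        public void wakeup() {
            synchronized (mutex) {
                if (!shutdown) {
                    pending = true;
                    mutex.notifyAll();
                }
            }
        }

        public void shutdown() {
            synchronized (mutex) {
                shutdown = true;
                pending = true;
                mutex.notifyAll();
            }
        }

        private void runTask() {
            try {
                while (true) {
                    synchronized (mutex) {
                        pending = false;
                        if (shutdown) {
                            return;
                        }
                    }
                    if (task.iterate()) {
                        continue; // more work may remain: loop without sleeping
                    }
                    synchronized (mutex) {
                        if (shutdown) {
                            return;
                        }
                        // A wakeup that raced with iterate() has already set pending,
                        // so it is not lost: the loop is skipped in that case.
                        while (!pending) {
                            mutex.wait();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Enqueues n items, waking the runner after each one; returns true if all were processed. */
    public static boolean runDemo(int n) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(n);
        SimpleRunner runner = new SimpleRunner(() -> {
            Integer item = queue.poll();
            if (item == null) {
                return false;
            }
            done.countDown();
            return !queue.isEmpty();
        }, "sketch-runner");
        try {
            for (int i = 0; i < n; i++) {
                queue.add(i);
                runner.wakeup();
            }
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            runner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all processed: " + runDemo(100));
    }
}
```

The important detail is that pending is reset before iterate runs and is checked again under the mutex before waiting, so a wakeup that arrives while the task is running causes one more pass instead of being missed.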
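In runTask, the core logic is a loop that calls the task's iterate method. When iterate returns true (more work may remain), the loop continues immediately. When it returns false, the thread blocks until its pending flag is set (that is, until someone calls wakeup) and then starts the next round of iterate calls. So the real work should be in iterate. As noted above, the task is the ActiveMQSessionExecutor, so let's look at its source: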
//ActiveMQSessionExecutor.class
public boolean iterate() {
Iterator var1 = this.session.consumers.iterator();
ActiveMQMessageConsumer consumer;
do {
if (!var1.hasNext()) {
MessageDispatch message = this.messageQueue.dequeueNoWait();
if (message == null) {
return false;
}
this.dispatch(message);
return !this.messageQueue.isEmpty();
}
consumer = (ActiveMQMessageConsumer)var1.next();
} while(!consumer.iterate());
return true;
}
void dispatch(MessageDispatch message) {
Iterator var2 = this.session.consumers.iterator();
while(var2.hasNext()) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)var2.next();
ConsumerId consumerId = message.getConsumerId();
if (consumerId.equals(consumer.getConsumerId())) {
consumer.dispatch(message);
break;
}
}
}
//ActiveMQMessageConsumer.class
public boolean iterate() {
MessageListener listener = (MessageListener)this.messageListener.get();
if (listener != null) {
MessageDispatch md = this.unconsumedMessages.dequeueNoWait();
if (md != null) {
this.dispatch(md);
return true;
}
}
return false;
}
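The logic is as follows. The executor first iterates over the consumers registered on the session. For each consumer that has a MessageListener (asynchronous delivery) and has a message available in its unconsumedMessages, the consumer's own dispatch is called and iterate returns true. If no consumer made progress, the executor dequeues from its own messageQueue. If a message is there, it calls the executor's dispatch, which finds the consumer whose id matches and hands the message to it. If nothing is there, iterate returns false and the runner thread waits until someone calls wakeup and sets pending. What the consumer's dispatch method does with a MessageDispatch is covered later.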
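The two-level iterate can be modeled in a few lines. The sketch below is a single-threaded simplification under stated assumptions: Msg, SimpleConsumer and SimpleExecutor are hypothetical names, messages are strings, and a consumer's dispatch is reduced to "call the listener if there is one, otherwise buffer the message".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class ExecutorIterateSketch {
    public static final class Msg {
        public final String consumerId;
        public final String body;

        public Msg(String consumerId, String body) {
            this.consumerId = consumerId;
            this.body = body;
        }
    }

    /** Hypothetical consumer: a per-consumer buffer plus an optional listener. */
    public static final class SimpleConsumer {
        public final String id;
        public final Deque<Msg> unconsumed = new ArrayDeque<>();
        public Consumer<String> listener; // null means no listener (pull mode)

        public SimpleConsumer(String id) {
            this.id = id;
        }

        /** Delivers one buffered message to the listener, if there is a listener and a message. */
        boolean iterate() {
            if (listener != null) {
                Msg m = unconsumed.poll();
                if (m != null) {
                    dispatch(m);
                    return true;
                }
            }
            return false;
        }

        void dispatch(Msg m) {
            if (listener != null) {
                listener.accept(m.body);
            } else {
                unconsumed.add(m); // kept for a later receive()
            }
        }
    }

    /** Hypothetical executor: session-level queue in front of the consumers. */
    public static final class SimpleExecutor {
        public final List<SimpleConsumer> consumers = new ArrayList<>();
        public final Deque<Msg> messageQueue = new ArrayDeque<>();

        public boolean iterate() {
            // 1. Let consumers with listeners drain their own buffers first.
            for (SimpleConsumer c : consumers) {
                if (c.iterate()) {
                    return true;
                }
            }
            // 2. Otherwise take one message from the session-level queue.
            Msg m = messageQueue.poll();
            if (m == null) {
                return false;
            }
            for (SimpleConsumer c : consumers) {
                if (c.id.equals(m.consumerId)) {
                    c.dispatch(m);
                    break;
                }
            }
            return !messageQueue.isEmpty();
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        SimpleConsumer a = new SimpleConsumer("A");
        a.listener = seen::add;
        a.unconsumed.add(new Msg("A", "a1"));
        SimpleConsumer b = new SimpleConsumer("B"); // no listener

        SimpleExecutor executor = new SimpleExecutor();
        executor.consumers.add(a);
        executor.consumers.add(b);
        executor.messageQueue.add(new Msg("A", "a2"));
        executor.messageQueue.add(new Msg("B", "b1"));

        while (executor.iterate()) {
            // keep going while the executor reports more work
        }
        System.out.println(seen);                // [a1, a2]
        System.out.println(b.unconsumed.size()); // 1
    }
}
```

In the run above, the first pass delivers the buffered a1, the second pass moves a2 from the session queue to A's listener, and the third pass buffers b1 on B because B has no listener.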
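Everything so far assumed that messages were already in the queue and only needed to be dequeued. So when are messages enqueued?
Recall from the previous post: once the session is started, a TcpTransport thread is started to receive messages, and the callback eventually lands in ActiveMQConnection's onCommand method: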
//ActiveMQConnection.class
public void onCommand(Object o) {
final Command command = (Command)o;
if (!this.closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
public Response processMessageDispatch(MessageDispatch md) throws Exception {
ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage((MemoryUsage)null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
} else {
ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
}
return null;
}
// some code omitted
});
} catch (Exception var5) {
this.onClientInternalException(var5);
}
}
Iterator iter = this.transportListeners.iterator();
while(iter.hasNext()) {
TransportListener listener = (TransportListener)iter.next();
listener.onCommand(command);
}
}
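This extends the logic flowchart from the previous post.
Among the Command implementations is MessageDispatch, which represents a message to be delivered (one produced on a subscribed queue or topic). Its visit method does indeed call the CommandVisitor's processMessageDispatch, which is what hands the message over to the consumer side: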
//MessageDispatch.class
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessageDispatch(this);
}
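This is the visitor (double dispatch) pattern: the command picks the callback that matches its own type, and an adapter with no-op defaults lets the caller override only the callbacks it cares about. A minimal sketch follows; the Shutdown command and all class names here are hypothetical, and return values are plain strings instead of Response objects.

```java
public class CommandVisitSketch {
    /** Hypothetical visitor with one callback per command type. */
    public interface Visitor {
        String processMessageDispatch(MessageDispatch md);

        String processShutdown(Shutdown shutdown);
    }

    public interface Command {
        String visit(Visitor visitor);
    }

    public static final class MessageDispatch implements Command {
        public final String body;

        public MessageDispatch(String body) {
            this.body = body;
        }

        @Override
        public String visit(Visitor visitor) {
            // The command itself selects the matching callback.
            return visitor.processMessageDispatch(this);
        }
    }

    public static final class Shutdown implements Command {
        @Override
        public String visit(Visitor visitor) {
            return visitor.processShutdown(this);
        }
    }

    /** Adapter with no-op defaults, so callers override only what they handle. */
    public static class VisitorAdapter implements Visitor {
        @Override
        public String processMessageDispatch(MessageDispatch md) {
            return null;
        }

        @Override
        public String processShutdown(Shutdown shutdown) {
            return null;
        }
    }

    /** Mirrors the shape of onCommand: one anonymous adapter handles every incoming command. */
    public static String onCommand(Command command) {
        return command.visit(new VisitorAdapter() {
            @Override
            public String processMessageDispatch(MessageDispatch md) {
                return "dispatch:" + md.body;
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(onCommand(new MessageDispatch("hello"))); // dispatch:hello
        System.out.println(onCommand(new Shutdown()));               // null
    }
}
```

The receiving side never needs an instanceof chain: adding a new command type means adding one callback to the visitor and one visit implementation.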
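Per the code above, after waiting for any transport-interruption processing to complete, the connection looks up the ActiveMQDispatcher registered for the message's consumer id in its own dispatchers map, wraps a read-only copy of the message, and dispatches it: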
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage((MemoryUsage)null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
As discussed earlier, the ActiveMQDispatcher for a message is the ActiveMQSession that owns the consumer, so look at its dispatch source:
//ActiveMQSession.class
public void dispatch(MessageDispatch messageDispatch) {
try {
this.executor.execute(messageDispatch);
} catch (InterruptedException var3) {
Thread.currentThread().interrupt();
this.connection.onClientInternalException(var3);
}
}
ActiveMQSession calls its own executor's execute method to handle the message:
//ActiveMQSessionExecutor.class
void execute(MessageDispatch message) throws InterruptedException {
// code handling the connection-not-started case omitted
if (!this.session.isSessionAsyncDispatch() && !this.dispatchedBySessionPool) {
this.dispatch(message);
} else {
this.messageQueue.enqueue(message);
this.wakeup();
}
}
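What happens here depends on whether message dispatch is configured as asynchronous or synchronous. In the synchronous case, dispatch is called directly. In the asynchronous case, the message is enqueued first and wakeup is called. As described earlier, wakeup sets pending, so the runner resumes, calls iterate to take the message from the local queue, and then calls dispatch to process it.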
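The difference between the two modes is only where dispatch runs. Here is a small single-threaded sketch of that branch. It is a simplified model, not the ActiveMQ class: SimpleExecutor is a hypothetical name, wakeup only counts calls (in the real client it notifies the runner thread), and iterate is called by hand to play the role of that thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ExecuteModeSketch {
    public static final class SimpleExecutor {
        private final boolean asyncDispatch;
        private final Deque<String> messageQueue = new ArrayDeque<>();
        public final List<String> delivered = new ArrayList<>();
        public int wakeups;

        public SimpleExecutor(boolean asyncDispatch) {
            this.asyncDispatch = asyncDispatch;
        }

        public void execute(String message) {
            if (!asyncDispatch) {
                dispatch(message);         // runs on the caller's (transport) thread
            } else {
                messageQueue.add(message); // handed over to the session's runner
                wakeup();
            }
        }

        /** Stand-in for notifying the runner thread. */
        private void wakeup() {
            wakeups++;
        }

        /** What the runner thread would call after being woken up. */
        public boolean iterate() {
            String message = messageQueue.poll();
            if (message == null) {
                return false;
            }
            dispatch(message);
            return !messageQueue.isEmpty();
        }

        private void dispatch(String message) {
            delivered.add(message);
        }
    }

    public static void main(String[] args) {
        SimpleExecutor sync = new SimpleExecutor(false);
        sync.execute("a");
        System.out.println(sync.delivered); // [a]

        SimpleExecutor async = new SimpleExecutor(true);
        async.execute("a");
        async.execute("b");
        System.out.println(async.delivered); // []
        while (async.iterate()) {
            // drain, as the runner thread would
        }
        System.out.println(async.delivered); // [a, b]
    }
}
```

Asynchronous dispatch keeps a slow listener from blocking the transport thread, at the cost of one extra queue hop and a thread handoff.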
In the executor's dispatch method, the matching consumer is looked up in the session and the consumer's own dispatch method is called to process the message:
//ActiveMQMessageConsumer.class
public void dispatch(MessageDispatch md) {
MessageListener listener = (MessageListener)this.messageListener.get();
try {
this.clearMessagesInProgress();
this.clearDeliveredList();
synchronized(this.unconsumedMessages.getMutex()) {
if (!this.unconsumedMessages.isClosed()) {
if (!this.info.isBrowser() && this.session.connection.isDuplicate(this, md.getMessage())) {
if (this.redeliveryExpectedInCurrentTransaction(md, true)) {
LOG.debug("{} tracking transacted redelivery {}", this.getConsumerId(), md.getMessage());
if (this.transactedIndividualAck) {
this.immediateIndividualTransactedAck(md);
} else {
this.session.sendAck(new MessageAck(md, (byte)0, 1));
}
} else {
ConsumerId consumerWithPendingTransaction;
if ((consumerWithPendingTransaction = this.redeliveryPendingInCompetingTransaction(md)) != null) {
LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", new Object[]{this.getConsumerId(), md.getMessage(), consumerWithPendingTransaction});
this.session.getConnection().rollbackDuplicate(this, md.getMessage());
this.dispatch(md);
} else {
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", this.getConsumerId(), md);
this.posionAck(md, "Suppressing duplicate delivery on connection, consumer " + this.getConsumerId());
}
}
} else if (listener != null && this.unconsumedMessages.isRunning()) {
if (this.redeliveryExceeded(md)) {
this.posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + this.getConsumerId() + " exceeds redelivery policy limit:" + this.redeliveryPolicy);
return;
}
ActiveMQMessage message = this.createActiveMQMessage(md);
this.beforeMessageIsConsumed(md);
try {
boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
if (!expired) {
listener.onMessage(message);
}
this.afterMessageIsConsumed(md, expired);
} catch (RuntimeException var7) {
LOG.error("{} Exception while processing message: {}", new Object[]{this.getConsumerId(), md.getMessage().getMessageId(), var7});
md.setRollbackCause(var7);
if (!this.isAutoAcknowledgeBatch() && !this.isAutoAcknowledgeEach() && !this.session.isIndividualAcknowledge()) {
this.afterMessageIsConsumed(md, false);
} else {
this.rollback();
}
}
} else {
if (!this.unconsumedMessages.isRunning()) {
this.session.connection.rollbackDuplicate(this, md.getMessage());
}
if (md.getMessage() == null) {
this.unconsumedMessages.enqueue(md);
} else if (!this.consumeExpiredMessage(md)) {
this.unconsumedMessages.enqueue(md);
if (this.availableListener != null) {
this.availableListener.onMessageAvailable(this);
}
} else {
this.beforeMessageIsConsumed(md);
this.afterMessageIsConsumed(md, true);
if (this.info.getCurrentPrefetchSize() == 0) {
this.unconsumedMessages.enqueue((MessageDispatch)null);
}
}
}
}
}
if (++this.dispatchedCount % 1000 == 0) {
this.dispatchedCount = 0;
Thread.yield();
}
} catch (Exception var9) {
this.session.connection.onClientInternalException(var9);
}
}
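The code first uses this.session.connection.isDuplicate(this, md.getMessage()) to check whether this is a repeated delivery of a message that has already been seen. If it is, then depending on the transaction state it either just sends an ack, rolls back the duplicate record and dispatches again, or poison-acks the message. Duplicate detection is backed by a per-producer BitArrayBin that records the producerSequenceId of messages already seen: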
//ActiveMQConnection.class
protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
return this.checkForDuplicates && this.connectionAudit.isDuplicate(dispatcher, message);
}
//ConnectionAudit.class
synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
if (this.checkForDuplicates && message != null) {
ActiveMQDestination destination = message.getDestination();
if (destination != null) {
ActiveMQMessageAudit audit;
boolean result;
if (destination.isQueue()) {
audit = (ActiveMQMessageAudit)this.destinations.get(destination);
if (audit == null) {
audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
this.destinations.put(destination, audit);
}
result = audit.isDuplicate(message);
return result;
}
audit = (ActiveMQMessageAudit)this.dispatchers.get(dispatcher);
if (audit == null) {
audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
this.dispatchers.put(dispatcher, audit);
}
result = audit.isDuplicate(message);
return result;
}
}
return false;
}
//ActiveMQMessageAuditNotSync.class
public boolean isDuplicate(MessageReference message) {
MessageId id = message.getMessageId();
return this.isDuplicate(id);
}
public boolean isDuplicate(MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = (BitArrayBin)this.map.get(pid.toString());
if (bab == null) {
bab = new BitArrayBin(this.auditDepth);
this.map.put(pid.toString(), bab);
this.modified = true;
}
answer = bab.setBit(id.getProducerSequenceId(), true);
}
}
return answer;
}
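The core trick is "set the bit and return its previous value", keyed by producer. A minimal sketch of that idea is below. It is a simplification under loud assumptions: it uses an unbounded java.util.BitSet with int sequence ids, whereas the BitArrayBin in the listing is sized by auditDepth, so the windowing behavior is not modeled here.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class DuplicateAuditSketch {
    // One bit set per producer, so sequence ids from different producers never collide.
    private final Map<String, BitSet> seenByProducer = new HashMap<>();

    /**
     * Marks (producerId, sequenceId) as seen.
     * Returns true if the same pair had already been marked, i.e. the message is a duplicate.
     */
    public synchronized boolean isDuplicate(String producerId, int sequenceId) {
        BitSet seen = seenByProducer.computeIfAbsent(producerId, key -> new BitSet());
        boolean previouslySet = seen.get(sequenceId);
        seen.set(sequenceId);
        return previouslySet;
    }

    public static void main(String[] args) {
        DuplicateAuditSketch audit = new DuplicateAuditSketch();
        System.out.println(audit.isDuplicate("producer-1", 7)); // false
        System.out.println(audit.isDuplicate("producer-1", 7)); // true
        System.out.println(audit.isDuplicate("producer-2", 7)); // false
    }
}
```

Note that the check has a side effect: asking whether a message is a duplicate also records it, which is why the dispatch code has to call rollbackDuplicate when it decides not to deliver a message after all.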
If the message is not a duplicate, the next check is whether a MessageListener is registered locally. If so, the following code runs:
this.beforeMessageIsConsumed(md);
boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
if (!expired) {
listener.onMessage(message);
}
this.afterMessageIsConsumed(md, expired);
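The first step is beforeMessageIsConsumed, which does the pre-processing. Mainly, unless the consumer is in batch auto-acknowledge mode (isAutoAcknowledgeBatch), the message is added to deliveredMessages (the list of messages delivered locally but not yet acknowledged to the broker):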
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(this.session.getNextDeliveryId());
this.lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!this.isAutoAcknowledgeBatch()) {
synchronized(this.deliveredMessages) {
this.deliveredMessages.addFirst(md);
}
if (this.session.getTransacted()) {
if (this.transactedIndividualAck) {
this.immediateIndividualTransactedAck(md);
} else {
this.ackLater(md, (byte)0);
}
}
}
}
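Next it checks whether expiry checking is enabled and whether the message has expired. If there is no expiry check, or the message has not expired, the registered MessageListener's onMessage method is called. Then the post-processing in afterMessageIsConsumed builds an ack covering the messages in deliveredMessages and sends it, telling the broker they have been consumed: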
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (!this.unconsumedMessages.isClosed()) {
if (messageExpired) {
this.acknowledge(md, (byte)6);
this.stats.getExpiredMessageCount().increment();
} else {
this.stats.onMessage();
if (!this.session.getTransacted()) {
if (this.isAutoAcknowledgeEach()) {
if (this.deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized(this.deliveredMessages) {
if (!this.deliveredMessages.isEmpty()) {
MessageAck ack;
if (this.optimizeAcknowledge) {
++this.ackCounter;
if ((double)(this.ackCounter + this.deliveredCounter) >= (double)this.info.getPrefetchSize() * 0.65D || this.optimizeAcknowledgeTimeOut > 0L && System.currentTimeMillis() >= this.optimizeAckTimestamp + this.optimizeAcknowledgeTimeOut) {
ack = this.makeAckForAllDeliveredMessages((byte)2);
if (ack != null) {
this.deliveredMessages.clear();
this.ackCounter = 0;
this.session.sendAck(ack);
this.optimizeAckTimestamp = System.currentTimeMillis();
}
if (this.pendingAck != null && this.deliveredCounter > 0) {
this.session.sendAck(this.pendingAck);
this.pendingAck = null;
this.deliveredCounter = 0;
}
}
} else {
ack = this.makeAckForAllDeliveredMessages((byte)2);
if (ack != null) {
this.deliveredMessages.clear();
this.session.sendAck(ack);
}
}
}
}
this.deliveryingAcknowledgements.set(false);
}
} else if (this.isAutoAcknowledgeBatch()) {
this.ackLater(md, (byte)2);
} else {
if (!this.session.isClientAcknowledge() && !this.session.isIndividualAcknowledge()) {
throw new IllegalStateException("Invalid session state.");
}
boolean messageUnackedByConsumer = false;
synchronized(this.deliveredMessages) {
messageUnackedByConsumer = this.deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
this.ackLater(md, (byte)0);
}
}
}
}
}
}
private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
if (this.session.getTransacted()) {
this.registerSync();
}
++this.deliveredCounter;
synchronized(this.deliveredMessages) {
MessageAck oldPendingAck = this.pendingAck;
this.pendingAck = new MessageAck(md, ackType, this.deliveredCounter);
this.pendingAck.setTransactionId(this.session.getTransactionContext().getTransactionId());
if (oldPendingAck == null) {
this.pendingAck.setFirstMessageId(this.pendingAck.getLastMessageId());
} else if (oldPendingAck.getAckType() == this.pendingAck.getAckType()) {
this.pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
} else if (!oldPendingAck.isDeliveredAck()) {
LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
this.session.sendAck(oldPendingAck);
} else {
LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
}
if (0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)) {
LOG.debug("ackLater: sending: {}", this.pendingAck);
this.session.sendAck(this.pendingAck);
this.pendingAck = null;
this.deliveredCounter = 0;
this.additionalWindowSize = 0;
}
}
}
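afterMessageIsConsumed first checks that the session is not transacted (a transaction has to be committed explicitly). If isAutoAcknowledgeEach is true (automatic acknowledgement, where one ack covers all delivered messages), it sends the ack and clears deliveredMessages. If isAutoAcknowledgeBatch is true (batched automatic acknowledgement), it calls ackLater to defer sending. The deferral works through the pendingAck field: as the code above shows, the optimizeAcknowledge branch of isAutoAcknowledgeEach sends a non-null pendingAck once its own threshold is reached, and ackLater itself sends pendingAck when
0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)
that is, when the number of delivered-but-unacknowledged messages plus ackCounter, minus the additional window size, reaches half of prefetchSize. At that point the accumulated consumed messages are acknowledged together. ackCounter is only incremented when optimizeAcknowledge is enabled. Otherwise it stays 0 and, with additionalWindowSize also 0, the condition becomes
delivered-but-unacknowledged message count >= half of prefetchSize
The optimizeAcknowledge flag enables this accumulate-then-ack optimization. It is set when the consumer is created, from the connection's isOptimizeAcknowledge, the session's isAutoAcknowledge (the optimization only makes sense in auto-acknowledge mode), and the browser flag (false by default):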
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();
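To get a feel for the numbers, the two thresholds can be pulled out into pure functions. This is a sketch that only mirrors the count-based comparisons quoted above; the timeout half of the optimizeAcknowledge condition is left out, the method names are hypothetical, and the prefetch size of 1000 in the example is an assumed value chosen for easy arithmetic.

```java
public class AckThresholdSketch {
    /** Mirrors the ackLater condition: 0.5 * prefetch <= delivered + ackCounter - additionalWindow. */
    public static boolean ackLaterShouldSend(int prefetchSize, int deliveredCounter,
                                             int ackCounter, int additionalWindowSize) {
        return 0.5D * (double) prefetchSize
                <= (double) (deliveredCounter + ackCounter - additionalWindowSize);
    }

    /** Mirrors the count-based half of the optimizeAcknowledge condition (65% of prefetch). */
    public static boolean optimizedAckShouldSend(int prefetchSize, int ackCounter, int deliveredCounter) {
        return (double) (ackCounter + deliveredCounter) >= (double) prefetchSize * 0.65D;
    }

    public static void main(String[] args) {
        int prefetch = 1000; // assumed value for the example

        // Batch mode without optimizeAcknowledge: ackCounter and additionalWindowSize are 0.
        System.out.println(ackLaterShouldSend(prefetch, 499, 0, 0)); // false
        System.out.println(ackLaterShouldSend(prefetch, 500, 0, 0)); // true

        // optimizeAcknowledge: acks are held back until 65% of the prefetch window is used.
        System.out.println(optimizedAckShouldSend(prefetch, 649, 0)); // false
        System.out.println(optimizedAckShouldSend(prefetch, 650, 0)); // true
    }
}
```

With a window of 1000, a batch-mode consumer therefore acknowledges after every 500 messages, while an optimized auto-ack consumer waits for 650 (or for its timeout, which this sketch does not model).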
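That covers the asynchronous mode, in which the client waits for the broker to push messages. Next is a brief look at the receive method, which works in pull mode:
//ActiveMQMessageConsumer.class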
protected void sendPullCommand(long timeout) throws JMSException {
this.clearDeliveredList();
if (this.info.getCurrentPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(this.info);
messagePull.setTimeout(timeout);
this.session.asyncSendPacket(messagePull);
}
}
public Message receive() throws JMSException {
this.checkClosed();
this.checkMessageListener();
this.sendPullCommand(0L);
MessageDispatch md = this.dequeue(-1L);
if (md == null) {
return null;
} else {
this.beforeMessageIsConsumed(md);
this.afterMessageIsConsumed(md, false);
return this.createActiveMQMessage(md);
}
}
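The main logic: when the prefetch size is 0 and no messages are buffered locally, a pull command is sent asynchronously. Then dequeue is called, which blocks (when no timeout is set) until a message is available.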
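The pull path can be modeled with a blocking queue: receive optionally asks for a message and then waits on the local buffer. The sketch below is a simplified model with hypothetical names (SimpleConsumer, onDispatch, pullCommandsSent). Sending a pull command is reduced to incrementing a counter, and receive takes a timeout so the example cannot hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PullReceiveSketch {
    public static final class SimpleConsumer {
        private final BlockingQueue<String> unconsumed = new LinkedBlockingQueue<>();
        private final int prefetchSize;
        public int pullCommandsSent;

        public SimpleConsumer(int prefetchSize) {
            this.prefetchSize = prefetchSize;
        }

        /** Called by the (simulated) transport side when the broker delivers a message. */
        public void onDispatch(String message) {
            unconsumed.add(message);
        }

        /** A pull request is only needed when nothing is prefetched and nothing is buffered. */
        private void sendPullCommand() {
            if (prefetchSize == 0 && unconsumed.isEmpty()) {
                pullCommandsSent++; // stand-in for sending a pull command to the broker
            }
        }

        /** Waits up to timeoutMillis for a message; returns null on timeout or interruption. */
        public String receive(long timeoutMillis) {
            sendPullCommand();
            try {
                return unconsumed.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer(0);
        System.out.println(consumer.receive(10));      // null (nothing delivered yet)
        consumer.onDispatch("hello");
        System.out.println(consumer.receive(10));      // hello
        System.out.println(consumer.pullCommandsSent); // 1
    }
}
```

With a non-zero prefetch size the broker pushes messages ahead of time, so receive usually just takes the next message from the local buffer and no pull command is sent.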
So, building on the previous post, ActiveMQConnection's onCommand method is where the transport connects to the consumer-side processing. The final flowchart is as follows.