ActiveMQ 中的消息都继承自 org.apache.activemq.command.BaseCommand 类。
broker 处理消息的调用栈如下:
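TransportConnection 类实现了 CommandVisitor 接口,描述了处理各种消息的逻辑。每个消息在自己的 visit(CommandVisitor) 方法中回调访问者上对应的 processXxx 方法(访问者模式的双重分派)。以 ProducerInfo 为例:TransportConnection.service(command) → ProducerInfo.visit(this) → TransportConnection.processAddProducer(info)。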
/**
 * Excerpt of the connection class that implements {@code CommandVisitor}.
 *
 * <p>A command handed to {@code service} is routed back to the matching {@code processXxx}
 * method of this class through {@code command.visit(this)}. Parts marked {@code ...} are
 * elided in this excerpt.
 */
public class TransportConnection implements Connection, Task, CommandVisitor {

    /**
     * Dispatches an incoming command to this visitor.
     *
     * @param command the command to handle
     * @return the response produced by the visited handler (surrounding logic is elided here)
     */
    @Override
    public Response service(Command command) {
        ...
        // command is the message; ProducerInfo is used as the example here
        response = command.visit(this);
        ...
    }

    /**
     * Registers a producer with the broker and with its session state.
     *
     * <p>The session id is taken from the producer id's parent and the connection id from the
     * session id's parent; both must already be known to this connection.
     *
     * @param info the producer being added
     * @return always {@code null} in the code shown here
     * @throws IllegalStateException if the connection or the session has not been registered, or
     *         if the connection already holds the maximum number of producers allowed
     * @throws Exception declared by the visitor contract
     */
    @Override
    public Response processAddProducer(ProducerInfo info) throws Exception {
        SessionId sessionId = info.getProducerId().getParentId();
        ConnectionId connectionId = sessionId.getParentId();
        TransportConnectionState cs = lookupConnectionState(connectionId);
        if (cs == null) {
            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
        }
        SessionState ss = cs.getSessionState(sessionId);
        if (ss == null) {
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
        }
        // Avoid replaying dup commands
        if (!ss.getProducerIds().contains(info.getProducerId())) {
            ActiveMQDestination destination = info.getDestination();
            // The per-connection producer limit is only checked when a destination is set and
            // it is not an advisory topic.
            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
                }
            }
            // Register with the broker first, then with the session state.
            broker.addProducer(cs.getContext(), info);
            try {
                ss.addProducer(info);
            } catch (IllegalStateException e) {
                // Undo the broker registration when the session state rejects the producer.
                // NOTE(review): the exception is neither rethrown nor logged, so the caller
                // still gets a null response - presumably intentional; confirm.
                broker.removeProducer(cs.getContext(), info);
            }
        }
        return null;
    }
}

// org.apache.activemq.command.ProducerInfo
// Visitor callback: ProducerInfo hands itself to the visitor's processAddProducer.
public Response visit(CommandVisitor visitor) throws Exception {
    return visitor.processAddProducer(this);
}