1. 环境搭建
-
代码已经上传至 https://github.com/masteryourself/activemq.git ,分支名称是
masteryourself-activemq-5.15.9
-
producer 是
activemq-example
工程,启动类是QueueProducer
2. 源码解析
2.1 流程预览
// 1. 创建 ActiveMQConnection
org.apache.activemq.ActiveMQConnectionFactory#createConnection ->
org.apache.activemq.ActiveMQConnectionFactory#createActiveMQConnection
// 1.1(*) 创建 transport,用于与 server 端进行交互
org.apache.activemq.ActiveMQConnectionFactory#createTransport ->
org.apache.activemq.transport.TransportFactory#connect ->
// 1.1.1(*) 这里是 spi 扩展,和 dubbo 类似
// 从 META-INF/services/org/apache/activemq/transport/tcp 文件中找到实现类【TcpTransportFactory】
org.apache.activemq.transport.TransportFactory#findTransportFactory
// 1.1.2(*) 调用 doConnect 方法,返回 Transport 对象
org.apache.activemq.transport.TransportFactory#doConnect
// 1.1.2.1 创建 Transport
org.apache.activemq.transport.tcp.TcpTransportFactory#createTransport->
// tcpTransport 里持有 socketFactory 对象,socketFactory 会创建一个 socket,所以 tcpTransport 就是操作 socket
org.apache.activemq.transport.tcp.TcpTransportFactory#createTcpTransport
// 1.1.2.2(*) 配置 configure,这个里面是对 Transport 做链路包装
org.apache.activemq.transport.TransportFactory#configure
// 1.1.2.2.1 组装一个复合的transport,这里会包装两层,一个是 IactivityMonitor.另一个是 WireFormatNegotiator
// WireFormatNegotiator 实现了客户端连接 broker 的时候先发送数据解析相关的协议信息,比如解析版本号,是否使用缓存等
// InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每 10s 发送一次心跳信息。服务端每 30s 读取一次心跳信息
org.apache.activemq.transport.tcp.TcpTransportFactory#compositeConfigure
// 1.1.2.2.2 用 MutexTransport 包装,实现写锁,表示同一时间只允许发送一个请求
org.apache.activemq.transport.MutexTransport#<init>
// 1.1.2.2.3 用 ResponseCorrelator 包装,用于实现异步请求
org.apache.activemq.transport.ResponseCorrelator#<init>
2.2 流程详解
2.2.1 ActiveMQSession#send(1.1)
org.apache.activemq.ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
// 互斥锁,如果一个 session 的多个 producer 发送消息到这里,会保证消息发送的有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
// 告诉broker开始一个新事务,只有事务型会话中才会开启
doStartTransaction();
// 从事务上下文中获取事务 id
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
// 在 JMS 协议头中设置是否持久化标识
message.setJMSDeliveryMode(deliveryMode);
// 计算消息过期时间
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
// 设置消息过期时间
message.setJMSExpiration(expiration);
// 设置消息的优先级
message.setJMSPriority(priority);
// 设置消息为非重发
message.setJMSRedelivered(false);
// transform to our own message format here
// 将不同的消息格式统一转化为ActiveMQMessage
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
// 设置目的地
msg.setDestination(destination);
// 生成并设置消息 id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {
// 如果消息是经过转化的,则更新原来的消息 id 和目的地
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
// 把消息属性和消息体都设置为只读,防止被修改
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
// 如果onComplete没有设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且消息非持久化或者连接器是异步发送模式
// 或者存在事务 id 的情况下,走异步发送,否则走同步发送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
// 异步发送
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
// 异步发送的情况下,需要设置 producerWindow 的大小
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
// 带超时时间的同步发送
this.connection.syncSendPacket(msg,sendTimeout);
}else {
// 带回调的同步发送
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
2.2.2 ResponseCorrelator#request(1.1.2.1)
org.apache.activemq.transport.ResponseCorrelator
public Object request(Object command) throws IOException {
// 这里还是调用异步发送方法
FutureResponse response = asyncRequest(command, null);
// 同步获取结果,通过 ArrayBlockingQueue 的 take 方法实现
return response.getResult();
}