02.ActiveMQ 源码解析之发送消息

1. 环境搭建

  • 代码已经上传至 https://github.com/masteryourself/activemq.git ,分支名称是 masteryourself-activemq-5.15.9

  • producer 是 activemq-example 工程,启动类是 QueueProducer

2. 源码解析

2.1 流程预览

image

// 1. 创建 ActiveMQConnection
org.apache.activemq.ActiveMQConnectionFactory#createConnection ->
	org.apache.activemq.ActiveMQConnectionFactory#createActiveMQConnection
		
		// 1.1(*) 创建 transport,用于与 server 端进行交互
		org.apache.activemq.ActiveMQConnectionFactory#createTransport ->
			org.apache.activemq.transport.TransportFactory#connect ->
				
				// 1.1.1(*) 这里是 spi 扩展,和 dubbo 类似
				// 从 META-INF/services/org/apache/activemq/transport/tcp 文件中找到实现类【TcpTransportFactory】
				org.apache.activemq.transport.TransportFactory#findTransportFactory
				
				// 1.1.2(*) 调用 doConnect 方法,返回 Transport 对象
				org.apache.activemq.transport.TransportFactory#doConnect

					// 1.1.2.1 创建 Transport
					org.apache.activemq.transport.tcp.TcpTransportFactory#createTransport->
						// tcpTransport 里持有 socketFactory 对象,socketFactory 会创建一个 socket,所以 tcpTransport 就是操作 socket
						org.apache.activemq.transport.tcp.TcpTransportFactory#createTcpTransport

					// 1.1.2.2(*) 配置 configure,这个里面是对 Transport 做链路包装
					org.apache.activemq.transport.TransportFactory#configure

						// 1.1.2.2.1 组装一个复合的transport,这里会包装两层,一个是 IactivityMonitor.另一个是 WireFormatNegotiator
        				// WireFormatNegotiator 实现了客户端连接 broker 的时候先发送数据解析相关的协议信息,比如解析版本号,是否使用缓存等
        				// InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每 10s 发送一次心跳信息。服务端每 30s 读取一次心跳信息
						org.apache.activemq.transport.tcp.TcpTransportFactory#compositeConfigure

						// 1.1.2.2.2 用 MutexTransport 包装,实现写锁,表示同一时间只允许发送一个请求
						org.apache.activemq.transport.MutexTransport#<init>

						// 1.1.2.2.3 用 ResponseCorrelator 包装,用于实现异步请求
						org.apache.activemq.transport.ResponseCorrelator#<init>

2.2 流程详解

2.2.1 ActiveMQSession#send(1.1)
  • org.apache.activemq.ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                    MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

    checkClosed();
    if (destination.isTemporary() && connection.isDeleted(destination)) {
        throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
    }
    // 互斥锁,如果一个 session 的多个 producer 发送消息到这里,会保证消息发送的有序性
    synchronized (sendMutex) {
        // tell the Broker we are about to start a new transaction
        // 告诉broker开始一个新事务,只有事务型会话中才会开启
        doStartTransaction();

        // 从事务上下文中获取事务 id
        TransactionId txid = transactionContext.getTransactionId();
        long sequenceNumber = producer.getMessageSequence();

        //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
        // 在 JMS 协议头中设置是否持久化标识
        message.setJMSDeliveryMode(deliveryMode);

        // 计算消息过期时间
        long expiration = 0L;
        if (!producer.getDisableMessageTimestamp()) {
            long timeStamp = System.currentTimeMillis();
            message.setJMSTimestamp(timeStamp);
            if (timeToLive > 0) {
                expiration = timeToLive + timeStamp;
            }
        }

        // 设置消息过期时间
        message.setJMSExpiration(expiration);

        // 设置消息的优先级
        message.setJMSPriority(priority);

        // 设置消息为非重发
        message.setJMSRedelivered(false);

        // transform to our own message format here
        // 将不同的消息格式统一转化为ActiveMQMessage
        ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);

        // 设置目的地
        msg.setDestination(destination);

        // 生成并设置消息 id
        msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

        // Set the message id.
        if (msg != message) {

            // 如果消息是经过转化的,则更新原来的消息 id 和目的地
            message.setJMSMessageID(msg.getMessageId().toString());
            // Make sure the JMS destination is set on the foreign messages too.
            message.setJMSDestination(destination);
        }
        //clear the brokerPath in case we are re-sending this message
        msg.setBrokerPath(null);

        msg.setTransactionId(txid);
        if (connection.isCopyMessageOnSend()) {
            msg = (ActiveMQMessage)msg.copy();
        }
        msg.setConnection(connection);

        // 把消息属性和消息体都设置为只读,防止被修改
        msg.onSend();
        msg.setProducerId(msg.getMessageId().getProducerId());
        if (LOG.isTraceEnabled()) {
            LOG.trace(getSessionId() + " sending message: " + msg);
        }

        // 如果onComplete没有设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且消息非持久化或者连接器是异步发送模式
        // 或者存在事务 id 的情况下,走异步发送,否则走同步发送
        if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {

            // 异步发送
            this.connection.asyncSendPacket(msg);
            if (producerWindow != null) {
                // Since we defer lots of the marshaling till we hit the
                // wire, this might not
                // provide and accurate size. We may change over to doing
                // more aggressive marshaling,
                // to get more accurate sizes.. this is more important once
                // users start using producer window
                // flow control.
                // 异步发送的情况下,需要设置 producerWindow 的大小
                int size = msg.getSize();
                producerWindow.increaseUsage(size);
            }
        } else {
            if (sendTimeout > 0 && onComplete==null) {
                // 带超时时间的同步发送
                this.connection.syncSendPacket(msg,sendTimeout);
            }else {
                // 带回调的同步发送
                this.connection.syncSendPacket(msg, onComplete);
            }
        }

    }
}
2.2.2 ResponseCorrelator#request(1.1.2.1)
  • org.apache.activemq.transport.ResponseCorrelator
public Object request(Object command) throws IOException {
    // 这里还是调用异步发送方法
    FutureResponse response = asyncRequest(command, null);
    // 同步获取结果,通过 ArrayBlockingQueue 的 take 方法实现
    return response.getResult();
}
发布了37 篇原创文章 · 获赞 3 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/masteryourself/article/details/104127268