上一篇看了看生产者的部分,本文将开始看消费者的部分,先看一段伪代码,如:
1 // 监听broker的消息 2 public class Transport implements Runnable { 3 private TransportListener transportListener; 4 5 public void run() { 6 Object command = "pullMessage"; 7 while(true) { 8 Response resp = requestBroker(command); 9 if (resp.getStatus() != OK) { 10 // 超时重试 11 // 异常处理 12 } else { 13 // 监听到了信息 14 transportListener.onMessage(resp.getMessage()) 15 } 16 } 17 } 18 19 public Response requestBroker(Object command) { 20 // request broker 21 } 22 } 23 // 监听器 24 public class TransportListener { 25 private MessageDispatcher messageDispatcher; 26 27 public void onMessage(Message message) { 28 // 转发消息到consumer的消息队列中 29 messageDispatcher.dispatch(message); 30 } 31 } 32 // 消息转发器 33 public class MessageDispatcher { 34 private MessageListener messageListener; 35 private MessageDispatchChannel messageDispatchChannel; 36 37 public void dispatch(Message message) { 38 if (messageListener != null) { 39 messageListener.onMessage(message); 40 } else { 41 messageDispatchChannel.enqueue(message); 42 } 43 } 44 } 45 // 转发器通道 46 public class MessageDispatchChannel { 47 public void enqueue(Message message) { 48 // 入队 49 } 50 public Message dequeue() { 51 // 出队 52 } 53 } 54 // 消费者 55 public class Consumer { 56 private MessageDispatchChannel messageDispatchChannel; 57 public Message receive() { 58 for (; ; ) { 59 // 消息出队 60 return messageDispatchChannel.dequeue(); 61 } 62 } 63 }
Transport在一个Thread线程中运行,轮询broker的消息,将消息依次转给TransportListener -> MessageDispatcher -> MessageDispatchChannel -> Consumer或者MessageListener
DEMO
DEMO和生产者基本类似
1 // 构建并配置一个连接工厂 2 // 1、单纯给connectionFactory设置了一下brokerUrl、userName、password 3 // brokerUrl如果有配置参数,会被解析处理并设置到connectionFactory上 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 5 // 创建连接 6 // 1、创建了transport用于发送数据,以及transport启动一个线程轮询数据(带上clientID),如果有监听器,那么触发监听器 7 // 2、transport启动开始,会调用get请求判断能否连接,以及是否支持gzip压缩 8 Connection connection = connectionFactory.createConnection(); 9 // 打开连接 10 // 1、发送连接信息给broker 11 // 2、设置连接的状态为started=true 12 connection.start(); 13 // 创建一个有事务的会话 14 // 1、创建了一个session对象,并关联了connection对象 15 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 16 // 创建目的地 17 // 1、创建一个queue对象 18 Destination destination = session.createQueue("queue1"); 19 // 创建生产者 20 // 1、构建了一个MessageConsumer的实例对象,以及消息队列容器 21 MessageConsumer consumer = session.createConsumer(destination); 22 // 获取一个消息 23 Message message = consumer.receive(); 24 System.out.println("JMSMessageID=" + message.getJMSMessageID()); 25 // 发送ack确认消费 26 message.acknowledge(); 27 // 关闭连接 28 // 1、关闭线程池、关闭transport线程 29 connection.close();
源码解析
其它内容和生产者部分类似:https://www.cnblogs.com/lay2017/p/11094374.html
本文只关注以下两个不同的地方
createConnection
1 Transport transport = createTransport(); // 创建transport对象,如果是http的schema,那么创建HttpClientTransport 2 connection = createActiveMQConnection(transport, factoryStats); // 创建连接对象,给transport添加消息监听器 3 4 connection.setUserName(userName); // 设置用户名 5 connection.setPassword(password); // 设置密码 6 7 configureConnection(connection); // 配置连接 8 9 transport.start(); // 启动transport
createTransport创建了一个transport。
createActiveMQConnection创建connection传入了transport,进入方法看看创建过程对transport做了什么,一路跟进,在ActiveMQConnection的构造方法里面,设置了一个TransportListener,监听broker的消息
1 this.transport.setTransportListener(this); // 消息监听
再往下看transport.start()方法启动了transport,我们看看start方法做了什么,这里我们以Http协议的HttpClientTransport为例
HttpClientTransport的start将会启动一个线程
1 @Override 2 public void run() { 3 // ... 4 // HTTP轮询数据,如果有数据,那么触发消费监听 5 while (!isStopped() && !isStopping()) { 6 // ... 7 try { 8 answer = httpClient.execute(httpMethod); 9 int status = answer.getStatusLine().getStatusCode(); 10 if (status != HttpStatus.SC_OK) { 11 // 异常 12 } else { 13 receiveCounter++; // receive到了数据 14 DataInputStream stream = createDataInputStream(answer); // 获取数据流 15 Object command = getTextWireFormat().unmarshal(stream); // 解析出数据流 16 if (command == null) { 17 LOG.debug("Received null command from url: " + remoteUrl); 18 } else { 19 doConsume(command); // 触发消费监听 20 } 21 stream.close(); 22 } 23 } catch (IOException e) { 24 // ... 25 } finally { 26 // ... 27 } 28 } 29 }
这里我们注意到,如果从broker中轮询出消息了将会调用doConsume方法,进入doConsume,该方法调用了之前设置的transportListener
1 transportListener.onCommand(command); // 触发监听
transportListener的实现是ActiveMQConnection这个类,我们找到ActiveMQConnection的onCommand方法
1 // ... 2 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 3 if (dispatcher != null) { 4 // ... 5 dispatcher.dispatch(md); 6 }
onCommand方法根据ConsumerId找到dispatcher(注意,dispatcher接口由ActiveMQConsumer实现,所以就是找到Consumer),并将消息转发。
进入ActiveMQMessageConsumer对象的dispatch方法,代码有点长,大体逻辑是有Listener调用onMessage没有得话enqueue入队。
1 MessageListener listener = this.messageListener.get(); 2 try { 3 // ... 4 synchronized (unconsumedMessages.getMutex()) { 5 if (!unconsumedMessages.isClosed()) { 6 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 7 if (listener != null && unconsumedMessages.isRunning()) { 8 // ... 9 ActiveMQMessage message = createActiveMQMessage(md); 10 // ... 11 try { 12 boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); 13 if (!expired) { 14 listener.onMessage(message); 15 } 16 // ... 17 } catch (RuntimeException e) { 18 // ... 19 } 20 } else { 21 // ... 22 23 if (md.getMessage() == null) { 24 unconsumedMessages.enqueue(md); 25 } else { 26 if (!consumeExpiredMessage(md)) { 27 unconsumedMessages.enqueue(md); 28 // ... 29 } else { 30 // ... 31 if (info.getCurrentPrefetchSize() == 0) { 32 unconsumedMessages.enqueue(null); 33 } 34 } 35 } 36 } 37 } else { 38 // ... 39 } 40 } 41 } 42 // ... 43 } catch (Exception e) { 44 // ... 45 }
enqueue方法由MessageDipatchChannel子类实现,我们看看它其中一个子类实现SimplePriorityMessageDispatchChannel,其实就是添加到一个List里面去,然后通知等待的线程。
1 public void enqueue(MessageDispatch message) { 2 synchronized (mutex) { 3 getList(message).addLast(message); 4 this.size++; 5 mutex.notify(); 6 } 7 }
到这里,我们基本看到了消息从broker请求而来 -> 监听器 -> 转发器 -> 消费者这么一个过程,都包含再createConnection这个方法中启动的线程逻辑里。
acknowledge
再关注一下createSession方法,这个方法transacted=false关闭了事务,而ack模式采用了client_acknowledge模式,所以在业务完成以后要调用message.acknowledge();方法确认完成。
进入ActiveMQMessage.acknowledge方法,我们看到实际上ack操作是调用了一个acknowledgeCallback回调方法
1 @Override 2 public void acknowledge() throws JMSException { 3 if (acknowledgeCallback != null) { 4 try { 5 acknowledgeCallback.execute(); 6 } catch (JMSException e) { 7 throw e; 8 } catch (Throwable e) { 9 throw JMSExceptionSupport.create(e); 10 } 11 } 12 }
这个回调方法是在dispatch的时候dispatch的时候调用createActiveMQMessage的时候设置的,我们进入ActiveMQMessageConsumer的createActiveMQMessage方法
1 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 2 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 3 // ... 4 if (session.isClientAcknowledge()) { 5 m.setAcknowledgeCallback(new Callback() { 6 @Override 7 public void execute() throws Exception { 8 checkClosed(); 9 session.checkClosed(); 10 session.acknowledge(); 11 } 12 }); 13 } else if (session.isIndividualAcknowledge()) { 14 // ... 15 } 16 return m; 17 }
跟进session.acknowledge()方法
1 public void acknowledge() throws JMSException { 2 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 3 ActiveMQMessageConsumer c = iter.next(); 4 c.acknowledge(); 5 } 6 }
再跟进acknowledge方法
1 // ... 2 synchronized(deliveredMessages) { 3 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 4 // ... 5 session.sendAck(ack); 6 // ... 7 }
调用了session的sendAck方法,asyncSendPacket和syncsendPacket方法都是调用了transport提供的请求接口,将和broker直接通信。
1 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 3 asyncSendPacket(ack); 4 } else { 5 syncSendPacket(ack); 6 } 7 }
客户端接口设计UML
最后,结合两篇文章,我们看看ActiveMQ的客户端接口设计的UML图,总体还是基于JMS编程模式
转载于:https://www.cnblogs.com/lay2017/p/11101693.html