ActiveMq通过createTemporaryQueue,CreateTemporaryTopic创建临时目标,这些目标的生命周期是创建它的Connection的关闭;只有创建它的Connection所创建的session才能从临时目标中接收消息;不过任何的生产者都可以向临时目标中发送消息;如果关闭了创建此临时目标的Connection,那么临时目标被关闭,内容也将消失。
我们写一个demo去验证一件事:
1 同一个connection创建的不同session可以访问这些session中某一个session创建的临时目标。
2 不同connection创建的session,不能访问某个session创建的临时目标。
3 在满足1的情况下,某一个session创建的生产者发送消息到固定的一个队列中并且设置replyto临时队列,消费者可以从这个队列中收到消息并创建一个目标为replyto临时队列的生产者,发送一条replyMessage消息到replyto临时队列;由另外一个session(同一个connection所创建)创建消费者监听这个replyto临时队列,收到消息并打印。
package jeff.mq.tempDestination;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
* 【request/reply模式】
*
* TemporaryQueue虽然是由Session会话创建,但是它的生命周期属于整个Connection连接
* 如果在一个Connection上创建两个Session会话,则一个Session1创建的TemporaryQueue或TemporaryTopic也可以被另一个Session2访问,
* 也就是临时目标可以被同一个Conenction的不同Session会话访问。
*
* 如果这两个Session会话是由不同的Connection连接创建,则一个Session1创建的TemporaryQueue不可以被另一个Session2访问。
* 也就是不同连接Connction创建的Session所创建的临时目标是相互隔离的
*
*
* 临时目标的主要作用就是为了指定回复目的地
*
* @author jeffSheng
* 2018年7月7日
*/
public class TemporaryQueueTest {
private ConnectionFactory connectionFactory;
private Connection connection;
private Connection connection1;
private Queue jeffTempQueue ;
public TemporaryQueueTest(){
try {
this.connectionFactory =
new ActiveMQConnectionFactory(
"jeff",
"123456",
"tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection1 = connectionFactory.createConnection();
connection.start();
connection1.start();
//第一步:先创建一个queue备用,生产者会向这个队列中发送消息
jeffTempQueue = new ActiveMQQueue("jeffTempQueue");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TemporaryQueueTest tt = new TemporaryQueueTest();
tt.requestAndReply(tt.connection);
}
private void requestTempQueueByAnotherConnection(Connection connection,TemporaryQueue replyQueue ) throws JMSException{
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//创建消费者接收replyQueue这个临时queue中的消息
MessageConsumer replyComsumer = session.createConsumer(replyQueue);
replyComsumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("【Connection-1】:Get reply: " + ((TextMessage) m).getText());
} catch (JMSException e) {
}
}
});
}
public void requestAndReply(Connection connection) throws JMSException{
//第二步:使用connection连接创建一个session
final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//第三步: 使用这个session创建一个TemporaryQueue
TemporaryQueue replyQueue = session.createTemporaryQueue();
//第七步:【创建消费者】 使用同一个session 创建一个消费者接收tempQueue(生产者发送消息的队列)队列的消息,并回复到生产者指定的replyQueue中
MessageConsumer comsumer = session.createConsumer(this.jeffTempQueue);
//第八步:【注册监听器】消费者注册监听器监听消息的到来
comsumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("【Connection】:Get Message: " + ((TextMessage) m).getText());
//第九部:【消费者创建生产者】消费者收到消息后,创建一个生产者往对面生产者指定的响应队列中发送响应消息
MessageProducer producer = session.createProducer(m.getJMSReplyTo());
//第十步:【消费者创建并发送消息】
TextMessage replyMessage = session.createTextMessage("ReplyMessage");
//一定是发送到了replyQueue
producer.send(replyMessage);
} catch (JMSException e) {
}
}
});
//第十一步:【创建session2】 使用同一个Connection创建另一个Session,来读取replyQueue上的消息。
Session session2 = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//第十二步:【创建session2的消费者】创建一个消费者接收replyQueue的消息
MessageConsumer replyComsumer = session2.createConsumer(replyQueue);
replyComsumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("【Connection】:Get reply: " + ((TextMessage) m).getText());
} catch (JMSException e) {
}
}
});
//使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue
//requestTempQueueByAnotherConnection(this.connection1,replyQueue);
//第四步:【创建生产者】使用同一个session创建生产者,并往tempQueue发送消息
MessageProducer producer = session.createProducer(this.jeffTempQueue);
TextMessage message = session.createTextMessage("SimpleMessage");
//第五步:【设置响应队列】生产者设置消息的replayQueue,最终消费者接收到消息后往这个队列中发送响应
message.setJMSReplyTo(replyQueue);
//第六步:【发送消息】生产者发送消息到tempQueue
producer.send(message);
}
}
我们先注释掉:
//使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue
// requestTempQueueByAnotherConnection(this.connection1,replyQueue);
启动打印:
说明我们上边说的1和2,然后我们放开注释:
启动打印:
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection
at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1239)
at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1183)
at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1095)
at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1067)
at jeff.mq.tempDestination.TemporaryQueueTest.requestTempQueueByAnotherConnection(TemporaryQueueTest.java:76)
at jeff.mq.tempDestination.TemporaryQueueTest.requestAndReply(TemporaryQueueTest.java:131)
at jeff.mq.tempDestination.TemporaryQueueTest.main(TemporaryQueueTest.java:68)
提示信息:不能使用另外一个connection来访问临时目标!
这就是简单的request/reply模型的使用!