版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Afox4l/article/details/79078379
一.模式图
二.相关概念
1,主题(Topic)
2,发布者(Publisher)
3,订阅者(Subscriber)
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
三.特点
1,每个消息可以有多个消费者
2,发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
3,为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
ActiveMQ的安装在上一文章已经说过,此处几不在介绍。
四.代码实现
两种接受消息方式
1),messageCustomer.recive()或者messageCustomer.recive(timeout)
2),messageCustomer.setMessageListener(监听器) ;监听器要实现MessageListener
这里采用监听模式实现消息接收
1,创建监听器Listener1 。
根据需求可以创建多个监听器,满足不同的订阅者
public class Listener1 implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("sub1收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2,创建发布者Publisher
public class Publisher {
private static final int SEND_NUMBER=5;
public static void main(String[] args){
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话
Destination destination;//连接目的地
MessageProducer messageProducer;//消息制造者
//实例化工厂
connectionFactory=new ActiveMQConnectionFactory(Connec.USER,Connec.PASSWORD,Connec.BROKER_URL);
try {
connection=connectionFactory.createConnection();
connection.start();//启动连接
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination=session.createTopic("pubActiveMQ1");
messageProducer=session.createProducer(destination);
sendMessage(session,messageProducer);
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException {
for(int i=0;i<SEND_NUMBER;i++) {
TextMessage message= session.createTextMessage("pubActiveMQ发送的消息"+i);
System.out.println("pub发送的消息:"+message.getText());
messageProducer.send(message);
}
}
}
3,创建订阅者Subscribe1
如果多个订阅者,则创建多个,每一个订阅者注册一个满足自己需求的监听器
public class Subscribe1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话
Destination destination;//连接目的地
MessageConsumer messageConsumer;
//实例化工厂
connectionFactory=new ActiveMQConnectionFactory(Connec.USER,Connec.PASSWORD,Connec.BROKER_URL);
try {
connection=connectionFactory.createConnection();
connection.start();//启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination=session.createTopic("pubActiveMQ1");
messageConsumer=session.createConsumer(destination);
while(true) {
messageConsumer.setMessageListener(new Listener1());
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
在订阅者注册监听器的地方要加上一个循环,不然 当main方法结束的时候,监听器的生命周期也结束了,就无法监听到发布者发布的消息。
4,运行代码
先订阅,后发布。
运行代码的时候要先运行订阅者代码,然后运行发布者代码。
————————————————
版权声明:本文为CSDN博主「Afox4l」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Afox4l/article/details/79078379