上一篇文章 《二、JMS 点对点模型 – ActiveMQ简单实现》 我们实现了JMS点对点模型的实例,本章对第二种 发布/订阅 模型来做一个简单的实例。
其实发布/订阅 模型与点对点模型的实现方式基本一致,因此这里就不写完整的过程了。
一、开发环境
与上篇文章相同
二、java项目
与上篇文章相同
三、具体实现
1、编写发布者
发布者的代码与上篇文章基本相同,不同的是 使用session 创建是的主题,而不是队列
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author .
* @version 1.0
* @name JMS生产者
* @description 消息的生产者类
* @date 2018/4/11 0011.
*/
public class JMSProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//生产者
MessageProducer messageProducer;
/**
* 编写生产者的步骤
*/
try {
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.URL);
//使用连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接创建获取会话
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//与点对点唯一不同的地方
//使用会话连接一个主题作为目的地,如果这个主题不存在将会被创建
destination = session.createTopic("HelloWorld.Topic");
//使用会话创建消息发布者
messageProducer = session.createProducer(destination);
//发布主题
sendMessage(session,messageProducer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发布者发布主题
* @param session 会话
* @param messageProducer 发布者
*/
public static void sendMessage(Session session,MessageProducer messageProducer){
try {
//使用会话创建一条文本消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
TextMessage textMessage = session.createTextMessage("你好,世界! by Topic");
//通过消息发布者发出主题
messageProducer.send(textMessage);
System.out.println("已发送主题消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2、编写订阅者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author .
* @version 1.0
* @name
* @description
* @date 2018/4/11 0011.
*/
public class JMSConsumer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//消息的消费者
MessageConsumer messageConsumer;
/**
* 消息的消费者编写
*/
try {
//实例化工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);
//使用实例工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接获取会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//使用会话连接一个队列作为目的地,如果这个队列不存在将会被创建
// destination = session.createQueue("HelloWorld");
//使用会话创建一个主题,如果这个主题不存在将会被创建
Topic topic = session.createTopic("HelloWorld.Topic");
//使用会话创建一个订阅者
messageConsumer = session.createConsumer(topic);
/**
*获取消息
*/
/*同步实现*/
//设置接收者接收消息的时间,为了便于测试,这里定为50s,接收到消息之前(或超时之前)将一直阻塞
/*TextMessage textMessage = (TextMessage) messageConsumer.receive(50000);
if (textMessage!=null){
System.out.println("收到的消息是:" + textMessage.getText());
}else {
System.out.println("没有收到消息");
}*/
/*异步实现*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
String text = ((TextMessage)message).getText();
System.out.println("收到的消息 :" +text );
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
/*需要异步,则不关闭连接*/
}
}
3、关于持久订阅模式
在第一篇文章 《一、JMS概述》中我们提到:
发布/订阅模型还支持持久订阅的概念,在消息发布时,注册了主题的消费者不需要处于活动状态; 当消费者随后变得活跃时,它将收到消息。如果没有活动使用者注册主题,则该主题不会持有它收到的消息,除非它具有持久订阅的不活动消费者。
主要是业务场景如下:
A系统通过MQ推送数据到B系统。通过发布订阅的消息传送模型。由于涉及到的数据比较重要:比如是关于资金、交易、股票价格的信息。要保证B系统一定收到A系统发送的消息,考虑B系统会断电重启之类异常,故设置持久订阅模式。可以保证在B订阅A主题后,因为断电,订阅者状态变为不活动的。在B系统重启后,依然可以收到消息。
实现持久订阅模式与普通的发布订阅模式一样,主要的不同是必须设置唯一的客户端ID和订阅者ID。
1、在连接启动前设置设置客户端ID
2、使用createDurableSubscriber 创建订阅者并指定订阅者ID
//设置客户端ID
connection.setClientID("client1");
//使用会话创建一个订阅者,并指定订阅者ID为 sub1
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"sub1");
四、运行
1、启动ActiveMQ
与上一篇文章相同
2、运行程序
首先我们运行一下发布者。发布成功后我们可以在ActiveMQ 的主题中看到我们创建出来的主题消息
我们再运行一下订阅者,会发现订阅者一直在待接收消息,并没有输出我们刚刚发布的主题消息。这是因为 发布/订阅 模型 的特点:发布端在发布消息时,如果没有订阅端在线,则不会保留消息,将会认为消息已经发送。
因此我们可以先运行订阅者的代码,启动一个订阅者。为了体现发布/订阅模式一对多的特点,我们再启动第二个订阅者。可以在ActiveMQ中看到在两个订阅者在线了。
我们再启动发布者,发布一个消息。在线的两个订阅者就可以接收到我们刚刚发布的消息了。
我们再设想一下,其中一个订阅者断电下线了,如果再有消息发布,则待它再次上线时已经接收不到第二次发布的消息了。为了解决一个问题,我们可以使用持久订阅模式。
按照上面 持久订阅修改代码后重新启动两个订阅者,注意的时这两个订阅者的客户端ID与订阅者ID都必须唯一。
启动发布者发布一个消息 ,可以看到两个订阅者分别都接收到了发布的消息。
这时我们关闭 订阅者 Client1 ,再发布一个消息。这时Client2接收到了。当我们启动订阅者 Client1后,它也能够收到第二次发布的消息
至此,我们实现了JMS发布/订阅模型,并使用了持久订阅模式关于持久订阅我们需要注意的是:
很多情况下,持久化订阅非常有用,但有的时候并非如此。虽然使用持久还是非持久通常由业务决定。但是,我们还必须考虑消息消耗的存储容量。比如有一个持久订阅者长期处于不活动的状态,那么jms服务器就必须为这个订阅者存储数以千计、万计的无用信息,浪费JMS数据仓库的宝贵空间。因为,我们必须得考虑这个问题。