文章目录
Spring Boot 2.X - Spring Boot整合JMS之ActiveMQ
Spring Boot 2
整合ActiveMQ案例之订阅发布方式。文中Spring Boot
版本为2.1.4.RELEASE
。
上篇 Spring Boot 2.X - Spring Boot整合JMS之ActiveMQ介绍了ActiveMQ点对点的模式,这里我们来看下订阅发布模式。
1.修改配置
开启pub-sub模式,默认是关闭的,也就是默认是点对点模式:
spring:
activemq:
broker-url: tcp://192.168.0.2:61616 # activemq消息组件的连接主机
user: admin
password: admin # 账号密码默认为admin
jms:
pub-sub-domain: true # 启用发布订阅模式(默认关闭是点对点模式)
2.订阅者
新建订阅者TopicSub
@Component
public class TopicSub {
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive1(String message) {
System.err.println("Topic.Subscribe.receive1接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive2(String message) {
System.err.println("Topic.Subscribe.receive2接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive3(String message) {
System.err.println("Topic.Subscribe.receive3接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive4(String message) {
System.err.println("Topic.Subscribe.receive4接收消息:" + message);
}
}
并在ActiveMQ管理后台增加一条名为topic.test
的Topic,如下:
3.修改接口
在MsgController
中增加消息发布接口
@GetMapping("/sendTopic")
public String sendTopic(String msg) {
jmsTemplate.convertAndSend(new ActiveMQTopic("topic.test"), msg);
return "SUCCESS";
}
4.启动测试
启动ActiveMQ服务,并启动XxxApplication本项目,测试
5.问题?
这里有个问题,我们一下之前的点对点消息模式:
我们发现,并没有看到消费的信息打印出来。查阅ActiveMQ管理后台,有消息未被消费
6.点对点和发布订阅共存解决
6.1 新建配置类
新建配置类,并添加Bean:JmsListenerContainerFactory
,设置监听容器支持Pub-Sub模式
@Configuration
public class MyConfig {
@Bean
public JmsListenerContainerFactory<?> topicListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
6.2 修改配置
注释掉spring.jms.pub-sub-domain
,或者设为false
spring:
activemq:
broker-url: tcp://192.168.0.2:61616 # activemq消息组件的连接主机
user: admin
password: admin # 账号密码默认为admin
# jms:
# pub-sub-domain: true # 启用发布订阅模式(默认关闭是点对点模式)
6.3 修改订阅者
指定containerFactory
为上一步设置的topicListenerContainerFactory
。
@Component
public class TopicSub {
@JmsListener(destination = "topic.test",containerFactory = "topicListenerContainerFactory")
public void receive1(String message) {
System.err.println("Topic.Subscribe.receive1接收消息:" + message);
}
@JmsListener(destination = "topic.test",containerFactory = "topicListenerContainerFactory")
public void receive2(String message) {
System.err.println("Topic.Subscribe.receive2接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive3(String message) {
System.err.println("Topic.Subscribe.receive3接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive4(String message) {
System.err.println("Topic.Subscribe.receive4接收消息:" + message);
}
}
6.4 启动测试
启动ActiveMQ和本项目,
- 测试点对点模式:
- 测试发布订阅模式:
已经成功实现点对点和发布订阅模式共存了。
7.发现另一种方法
在实际使用中,我发现,其实可以不用配置Bean,原生可以实现2中模式共存的情况。
7.1 修改配置
再次开启pub-sub模式
spring:
activemq:
broker-url: tcp://192.168.0.2:61616 # activemq消息组件的连接主机
user: admin
password: admin # 账号密码默认为admin
jms:
pub-sub-domain: true # 启用发布订阅模式(默认关闭是点对点模式)
7.2 注释掉配置类
我们不需要这个配置类,注释掉
@Configuration
public class MyConfig {
// @Bean
// public JmsListenerContainerFactory<?> topicListenerContainerFactory(ConnectionFactory connectionFactory) {
// DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// factory.setPubSubDomain(true);
// factory.setConnectionFactory(connectionFactory);
// return factory;
// }
}
7.3 修改订阅者
去掉指定的containerFactory
@Component
public class TopicSub {
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive1(String message) {
System.err.println("Topic.Subscribe.receive1接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive2(String message) {
System.err.println("Topic.Subscribe.receive2接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive3(String message) {
System.err.println("Topic.Subscribe.receive3接收消息:" + message);
}
@JmsListener(destination = "topic.test"/*,containerFactory = "topicListenerContainerFactory"*/)
public void receive4(String message) {
System.err.println("Topic.Subscribe.receive4接收消息:" + message);
}
}
7.4 修改点对点调用方法
修改Controller中点对点调用方法:
@GetMapping("/send")
public String sendMsg(String msg){
// 这么写的话,即便启用pub-sub模式,不需要增加额外配置JmsListenerContainerFactory,也可以实现Topic和Queue共存。
jmsTemplate.convertAndSend("queue.test",msg);
// jmsTemplate.convertAndSend(new ActiveMQQueue("queue.test"), msg);
return "SUCCESS";
}
7.5 启动测试
- 发布订阅模式:
- 点对点模式:
OK。这样也实现了点对点和发布订阅两种模式的共存。
8.项目代码
完整代码:Github