一、ActiveMQ的作用
先致敬前辈,导入一篇难得一见的好文:ActiveMQ的作用总结(应用场景及优势) 作者: 青莲键仙
消息中间件广泛应用在大型互联网架构中,利用消息中间件队列和广播各自的特性可以支持很多业务,比如群发发送短信、给单个用户发送邮件等。
下面是我搜集的资料
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。
ActiveMQ 实现 JMS 规范并在此之上提供大量额外的特性。ActiveMQ 支持队列和订阅(广播)两种模式的消息发送。
JMS规范 即 Java 消息服务(Java Message Service)应用程序接口规范。
二、具体使用
先导入ActiveMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置文件。一种是独立安装的ActiveMQ,用于生产环境。另外一种是基于内存的ActiveMQ,用于调试阶段。
# 基于内存的 ActiveMQ
spring.activemq.in-memory=true
# 不适应连接池
spring.activemq.pool.enabled=false
# 独立安装的 ActiveMQ
#spring.activemq.broker-url=tcp://192.168.0.1:61616
#spring.activemq.user=admin
#spring.activemq.password=admin
使用ActiveMQ有三种角色。消息发送者向指定的队列或者广播发送消息,接收者如果监听了此队列或者广播,则可以收到消息。
可简单理解为,消息发送者对着某个队列或者广播发送消息,如果接收者在这个队列或者监听了此广播,那么就能够接收到消息。
(1)队列或广播角色
(2)发送消息角色
(3)接收消息角色
三、向队列Queue发送消息
先在服务器启动的时候,创建一个队列广播角色的对象。创建一个 名为 "jay.queue" 的队列。
@Configuration
public class MqConfig {
@Bean
public Queue generateQueue() {
//创建一个名为 jay.queue 的MQ队列
return new ActiveMQQueue("jay.queue");
}
}
然后再创建另外两种角色,先创建发消息角色。
@Component
public class Producer {
/**
* 发送消息的工具类,能够向队列发送消息
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 自动注入创建好的队列
*/
@Autowired
private Queue queue;
public void sendMessageToQueue(String message) {
System.out.println("发送者发送消息:" + message);
jmsMessagingTemplate.convertAndSend(this.queue, message);
}
}
最后再创建接收者。接收者需要使用注解指明它们是哪个队列的。
使用@JmsListener的 destination 属性 ,规定监听哪个队列。
@Component
public class Consumer {
@JmsListener(destination = "jay.queue")
public void receiveQueue(String text) {
System.out.println("接收者A接收消息 : " + text);
}
}
@Component
public class Consumer2 {
@JmsListener(destination = "jay.queue")
public void receiveQueue(String text) {
System.out.println("接收者B接收消息 " + text);
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueueActiveMQTest {
@Autowired
private Producer producer;
@Test
public void test() {
for (int i = 0; i < 10; i++) {
producer.sendMessageToQueue("test message");
}
}
}
发送者发送消息:test message
接收者B接收消息 test message
接收者A接收消息 : test message
...
四、广播(Topic)
在"neo微笑"老师的指导下,学习了队列消息,那么学习广播消息就易如反掌了。不过还踩了一个坑,需要为配置文件增加一行配置:使用订阅发布模式。
通过配置项 spring.jms.pub-sub-domain 的值来控制,true 为广播模式,false 为队列模式,默认情况下支持队列模式。
# 基于内存的 ActiveMQ
spring.activemq.in-memory=true
# 不适应连接池
spring.activemq.pool.enabled=false
#使用订阅发布模式,即广播模式
spring.jms.pub-sub-domain=true
@Configuration
public class MqConfig {
@Bean
public Topic generateQueue() {
//创建一个名为 jay.topic 的广播
return new ActiveMQTopic("jay.topic");
}
}
发送者,仅仅是把发送者从 queue 改为了 从项目启动就创建的 Topic接口实现。
@Component
public class Producer {
/**
* 发送消息的工具类,能够向队列发送消息
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 自动注入创建好的广播
*/
@Autowired
private Topic topic;
public void sendMessageToTopic(String message) {
System.out.println("发送者发送广播:" + message);
jmsMessagingTemplate.convertAndSend(this.topic, message);
}
}
接收者也很简单,就是使用注解订阅某个广播频道。
@Component
public class Consumer {
@JmsListener(destination = "jay.topic")
public void receiveQueue(String text) {
System.out.println("接收者A接收广播 : " + text);
}
}
@Component
public class Consumer2 {
@JmsListener(destination = "jay.topic")
public void receiveQueue(String text) {
System.out.println("接收者B接收广播 : " + text);
}
}
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 10; i++) {
producer.sendMessageToQueue("test topic message");
}
Thread.sleep(1000L);
}
发送者发送广播:test topic message
接收者B接收广播 : test topic message
接收者A接收广播 : test topic message
...
五、同时支持队列(Queue)和广播(Topic)
ActiveMQ默认只支持一种发送消息方式。如果即希望支持队列模式,也希望支持广播模式,那么就需要创建自定义的消息容器工厂。通过 DefaultJmsListenerContainerFactory 创建自定义的 JmsListenerContainerFactory 实例,之后在 @JmsListener 注解中通过 containerFactory 属性引用它。
@Configuration
@EnableJms
public class ActiveMQConfig {
@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
@Bean("topicListenerFactory")
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}
@Configuration
public class MqConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("jay.queue");
}
@Bean
public Topic topic() {
return new ActiveMQTopic("jay.topic");
}
}
@Component
public class Producer {
/**
* 发送消息的工具类,能够向队列发送消息
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 自动注入创建好的广播
*/
@Autowired
private Topic topic;
/**
* 自动注入创建好的队列
*/
@Autowired
private Queue queue;
public void sendMessageToTopic(String message) {
System.out.println("发送者发送广播消息:" + message);
jmsMessagingTemplate.convertAndSend(this.topic, message);
}
public void sendMessageToQueue(String message) {
System.out.println("发送者发送队列消息:" + message);
jmsMessagingTemplate.convertAndSend(this.queue, message);
}
}
@Component
public class Consumer {
@JmsListener(destination = "jay.queue", containerFactory = "queueListenerFactory")
public void receiveQueue(String text) {
System.out.println("Consumer queue msg : " + text);
}
@JmsListener(destination = "jay.topic", containerFactory = "topicListenerFactory")
public void receiveTopic(String text) {
System.out.println("Consumer topic msg : " + text);
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueueActiveMQTest {
@Autowired
private Producer producer;
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 10; i++) {
producer.sendMessageToTopic("test topic message");
producer.sendMessageToQueue("test queue message");
}
}
}
接收者B接收广播 : test topic message
Consumer topic msg : test topic message
发送者发送广播消息:test topic message
Consumer queue msg : test queue message
Consumer topic msg : test topic message
接收者B接收广播 : test topic message
发送者发送队列消息:test queue message
...