mq 分为两种模式:queue(队列模式,生产者生产的一个消息只能由一个消费者进行消费)和 topic(发布/订阅模式,生产者生产的一个消息可以由多个消费者进行消费)
application.properties 配置
# http port
server.port=9090
env.host.mq=192.168.46.128
#========================================================MQ=================================================#
spring.activemq.broker-url=tcp://${env.host.mq}:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
#MQ白名单信任
spring.activemq.packages.trust-all=true
mq config类
package com.wl.dubbo.blog.mq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
/**
 * JMS configuration for ActiveMQ: declares the destinations, the connection factory,
 * one listener container factory per messaging domain, and the messaging template.
 *
 * Created by wl on 2017/12/28.
 */
@Configuration
@EnableJms
public class MqConfig {

    private static final String QUEUE_NAME = "queue";
    private static final String TOPIC_NAME = "orders";

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String jmsUser;

    @Value("${spring.activemq.password}")
    private String jmsPassword;

    /**
     * Mirrors {@code spring.activemq.packages.trust-all}; defaults to {@code true}, which is the
     * value that used to be hard-coded here.
     */
    @Value("${spring.activemq.packages.trust-all:true}")
    private boolean trustAllPackages;

    /**
     * @return the point-to-point destination, registered under the bean name "queue"
     */
    @Bean("queue")
    public Queue queueQueue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }

    /**
     * @return the publish/subscribe destination, registered under the bean name "topicQueue"
     */
    @Bean("topicQueue")
    public ActiveMQTopic topicQueue() {
        return new ActiveMQTopic(TOPIC_NAME);
    }

    /**
     * Builds the ActiveMQ connection factory from the configured user, password and broker URL.
     *
     * NOTE(review): the spring.activemq.pool.* properties look unused because this factory is
     * created by hand — confirm whether connection pooling is expected.
     *
     * @return the connection factory shared by the listener factories and the template
     */
    @Bean(name = "activeMQConnectionFactory")
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(jmsUser, jmsPassword, brokerUrl);
        // SECURITY: trusting all packages lets any class carried in an ObjectMessage be
        // deserialized; set spring.activemq.packages.trust-all=false for untrusted brokers.
        connectionFactory.setTrustAllPackages(trustAllPackages);
        return connectionFactory;
    }

    /**
     * Listener container factory for queue destinations (pub/sub domain disabled).
     *
     * @param activeMQConnectionFactory connection factory used by the listener containers
     * @return the factory referenced by {@code containerFactory = "queueListenerFactory"}
     */
    @Bean(name = "queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory activeMQConnectionFactory) {
        return newListenerFactory(activeMQConnectionFactory, false);
    }

    /**
     * Listener container factory for topic destinations (pub/sub domain enabled).
     *
     * @param activeMQConnectionFactory connection factory used by the listener containers
     * @return the factory referenced by {@code containerFactory = "topicListenerFactory"}
     */
    @Bean(name = "topicListenerFactory")
    public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory activeMQConnectionFactory) {
        return newListenerFactory(activeMQConnectionFactory, true);
    }

    /**
     * @param activeMQConnectionFactory connection factory used for sending
     * @return the messaging template used by the publishing service
     */
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(ConnectionFactory activeMQConnectionFactory) {
        return new JmsMessagingTemplate(activeMQConnectionFactory);
    }

    /** Creates a listener container factory bound to the given connection factory and domain. */
    private DefaultJmsListenerContainerFactory newListenerFactory(ConnectionFactory connectionFactory,
                                                                  boolean pubSubDomain) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(pubSubDomain);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
publish
package com.wl.dubbo.blog.mq;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
 * Publishes text messages to the injected queue and topic destinations.
 *
 * Created by wl on 2017/12/28.
 */
@Component
public class ActiveMQSendService {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQSendService.class);

    private final JmsMessagingTemplate template;
    private final Queue queueDestination;
    private final ActiveMQTopic topicDestination;

    /**
     * @param jmsMessagingTemplate template used to convert and send the payloads
     * @param queueQueue           destination used by {@link #sendQueueMessage(String)}
     * @param topicQueue           destination used by {@link #sendTopicMessage(String)}
     */
    @Autowired
    public ActiveMQSendService(JmsMessagingTemplate jmsMessagingTemplate,
                               Queue queueQueue,
                               ActiveMQTopic topicQueue) {
        this.template = jmsMessagingTemplate;
        this.queueDestination = queueQueue;
        this.topicDestination = topicQueue;
    }

    /**
     * Logs the payload and sends it to the queue destination.
     *
     * @param message text payload
     */
    public void sendQueueMessage(String message) {
        logger.info("send queue:{}", message);
        this.template.convertAndSend(this.queueDestination, message);
    }

    /**
     * Logs the payload and sends it to the topic destination.
     *
     * @param message text payload
     */
    public void sendTopicMessage(String message) {
        logger.info("send topic:{}", message);
        this.template.convertAndSend(this.topicDestination, message);
    }
}
consumer
package com.wl.dubbo.blog.mq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
/**
 * JMS listeners that log every message received from the "queue" and "orders" destinations.
 *
 * Created by wl on 2017/12/28.
 */
@Component
public class ReceiverService {

    private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);

    private static final String QUEUE_DESTINATION = "queue";
    private static final String TOPIC_DESTINATION = "orders";
    private static final String QUEUE_FACTORY = "queueListenerFactory";
    private static final String TOPIC_FACTORY = "topicListenerFactory";

    /**
     * Receives messages from the queue destination and logs them.
     *
     * @param receiveStr message payload
     * @throws JMSException declared for callers; nothing in this body throws it
     */
    @JmsListener(destination = QUEUE_DESTINATION, containerFactory = QUEUE_FACTORY)
    public void receiveTestQueue(String receiveStr) throws JMSException {
        logger.info("=======================queue:{}", receiveStr);
    }

    /**
     * Receives messages from the topic destination and logs them.
     *
     * @param receiveStr message payload
     * @throws JMSException declared for callers; nothing in this body throws it
     */
    @JmsListener(destination = TOPIC_DESTINATION, containerFactory = TOPIC_FACTORY)
    public void receiveTopicQueue(String receiveStr) throws JMSException {
        logger.info("=======================orders:{}", receiveStr);
    }
}
启动类
package com.wl.dubbo.blog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
/**
 * Spring Boot entry point. The data-source, transaction-manager and JPA auto-configurations
 * are excluded because no database is used; components are scanned under {@code com.wl}.
 *
 * Created by wl on 2018/8/3.
 */
@SpringBootApplication(exclude = {
        DataSourceAutoConfiguration.class,
        DataSourceTransactionManagerAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class, // no database is used
}, scanBasePackages = "com.wl")
public class Application {

    // Refers to this class: the previous BlogApplication reference does not exist in this
    // project and would not compile.
    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    /**
     * Starts the application as a web application and logs once startup has finished.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(Application.class);
        app.setWebEnvironment(true);
        app.run(args);
        logger.info("application init success");
    }
}
测试类
package com.wl.dubbo.blog;
import com.wl.dubbo.blog.mq.ActiveMQSendService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration tests that send one message to the queue and one to the topic through
 * {@link ActiveMQSendService}.
 *
 * Created by wl on 2018/8/27.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest {

    @Autowired
    private ActiveMQSendService activeMQSendService;

    /** Sends a sample payload to the queue destination. */
    @Test
    public void testSendQueue() {
        String queuePayload = "queue===============";
        activeMQSendService.sendQueueMessage(queuePayload);
    }

    /** Sends a sample payload to the topic destination. */
    @Test
    public void testSendTopic() {
        String topicPayload = "topic================";
        activeMQSendService.sendTopicMessage(topicPayload);
    }
}
上面的 topic 订阅不是持久订阅:如果在消费者应用启动之前就已经发送了 topic 消息,消费者是无法消费到该消息的;并且在分布式多机部署的情况下,同一条 topic 消息会被同一个应用的多个实例重复消费。解决上面的问题需要持久化 topic(持久订阅)以及使用 virtual topic
topic 持久化
修改 activemq_home/conf 目录下的 activemq.xml,在 broker 标签上添加 persistent="true" 属性
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" persistent="true" dataDirectory="${activemq.data}">
activemq 默认使用 kahaDB 做持久化,数据存放在 activemq_home/data/kahadb 目录下
mq config 类 修改 topic 的 JmsListenerContainerFactory
/**
 * Listener container factory for durable topic subscriptions: pub/sub domain enabled,
 * durable subscription enabled, and a fixed client id.
 *
 * @param activeMQConnectionFactory connection factory used by the listener containers
 * @return the factory referenced by {@code containerFactory = "topicListenerFactory"}
 */
@Bean(name = "topicListenerFactory")
public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory activeMQConnectionFactory) {
    DefaultJmsListenerContainerFactory durableTopicFactory = new DefaultJmsListenerContainerFactory();
    durableTopicFactory.setConnectionFactory(activeMQConnectionFactory);
    // "true" selects the Publish/Subscribe domain (topics)
    durableTopicFactory.setPubSubDomain(true);
    // "true" registers the subscription as durable
    durableTopicFactory.setSubscriptionDurable(true);
    // client id used for the durable subscription
    durableTopicFactory.setClientId("A");
    return durableTopicFactory;
}
持久化 topic 的listenerContainerFactory必须设置 setSubscriptionDurable(true) setPubSubDomain(true) 并且设置 clientId
注意:不同的 destination 需要的 listenerContainerFactory 的 clientId 必须不同,所以 clientId 最好根据业务类型来命名。相同的 destination 在分布式部署下如果 clientId 相同,则后启动的应用无法消费该 destination(如果之前启动的应用关闭后,后启动的应用可以消费该 destination)。所以要添加新的 destination 的消费者,必须要新建对应的 listenerContainerFactory,并且 clientId 不同
虚拟topic(Virtual topic)
虚拟topic 是通过queue 实现的,在分布式部署下,相同的destination 不会被重复消费
虚拟topic的名字必须以VirtualTopic.前缀开始(可以通过改变activemq_home/conf 下 activemq.xml配置修改 )
虚拟topic 的消费端 destination(队列)必须以 Consumer.*.VirtualTopic. 前缀开始(可以通过改变 activemq_home/conf 下 activemq.xml 配置修改),* 号相当于消费者分组
eg: 如果虚拟 topic 名称为 VirtualTopic.Orders,消费端 destination 为 Consumer.A.VirtualTopic.Orders、Consumer.B.VirtualTopic.Orders,那么在分布式部署下,每条消息在 Consumer.A.VirtualTopic.Orders 上只会被消费一次,在 Consumer.B.VirtualTopic.Orders 上也只会被消费一次
mq config 增加 如下
/** Name of the virtual topic that producers publish to. */
private static final String VIRTUAL_TOPIC_NAME = "VirtualTopic.Orders";

/**
 * @return the virtual topic destination, registered under the bean name "virtualTopicQueue"
 */
@Bean(name = "virtualTopicQueue")
public ActiveMQTopic virtualTopicQueue() {
    return new ActiveMQTopic(VIRTUAL_TOPIC_NAME);
}
publish 修改如下
package com.wl.dubbo.blog.mq;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
 * Publishes text messages to the queue, the plain topic and the virtual topic.
 *
 * Created by wl on 2017/12/28.
 */
@Component
public class ActiveMQSendService {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQSendService.class);

    private final JmsMessagingTemplate jmsMessagingTemplate;
    private final Queue queueQueue;
    private final ActiveMQTopic topicQueue;
    private final ActiveMQTopic virtualTopicQueue;

    /**
     * Two {@link ActiveMQTopic} beans exist, so each topic parameter is qualified by bean name
     * instead of relying on the parameter-name fallback, which only works when parameter names
     * are kept in the compiled class.
     *
     * @param jmsMessagingTemplate template used to convert and send the payloads
     * @param queueQueue           destination used by {@link #sendQueueMessage(String)}
     * @param topicQueue           bean "topicQueue", used by {@link #sendTopicMessage(String)}
     * @param virtualTopicQueue    bean "virtualTopicQueue", used by
     *                             {@link #sendVirtualTopicQueue(String)}
     */
    @Autowired
    public ActiveMQSendService(JmsMessagingTemplate jmsMessagingTemplate,
                               Queue queueQueue,
                               @Qualifier("topicQueue") ActiveMQTopic topicQueue,
                               @Qualifier("virtualTopicQueue") ActiveMQTopic virtualTopicQueue) {
        this.jmsMessagingTemplate = jmsMessagingTemplate;
        this.queueQueue = queueQueue;
        this.topicQueue = topicQueue;
        this.virtualTopicQueue = virtualTopicQueue;
    }

    /**
     * Logs the payload and sends it to the queue destination.
     *
     * @param message text payload
     */
    public void sendQueueMessage(String message) {
        logger.info("send queue:{}", message);
        jmsMessagingTemplate.convertAndSend(queueQueue, message);
    }

    /**
     * Logs the payload and sends it to the plain topic destination.
     *
     * @param message text payload
     */
    public void sendTopicMessage(String message) {
        logger.info("send topic:{}", message);
        jmsMessagingTemplate.convertAndSend(topicQueue, message);
    }

    /**
     * Logs the payload and sends it to the virtual topic destination.
     *
     * @param message text payload
     */
    public void sendVirtualTopicQueue(String message) {
        logger.info("send virtualTopic:{}", message);
        jmsMessagingTemplate.convertAndSend(virtualTopicQueue, message);
    }
}
consumer 增加 VirtualTopic.Orders 监听器
/**
 * Receives messages from the consumer queue "Consumer.B.VirtualTopic.Orders" through the
 * queue listener factory and logs them.
 *
 * @param receiveStr message payload
 * @throws JMSException declared for callers; nothing in this body throws it
 */
@JmsListener(
        destination = "Consumer.B.VirtualTopic.Orders",
        containerFactory = "queueListenerFactory")
public void receiveTopicVirtualQueue(String receiveStr) throws JMSException {
    logger.info("=======================Consumer.B.VirtualTopic.topic:{}", receiveStr);
}
注意使用的 containerFactory 是 queueListenerFactory(与 topic 相反,不同的 queue 可以使用相同的 containerFactory)
测试
/** Sends a sample payload to the virtual topic. */
@Test
public void testSendVirtualTopic() {
    String virtualTopicPayload = "virtualTopic==============";
    activeMQSendService.sendVirtualTopicQueue(virtualTopicPayload);
}
结果
2018-08-28 00:18:18,814 INFO (DefaultLifecycleProcessor.java:343)- Starting beans in phase 2147483647
2018-08-28 00:18:19,228 INFO (StartupInfoLogger.java:57)- Started ApplicationTest in 6.17 seconds (JVM running for 7.442)
2018-08-28 00:18:19,291 INFO (ActiveMQSendService.java:50)- send virtualTopic:virtualTopic==============
2018-08-28 00:18:22,280 INFO (ReceiverService.java:31)- =======================Consumer.B.VirtualTopic.topic:virtualTopic==============