目录
ActiveMQ 5.15.4 集成 Spring Boot 2.0.3
ActiveMQ 5.15.4 集成 Spring Boot 2.0.3
1、SpringBoot 提供了对 JMS 的支持,对主流的消息中间件如 RabbitMQ、Apache Kafka、Apache ActiveMQ 等都提供了集成。
2、参照 spring boot 官方文档 ActiveMQ Support 即可轻松集成 ActiveMQ 与 Spring Boot。
3、本文演示环境:Spring Boot 2.0.3 + ActiveMQ 5.25.4 + IDEA 2018。整体结构如下 MessageConsumer 作为消息消费者,监听消息并打印到控制台,MessageController 作为控制层,用户从页面请求发送消息。
(单独运行解压的 ActiveMQ 5.15.9 作为消息服务器)
pom.xml 依赖
1、新建 Spring Boot web 应用,pom.xml 文件内容如下:
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--使用 spring-boot-starter-activemq,将提供连接或嵌入activemq实例所需的依赖项-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring-boot-starter-activemq : spring boot 集成的 activemq 依赖,spring boot 2.0.3 版本集成 activemq 5.15.4 版本
spring-boot-starter-web:创建 web 应用,后续可以从浏览器发送消息,方便测试。
application.yml 配置
1、activemq 配置由 spring.activemq.* 属性控制,下面简单演示几项:
spring:
activemq:
broker-url: 'tcp://localhost:61616' #ActiveMQ服务请求地址,不写时默认为 tcp://localhost:61616
in-memory: true #默认代理 URL 是否应在内存中,默认为 true
pool:
enabled: false #是否应创建 JmsPoolConnectionFactory,而不是常规的 ConnectionFactory,默认 false
max-connections: 50 #最大池连接数
jms:
cache:
session-cache-size: 5 #会话缓存的大小(每个jms会话类型),默认为 1
ActiveMQProperties.java:https://github.com/spring-projects/spring-boot/blob/v2.1.6.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/activemq/ActiveMQProperties.java
@JmsListener 接收消息
1、接收消息只需要一个 @JmsListerner 注解即可,自动会监听指定消息队列的消息,官网传送 Receiving a Message。
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 消息消费者
*/
@Component
public class MessageConsumer {
/**
* 当 jms 基础设施存在时,任何 bean 都可以用 @JmsListener 注解创建监听器端点,如果未定义 JmsListenerContainerFactory,则会自动配置默认值。
* 默认情况下,默认工厂是事务性的。如果在存在 JtaTransactionManager 的基础结构中运行,则默认情况下它与侦听器容器相关联
* @param message
*/
@JmsListener(destination = "my-queue")
public void receiveMessage(String message) {
System.err.println("接收到了消息: " + message);
}
/**
* 可以同时监听任意多个消息队列,都会自动接收消息。
* @param message
*/
@JmsListener(destination = "my-queue2")
public void receiveMessage2(String message) {
System.out.println("收到消息:" + message);
}
}
JmsTemplate 发送消息
1、JmsTemplate 对 Spring boot 支持的所有 JMS 库提供了统一操作的 API。程序员不用再像调用原生的 ActiveMQ API 一样去考虑打开连接、打开、关闭 Session 等操作。官网传送 Sending a Message。
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
/**
* jms 消息控制层
*/
@RestController
@RequestMapping("jms")
public class MessageController {
/**spring 的 JmsTemplate 是自动配置,可以直接注入使用,如同 JdbcTemplate 一样,非常方便,封装好了 API,直接调用即可*/
@Autowired
private JmsTemplate jmsTemplate;
/**JmsMessagingTemplate 对 JmsTemplate 进行了封装,都可以用来发送消息*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**请求地址:http://localhost:8080/jms/sendMessage1?message=谢三哥
* @param message:待发送的消息
* @return
*/
@GetMapping("sendMessage1")
public String sendMessage1(String message) {
String formatMessage = messageFormat(message);
Destination destination = new ActiveMQQueue("my-queue");/**创建消息队列,自定义队列名称*/
/**convertAndSend(D destination, Object payload):转换与发送消息,destination:目的地,payload:待发送的消息,底层调用 send 方法
* send(D destination, Message<?> message):发送消息,convertAndSend 方法会将原始消息加入消息头转换成真正能发送的消息(Message)
* 支持发送的消息类型有:String, byte array, Map<String,?>, Serializable object.
* 如下所示如果直接传 jsonNodes,则会抛异常,因为不支持 jsonNodes
* 待发送的消息不能为 null,也不建议为空,否则接收端默认会抛异常
*/
jmsTemplate.convertAndSend(destination, formatMessage);
return formatMessage;
}
/**发送消息。请求地址:http://localhost:8080/jms/sendMessage2?message=张三哥
* @param message:待发送的消息
* @return
*/
@GetMapping("sendMessage2")
public String sendMessage2(String message) {
String formatMessage = messageFormat(message);
Destination destination = new ActiveMQQueue("my-queue2");/**创建消息队列,自定义队列名称*/
jmsMessagingTemplate.convertAndSend(destination, formatMessage);/**转换与发送消息*/
return formatMessage;
}
/**
* 将待发送的消息先进行 json 格式化一下,便于传输与取值。
*
* @param message :用户待发送的原始消息
* @return :返回转换好的 json 格式的消息
*/
private String messageFormat(String message) {
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.instance;
ObjectNode jsonNodes = jsonNodeFactory.objectNode();
jsonNodes.put("message", message);//message 为 null 时,照样可以 put
jsonNodes.put("status", 200);
jsonNodes.put("timeStamp", System.currentTimeMillis());
return jsonNodes.toString();
}
}
运行测试
1、先启动 ActiveMQ 消息服务器,然后启动本应用,访问浏览器地址,MessageConsumer 能接收到消息,便说明成功了。
项目源码:https://github.com/wangmaoxiong/active_mq_2
内嵌启动 ActiveMQ 服务
1、实际生产环境中大多还是将 ActiveMQ 单独作为服务器启动,但是平时开发、测试时使用内嵌的 ActiveMQ 服务也是很方便的,就像内嵌 tomcat 服务器一样。
2、《ActiveMQ 命令行启动 与 嵌入式启动》中已经介绍了 "嵌入式启动",不过彼时是 Maven 管理的 Java SE 应用,现在是 Spring Boot web 应用。
3、仍然可以沿用此思路,使用 BrokerService API 进行内嵌启动,在 spring boot 启动后自动执行 brokerService.start() 启动即可,显然容易想到的是用 ServletContextListener 应用启动监听器。监听到应用启动后,执行 brokerService.start() 启动内嵌的 ActiveMQ 服务。(这只是多种方式中的其中一个思路,仅供参考)
4、内嵌 ActiveMQ 服务器时,还需要再添加 activemq-kahadb-store 依赖:
<!--使用 spring-boot-starter-activemq,将提供连接或嵌入activemq实例所需的依赖项-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--ActiveMQ 服务器内嵌启动时,需要添加 activemq-kahadb-store,用于数据持久化-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
5、然后写一个应用启动监听器,实现 ServletContextListener 即可,在应用启动初始化方法中启动 ActiveMQ 服务:
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
* 标准的 Servlet 监听器。
*/
public class SystemListener implements ServletContextListener {
private static final Logger logger = LoggerFactory.getLogger(SystemListener.class);
/**
* 应用启动时自动执行
* @param servletContextEvent
*/
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
try {
logger.info("应用启动......");
/**设置 ActiveMQ 消息服务器用于被客户端连接的 url 地址,实际开发中,地址应该在配置文件中可配置,不要写死*/
String serviceURL = "tcp://localhost:61616";
/**BrokerService 表示 ActiveMQ 服务,每一个 BrokerService 表示一个消息服务器实例
* 如果想启动多个,只需要 start 多个不同端口的 BrokerService 即可*/
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);//设置是否应将代理的服务公开到jmx中。默认是 true
brokerService.addConnector(serviceURL);//为指定地址添加新的传输连接器
/**启动 ActiveMQ 服务,此时客户端便可以使用提供的地址进行连接,然后发送消息过来,或者从这里消费消息。
* 注意:这里内嵌启动后,默认是没有提供 8161 端口的 web 管理界面的,照样能做消息中间件使用*/
brokerService.start();
logger.info("启动内嵌 ActiveMQ 服务器完成......");
} catch (Exception e) {
logger.error("启动内嵌 ActiveMQ 服务器失败...");
}
}
/**应用销毁时自动执行*/
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
logger.info("应用关闭......");
}
}
6、然后写一个配置类注册 servlet 容器启动监听器:
import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**系统配置类*/
@Configuration
public class SystemConfig {
/**
* 注册 Servlet 三大组件 之 Listner
* 添加 ServletListenerRegistrationBean ,就相当于以前在 web.xml 中配置的 <listener></listener>标签
*/
@Bean
public ServletListenerRegistrationBean myListener() {
/**ServletListenerRegistrationBean<T extends EventListener> 使用的是泛型,可以注册常见的任意监听器
* 将自己的监听器注册进来*/
ServletListenerRegistrationBean registrationBean =
new ServletListenerRegistrationBean(new SystemListener());
return registrationBean;
}
}
7、其它代码不用修改,现在可以关闭独立启动的 ActiveMQ 服务器了,使用内嵌 activeMQ 服务。测试运行: