ActiveMQ概述及配置
为什么要使用消息队列
未使用消息队列:
使用消息队列:
ActiveMQ概述
ActiveMQ是Apache出品,最流行的,能力强劲的消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然伴随着特殊的地位。
1.多种语言和协议编写客户端。语言:Java,C,C++,C#,Ruby,Perl,Python,PHP。
2.完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事物)
3.对Spring的支持,ActiveMQ可以很容易内嵌到Spring的系统里面去。
4.通过了常见J2EE服务器,可以让ActiveMQ自动部署到任何兼容J2EE1.4商业服务器。
5.支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
6.支持通过JDBC和journal提供高速的信息持久化。
7.从设计上保证了高性能的集群,客户端-服务器,点对点。
8.支持Ajax。
9.支持与Axis的整合。
10. 可以很容易的调用内嵌JMS provider,进行测试。
ActiveMQ在Linux系统下的配置
Linux JRE配置
解压缩JRE:
tar -zxvf jre-8u161-linux-i586.tar.gz
修改环境变量:/etc/profile
export JAVA_HOME=/usr/local/jre1.8.0_161
export PATH=$PATH:$JAVA_HOME/bin
环境变量生效:
source /etc/profile
Linux环境安装ActiveMQ
解压缩ActiveMQ
tar -zxvf apache-activemq-5.15.3-bin.tar.gz
安装ActiveMQ
cd apach-activemq-5.15.3
./bin/activemq setup/root/.activemqrc
启动ActiveMQ服务
./bin/linux-x86-32/activemq start
测试服务状态
http://127.0.0.1:61616/
http://127.0.0.1:8161/admin
账户名称/密码:admin
Java连接ActiveMQ
添加POM依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> |
生产者:消息发布者
// 创建ActiveMQ连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.136.131:61616"); // 创建连接对象 Connection connection = connectionFactory.createConnection(); // 启动连接对象 connection.start(); // 创建与ActiveMQ服务器的会话 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Queue queue = session.createQueue("BWF"); // 创建生产者 (消息发布者) MessageProducer producer = session.createProducer(queue); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 准备一条要发送的消息 TextMessage message = session.createTextMessage("BWF发送来的消息!"); // 生产者(消息发布者)发送消息 producer.send( message ); // 会话提交事务 session.commit(); connection.close(); // 关闭连接 |
消费者:消息接收者
// 创建ActiveMQ连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.136.131:61616"); // 创建连接对象 Connection connection = connectionFactory.createConnection(); // 启动连接对象 connection.start(); // 创建与ActiveMQ服务器的会话 Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建消息队列 Queue queue = session.createQueue("BWF"); // 创建消费者(消息订阅者) MessageConsumer consumer = session.createConsumer(queue); // 循环接受消息 while( true ) { // 接受消息 TextMessage message =(TextMessage) consumer.receive(); System.out.println("接受到来自ActiveMQ的消息:"+message.toString()); } |
SSM整合ActiveMQ
Pom添加依赖:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.2</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.5</version> </dependency> |
添加ActiveMQ配置文件:Spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.2.xsd"> <!-- 配置ActiveMQ连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.136.131:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> <property name="trustAllPackages" value="true"/> </bean> <!-- 定义消息队列(Queue) --> <bean id="BWFOrderQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>BWFOrderQueue</value> </constructor-arg> </bean> <bean id="BWFProductQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>BWFProductQueue</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="BWFOrderQueue" /> <!--JmsTemplate的接收是阻塞式的接收,默认会一直阻塞等待,直到接收到了消息。也可以设置一个最长的等待时间参数,超过这个时间,接收的方法将得到null的结果。 --> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false。非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false--> <property name="explicitQosEnabled" value="true" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="1" /> </bean> <!-- 配置消息队列监听者(Queue) --> <bean id="BWFOrderQueueListener" class="com.bwf.ssm.listener.BWFOrderQueueListener" /> <bean id="BWFProductQueueListener" class="com.bwf.ssm.listener.BWFProductQueueListener" /> <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标,监听器 --> <bean id="queueOrderListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="BWFOrderQueue" /> <property name="messageListener" ref="BWFOrderQueueListener" /> </bean> <bean id="queueProductListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="BWFProductQueue" /> <property name="messageListener" ref="BWFProductQueueListener" /> </bean> </beans> |
消息发布者(生产者):
IOrderProducerService
package com.bwf.ssm.service; import com.bwf.ssm.entity.Order; public interface IOrderProducerService { void add(Order order,Destination destination); } |
OrderProducerService
package com.bwf.ssm.service.impl; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.bwf.ssm.entity.Order; import com.bwf.ssm.service.IOrderProducerService; @Service public class OrderProducerService implements IOrderProducerService { @Override public void add(Order order,Destination destination) { System.out.println("向ActiveMQ发送订单:"); jmsTemplate.send(destination,new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(order); } }); } @Resource private JmsTemplate jmsTemplate; } |
消息队列目标:
@Resource(name = "BWFOrderQueue") private ActiveMQQueue orderQueue; @Resource(name = "BWFProductQueue") private ActiveMQQueue productQueue; |
消息订阅者(消费者):
BWFOrderQueueListener
package com.bwf.ssm.listener; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.TextMessage; import com.bwf.ssm.dao.OrderInfoMapper; import com.bwf.ssm.dao.OrderMapper; import com.bwf.ssm.entity.Order; import com.bwf.ssm.entity.OrderInfo; public class BWFOrderQueueListener implements MessageListener { @Override public void onMessage(Message message) { // 获取消息 ObjectMessage om = (ObjectMessage) message; try { // 反序列化消息中的订单对象 Order order = (Order)om.getObject(); System.out.println("收到消息!准备添加订单!"); // 添加订单到数据库 ordermapper.add(order); // 循环订单详情 for( OrderInfo info : order.getOrderInfoList() ) { // 添加订单详情到数据库 orderinfomapper.add(info); } } catch (JMSException e) { e.printStackTrace(); } } @Resource private OrderMapper ordermapper; @Resource private OrderInfoMapper orderinfomapper; } |
package com.bwf.ssm.listener; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import com.bwf.ssm.dao.ProductMapper; import com.bwf.ssm.entity.Order; import com.bwf.ssm.entity.OrderInfo; import com.bwf.ssm.entity.Product; public class BWFProductQueueListener implements MessageListener { @Override public void onMessage(Message message) { // 获取消息 ObjectMessage om = (ObjectMessage) message; try { // 反序列化消息中的订单对象 Order order = (Order)om.getObject(); System.out.println("收到消息!准备修改商品库存和销量!"); // 循环订单中的订单详情 for( OrderInfo info : order.getOrderInfoList() ) { // 获取订单详情中的订购商品 Product product = info.getProduct(); // 修改订购商品的库存和销量 product.setProdStock( product.getProdStock() - info.getAmount() ); product.setProdSale( product.getProdSale() + info.getAmount() ); // 更新商品的库存和销量到数据库 productmapper.update(product); } } catch (JMSException e) { e.printStackTrace(); } } @Resource private ProductMapper productmapper; } |