简介
ActiveMQ是Apache出品的一个消息队列(Message Queue)软件,它可以与诸如C#、C++、PHP、Java等语言进行整合。本文重点叙述的是与Java Web中Spring框架的整合,ActiveMQ很好地实现了JMS接口,为编写高并发的应用程序提供了高效的解决方案。
整合思路
Spring最厉害的地方就是它的Bean了,还有它特有的IOC(控制反转)和AOP(面向切面编程)技术。有了这些,我们就可以不用new关键字构造对象,同时,可以方便地使用注入往类中的属性进行初始化。如果你编写过ActiveMQ之类的JMS应用程序,无论对于消息的生产者还是消费者,最重要的接口有以下两个:
1.ConnectionFactory
2.Destination
ConnectionFactory是一切的基础,有了它才有了Connection,然后才有Session,只有通过Session对象,我们才能创建消息队列、构建生产者/消费者,继而发送/接收消息。
Destination是一切的归宿,它就像总线一样,生产者发出消息要发到它上面,消费者取消息也要从这上面取。
试想,如果这一切都能借助Spring强大的Bean管理的话,我们在编写程序的时候会更加的方便简洁。幸运的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~
Spring官方提供了一个叫JmsTemplate的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory-ref和defaultDestination-ref正好对应JMS中的ConnectionFactory和Destination,如果你有兴趣查看源码的话,就可以发现JmsTemplate帮我们做了大量创建的工作,我们只需要用它来进行收发信息就ok了,而ActiveMQ官方也提供了对应的实现包。
以上是一个基本的思路,还有一些细节会在整合的过程中说明。
第一步,加入jar包
基本的jar包就不细说了,包括java.jms、activemq-core、activemq-pool、spring、log4j..
第二步,加入配置
进入正题,我们首先需要在Spring的applicationContext.xml 中添加相关的配置:如下
1.首先是各种限制和约束,你的框架搭好也就有了
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util" xmlns:mvn="http://www.springframework.org/schema/mvc"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">
2.因为要用到注解,我们开启自动扫描
<context:component-scan base-package="com.zhangb">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller" />
</context:component-scan>
3.加载配置文件,方便我们配置activemq以及其他配置文件
<!-- 加载配置文件 -->
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<value>classpath:common.properties</value>
</list>
</property>
</bean>
4.整合mq相关的配置文件
<!--连接到ActiveMQ,创建一个ConnectionFactory -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jmsconfig.url}" />
<property name="userName" value="${jmsconfig.user}"/>
<property name="password" value="${jmsconfig.password}"/>
</bean>
<!--对上步创建的ConnectionFactory进行缓存包装,这样做的目的是提升性能,
对sessions, connections 和 producers进行缓存复用,减少开销。 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg name="targetConnectionFactory" ref="activeMQConnectionFactory" />
<property name="sessionCacheSize" value="10" />
</bean>
<!--构建JmsTemplate,JmsTemplate帮我们做了大量创建的工作,我们只需要用它来进行收发信息就ok -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg name="connectionFactory" ref="connectionFactory" />
<property name="pubSubDomain" value="false" />
</bean>
5.对于消息消费者Spring官方提供了一个叫 DMLS(DefaultMessageListenerContainer)的容器,它能有效克制MDB(Message Driven Beans)的缺点。要使用这个容器,我们需要创建自己的监听器,并且注册进容器中,这样一旦目的地有消息,就会自动触发监听事件。
<!-- 创建监听, -->
<jms:listener-container destination-type="queue"
connection-factory="connectionFactory">
<jms:listener destination="${jmsconfig.queue_name1}" ref="jmsMessageListener"/>
<!-- <jms:listener destination="${jmsconfig.queue_name3}" ref="jmsMessageListenerFromSP"/> -->
</jms:listener-container>
其中,connection-factory
与前面用org.apache.activemq.ActiveMQConnectionFactory
包创建的bean的id对应,listener标签中destination
填前面创建的目的地名称,ref
填listener的bean id。
第三步:创建监听器
我们创建一个名为JmsMessageListener的监听器
package com.zhangb.auth.jms;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
/**
* 用户同步信息监听类,在监听类中处理消费逻辑
*
* @author run
* @date 2019/10/15
*/
@Component
public class JmsMessageListener implements MessageListener {
@Autowired
private OrganizationService organizationService;
@Autowired
private UserService userService;
@Autowired
private SyncDeviceService syncDeviceService;
@Autowired
TaskInfoService taskInfoService;
private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageListener.class);
private static final String MESSAGE_SUCCESS = "消费成功";
private static final String MESSAGE_FAILE = "消费失败";
@Override
public void onMessage(Message paramMessage) {
// TODO
try {
if (paramMessage instanceof TextMessage) {
TextMessage message = (TextMessage) paramMessage;
String messageSb = message.getText();
Map<String, String> parseObject = JSONObject.parseObject(messageSb,
new TypeReference<Map<String, String>>() {
});
if (parseObject.get("org") != null) {
// 组织信息消费
String string = parseObject.get("org");
List<Organization> parseArray = JSONObject.parseArray(string, Organization.class);
organizationService.save(parseArray);
LOGGER.info(MESSAGE_SUCCESS + messageSb);
}
if (parseObject.get("user") != null) {
// 用户信息消费
String string = parseObject.get("user");
List<User> parseArray = JSONObject.parseArray(string, User.class);
userService.save(parseArray);
LOGGER.info(MESSAGE_SUCCESS + messageSb);
}
if (parseObject.get("device") != null) {
// 设备信息消费
String string = parseObject.get("device");
List<SyncDevice> parseArray = JSONObject.parseArray(string, SyncDevice.class);
syncDeviceService.saveDevice(parseArray);
LOGGER.info(MESSAGE_SUCCESS + messageSb);
}
} else if (paramMessage instanceof MapMessage) {
MapMessage message = (MapMessage) paramMessage;
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
// saveTaskInfos(paramMessage);
} else if (paramMessage instanceof StreamMessage) {
StreamMessage message = (StreamMessage) paramMessage;
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
System.out.println(message.readByte());
} else if (paramMessage instanceof ObjectMessage) {
ObjectMessage message = (ObjectMessage) paramMessage;
Serializable object = message.getObject();
} else if (paramMessage instanceof BytesMessage) {
BytesMessage message = (BytesMessage) paramMessage;
byte[] byteContent = new byte[1024];
StringBuffer content = new StringBuffer();
int length = -1;
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(paramMessage);
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
LOGGER.info(MESSAGE_FAILE);
}
}
}
继续提供一个审批相关的例子
import java.io.Serializable;
import java.util.Date;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
/**
* 用户同步信息监听类,在监听类中处理消费逻辑
*
* @author run
* @date 2019/10/15
*/
@Component
public class JmsMessageListenerFromSP implements MessageListener {
@Autowired
TaskInfoService taskInfoService;
private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageListenerFromSP.class);
private static final String MESSAGE_SUCCESS = "消费成功";
private static final String MESSAGE_FAILE = "消费失败";
@Override
public void onMessage(Message paramMessage) {
// TODO
try {
if (paramMessage instanceof TextMessage) {
TextMessage message = (TextMessage) paramMessage;
String messageSb = message.getText();
Map<String, String> parseObject = JSONObject.parseObject(messageSb,
new TypeReference<Map<String, String>>() {
});
// 审批中心对接用户操作权限审批流程
if (parseObject.get("taskCenter") != null) {
String string = parseObject.get("taskCenter");
List parseArray = JSONObject.parseArray(string, List.class);
Map<String, String> map = (Map<String, String>) parseArray.get(0);
String taskStatus = MapUtils.getString(map, "taskStatus");
String taskId = MapUtils.getString(map, "taskInfoId");
TaskInfo task = taskInfoService.getTaskById(taskId);
if (task != null) {
if ("1".equals(taskStatus)) {
task.setTaskStatus("4");// 状态(0.草稿 1.待审批 2.审批中 3.审批未通过 4审批通过)
} else {
task.setTaskStatus("3");
}
task.setLastUpdateDate(new Date());
task.setEndType("1");
taskInfoService.saveTaskInfo(task);
}
LOGGER.info(MESSAGE_SUCCESS + messageSb);
}
} else if (paramMessage instanceof ObjectMessage) {
ObjectMessage message = (ObjectMessage) paramMessage;
Serializable object = message.getObject();
} else if (paramMessage instanceof BytesMessage) {
BytesMessage message = (BytesMessage) paramMessage;
byte[] byteContent = new byte[1024];
StringBuffer content = new StringBuffer();
int length = -1;
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(paramMessage);
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
LOGGER.info(MESSAGE_FAILE);
}
}
}
第四步:创建生产者
我们创建生产者就可以对应的往服务器上的mq发送相关的消息了
package com.zhangb.auth.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class JmsQueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
private static final Logger LOGGER = LoggerFactory.getLogger(JmsQueueProducer.class);
public void send(String queue, String message) {
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) {
TextMessage createTextMessage = null;
try {
createTextMessage = session.createTextMessage(message);
LOGGER.info("Queue :" + queue + " Success ! message:" + message);
} catch (JMSException e) {
// TODO Auto-generated catch block
LOGGER.error("Queue :" + queue + " Error ! message:" + message);
e.printStackTrace();
}
return createTextMessage;
}
});
}
}
给我们的审批发送相关消息的例子
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class JmsQueueProducerToRZ extends JmsQueueProducer {
@Value("${jmsconfig.queue_name2}")
private String queue;
public void send(String message) {
super.send(queue, message);
}
}
那么我们如何调用上面发送消息的方法呢,如下
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
@Controller
@RequestMapping(value = "/subuser", produces = "application/json;charset=utf-8")
public class SubUserController extends BaseController {
@Autowired
private UserSubService subUserService;
//先将此类注入到我们的控制器
@Autowired
private JmsQueueProducerToRZ jmsQueueProducerToRZ;
@RequestMapping(value = "/addNewSubUser")
@ResponseBody
public String addNewSubUser(HttpServletRequest request) {
JSONObject resultObj = new JSONObject();
return resultObj.toString();
}
@RequestMapping(value = "/updateSubUserStatus")
@ResponseBody
public String updateSubUserStatus(HttpServletRequest request, String subUserId, String subStatus) {
JSONObject resultObj = new JSONObject();
boolean isSucc = false;
try {
subUserService.updateSubUserStatus(Long.parseLong(subUserId), Integer.parseInt(subStatus));
getSubUserList(request);
isSucc = true;
} catch (Exception e) {
e.printStackTrace();
}
if (isSucc) {
resultObj.put("status", "success");
resultObj.put("message", "修改子用户状态成功");
UserSub subUser = subUserService.getSubUserById(Long.valueOf(subUserId));
User user = null;
if (subUser != null) {
user = userService.findUserByUserId(subUser.getUserId());
List<Organization> org = userService.getUserOrganizations(subUser.getUserId());
if (org != null && org.size() > 0) {
long[] organIds = new long[] { org.get(0).getId() };
user.setOrganizationIds(organIds);
}
}
// 发送至MQ
List<UserSub> subList = new ArrayList<>();
subList.add(subUser);
user.setSubUserList(subList);
List<User> addList = new ArrayList<>();
addList.add(user);
Map<String, Object> map = new HashMap<>();
map.put("user", addList);
//调用方法发送到mq
jmsQueueProducerToRZ.send(JsonUtil.toJson(map));
}
return getSubUserList(request);
}
}