一、简单介绍:略
二、环境搭建:略
三、本章核心技术关键点
1、使用spring整合activemq
2、使用activemq实现队列中按优先级排序
3、使用JmsTemplate.browse()方法获取队列具体信息
四、spring整合activemq并实现任务优先级的简单demo
步骤:
1、首先需要在pom.xml中配置以下依赖
<!-- Spring JMS support; provides the org.springframework.jms classes (JmsTemplate etc.) used below. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<!-- NOTE(review): no class from activemq-pool is referenced by the configuration shown in this
     article (it uses Spring's CachingConnectionFactory); confirm this dependency is needed. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
<!-- ActiveMQ client classes (org.apache.activemq.ActiveMQConnectionFactory is used in the Spring
     config). Keep this version identical to activemq-pool above. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
2、编写发送者
/**
 * Contract for sending a text message to the configured queue.
 */
public interface ProducerService {

    /**
     * Sends one text message with the given JMS priority.
     *
     * @param msg      text payload of the message
     * @param priority JMS priority to send the message with
     */
    void sendMessage(String msg, int priority);
}
@Service
public class ProducerServiceImpl implements ProducerService{
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
@Value("#{config['queueName']}")
private String queueName;
public void sendMessage(final String msg, int priority){
String destination = queueName;
System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
//设置队列优先级
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setPriority(priority);
jmsTemplate.send(destination,new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
}
3、编写消费者
/**
 * Contract for consuming messages from the configured queue.
 */
public interface ConsumerService {

    /**
     * Receives a message.
     */
    public abstract void receive();
}
@Service("consumerService")
public class ConsumerServiceImpl implements ConsumerService {
@Resource
private JmsTemplate jmsTemplate;
@Value("#{config['queueName']}")
private String queueName;
/**
* 接受消息
*/
public void receive() {
TextMessage tm = (TextMessage) jmsTemplate.receive(queueName);
try {
System.out.println("从队列" + queueName.toString() + "收到了消息:\t"
+ tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
4、配置文件
applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the ActiveMQ demo: component scanning, the JMS connection factories,
     the shared JmsTemplate and the "config" properties bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<!-- NOTE(review): the aop and tx namespaces are declared above but no element in this file uses them. -->
<!-- Scan for annotated components (@Service etc.) under com.winner.spring. -->
<context:component-scan base-package="com.winner.spring"/>
<!-- The ActiveMQ connection factory that actually talks to the broker.
     NOTE(review): broker URL and credentials are hard-coded here; the activemq_url key in
     config.properties is not referenced anywhere in this file. -->
<bean id="amqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.1.184:61616"></property>
<property name="userName" value="admin"></property>
<property name="password" value="admin"></property>
</bean>
<!-- Spring caching wrapper around the ActiveMQ factory; this is the factory the JmsTemplate uses. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- The target ConnectionFactory is the real ConnectionFactory that produces JMS Connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Shared JmsTemplate bean injected into the producer and consumer services. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- Second component scan, covering com.activemq. -->
<context:component-scan base-package="com.activemq"></context:component-scan>
<!-- Exposes config.properties as the "config" bean read by the #{config['queueName']} expressions. -->
<util:properties id="config" location="classpath:properties/config.properties"/>
</beans>
config.properties:
# Broker URL using the failover: transport prefix.
# NOTE(review): the applicationContext.xml shown with this file hard-codes brokerURL and does not
# read this key - confirm where (or whether) it is used.
activemq_url = failover:(tcp://192.168.1.184:61616)
# Queue name injected into the services via #{config['queueName']}.
queueName = test
5、测试类
package com.activemq.spring;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Manual test driver: sends ten text messages with mixed priorities so the
 * consumer side can show that they are read back in priority order.
 */
public class TestSpringSend
{
    /**
     * Loads the Spring context, sends the sample messages and closes the context.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        // The configuration file is named applicationContext.xml; the former pattern
        // "applicationContext-*.xml" requires a hyphen and therefore matched nothing,
        // leaving the context empty.
        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        try {
            // Look the bean up by its interface, like TestSpringReceive does.
            ProducerService bean = applicationContext.getBean(ProducerService.class);
            // Each message's text is its own priority, which makes the receive order easy to read.
            int[] priorities = {0, 2, 0, 6, 2, 9, 4, 9, 6, 4};
            for (int i = 0; i < priorities.length; i++) {
                bean.sendMessage(String.valueOf(priorities[i]), priorities[i]);
            }
        } finally {
            // Release JMS connections even when a send fails.
            applicationContext.close();
        }
    }
}
package com.activemq.spring;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Manual test driver: polls the queue through {@code ConsumerService} every
 * three seconds until the thread is interrupted.
 */
public class TestSpringReceive
{
    /**
     * Loads the Spring context and keeps receiving messages.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        // The configuration file is named applicationContext.xml; the former pattern
        // "applicationContext-*.xml" requires a hyphen and therefore matched nothing,
        // leaving the context empty.
        AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        try {
            ConsumerService bean = applicationContext.getBean(ConsumerService.class);
            while (!Thread.currentThread().isInterrupted())
            {
                try {
                    bean.receive();
                } catch (RuntimeException e) {
                    // Report instead of silently swallowing; keep polling so a transient
                    // broker problem does not end the demo.
                    System.err.println("接收消息失败: " + e);
                }
                try {
                    // Sleep after every attempt, including failed ones, to avoid a hot retry loop.
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the program cleanly.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            applicationContext.close();
        }
    }
}
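经过上述代码,我们便实现了最基础的同步收/发操作,同时也实现了优先级的功能。注意:ActiveMQ默认并不保证严格按优先级出队,需要在broker的activemq.xml中为目标队列配置 <policyEntry queue=">" prioritizedMessages="true"/> ,优先级才能可靠生效。其结果如下:
发送信息:
接收信息:从结果中我们可以看到,消息确实按照优先级顺序被读取(优先级数值越大,越先被消费)。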
五、ActiveMQUtils的编写
为了方便,我们编写了ActiveMQUtils.java类,该类实现了YYSchedule所需要的ActiveMQ基本功能,比如消息的发送、消息的接收、消息的浏览、查看队列的长度等。凡是需要用到ActiveMQ的地方,都可以直接调用该类进行操作。
在下面的代码中,sendContext以及receiveContext方法是基于上一节的简单demo修改而来,因此在这里便不再讲解。
实际上本节需要关注的,是剩余的三个方法,它们都采用了jmsTemplate的browse方法。
以getQueueSize()方法为例:browse()方法实际上就是浏览队列中的所有消息,与receive()等方法的不同之处在于,它不会消费这些消息,因此可以用于查看队列长度。在getQueueSize()方法中,我们通过browser.getEnumeration()方法获取队列中所有消息的Enumeration,然后遍历该Enumeration并统计消息的个数,从而得到队列当前的长度。
/**
*
*/
package com.YYSchedule.store.util;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.BrowserCallback;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.ProducerCallback;
import org.springframework.jms.support.JmsUtils;
import com.YYSchedule.common.rpc.domain.container.Context;
/**
 * Static helpers around Spring's {@link JmsTemplate} for the ActiveMQ
 * operations used by YYSchedule: sending and receiving {@link Context}
 * objects, and browsing a queue without consuming from it.
 *
 * <p>None of the methods reconfigure the {@code JmsTemplate} passed in, so a
 * shared template can be used by several callers.
 *
 * @author ybt
 *
 * @date 2018-07-12
 * @version 1.0
 */
public class ActiveMQUtils
{
    private static final Logger LOGGER = Logger.getLogger(ActiveMQUtils.class);

    /**
     * Sends a context to the given queue with an explicit priority.
     *
     * <p>The priority is handed to the producer for this message only. The
     * template is not switched into explicit-QoS mode, because that setting
     * would persist on the (typically shared) template: later calls to the
     * overload without a priority would silently inherit the last priority,
     * and concurrent callers would overwrite each other's value.
     *
     * @param jmsTemplate template used to obtain the session and producer
     * @param queue       name of the destination queue
     * @param context     payload; sent as an {@link ObjectMessage}
     * @param priority    JMS priority for this message
     */
    public static void sendContext(final JmsTemplate jmsTemplate, String queue, final Context context, final int priority)
    {
        LOGGER.info(Thread.currentThread().getName() + " 向队列" + queue + "发送Context[" + context.getTaskId() + "]");
        jmsTemplate.execute(queue, new ProducerCallback<Object>() {
            @Override
            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
                ObjectMessage message = session.createObjectMessage(context);
                // Same delivery mode and TTL the template would apply with explicit QoS enabled;
                // only the priority is per-message.
                producer.send(message, jmsTemplate.getDeliveryMode(), priority, jmsTemplate.getTimeToLive());
                // Mirror JmsTemplate's own send path: commit only a locally transacted session,
                // never one that takes part in an externally managed transaction.
                if (session.getTransacted() && jmsTemplate.isSessionTransacted()
                        && !ConnectionFactoryUtils.isSessionTransactional(session, jmsTemplate.getConnectionFactory())) {
                    JmsUtils.commitIfNecessary(session);
                }
                return null;
            }
        });
    }

    /**
     * Sends a context to the given queue without specifying a priority; the
     * template's own settings apply.
     *
     * @param jmsTemplate template used to send
     * @param queue       name of the destination queue
     * @param context     payload; sent as an {@link ObjectMessage}
     */
    public static void sendContext(JmsTemplate jmsTemplate, String queue, final Context context)
    {
        LOGGER.info(Thread.currentThread().getName() + " 向队列" + queue + "发送Context[" + context.getTaskId() + "]");
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public ObjectMessage createMessage(Session session) throws JMSException {
                return session.createObjectMessage(context);
            }
        });
    }

    /**
     * Receives one context from the given queue.
     *
     * @param jmsTemplate template used to receive
     * @param queue       name of the queue to read from
     * @return the received context, or {@code null} when the template returned
     *         no message (its receive timeout elapsed)
     * @throws JMSException if the message is not an {@link ObjectMessage} or
     *                      its payload cannot be read; the underlying failure,
     *                      if any, is attached as cause and linked exception
     */
    public static Context receiveContext(JmsTemplate jmsTemplate, String queue) throws JMSException
    {
        Message message = jmsTemplate.receive(queue);
        if (message == null) {
            return null;
        }
        String failure = "从队列"+queue+"中获取Context失败!";
        if (!(message instanceof ObjectMessage)) {
            LOGGER.error(failure + " unexpected message type: " + message.getClass().getName());
            throw new JMSException(failure);
        }
        try {
            return (Context) ((ObjectMessage) message).getObject();
        } catch (JMSException e) {
            LOGGER.error(failure, e);
            // Keep the original failure reachable for callers and logs.
            JMSException wrapped = new JMSException(failure);
            wrapped.setLinkedException(e);
            wrapped.initCause(e);
            throw wrapped;
        }
    }

    /**
     * Counts the messages currently visible to a browser on the queue, without
     * consuming them.
     *
     * <p>NOTE(review): ActiveMQ pages browsed messages according to the
     * destination's {@code maxBrowsePageSize} policy (believed to default to
     * 400), so on deep queues this may undercount - confirm against the broker
     * configuration.
     *
     * @param jmsTemplate template used to browse
     * @param queue       name of the queue to inspect
     * @return number of messages enumerated by the browser
     */
    public static int getQueueSize(JmsTemplate jmsTemplate, String queue)
    {
        Integer queueSize = jmsTemplate.browse(queue, new BrowserCallback<Integer>() {
            @Override
            public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<?> messages = browser.getEnumeration();
                int count = 0;
                while (messages.hasMoreElements()) {
                    messages.nextElement();
                    count++;
                }
                return count;
            }
        });
        // Guard the unboxing; the callback itself never returns null.
        return queueSize == null ? 0 : queueSize.intValue();
    }

    /**
     * Lists the task ids of the contexts currently in the queue, without
     * consuming them.
     *
     * @param jmsTemplate template used to browse
     * @param queue       name of the queue to inspect
     * @return the {@code taskId} of every browsed context, in browse order
     */
    public static List<Long> getQueueTaskIdList(JmsTemplate jmsTemplate, String queue)
    {
        List<Long> taskIdList = new ArrayList<Long>();
        for (Context context : browseContexts(jmsTemplate, queue)) {
            taskIdList.add(context.getTaskId());
        }
        return taskIdList;
    }

    /**
     * Lists the timeouts of the contexts currently in the queue, without
     * consuming them.
     *
     * @param jmsTemplate template used to browse
     * @param queue       name of the queue to inspect
     * @return the {@code timeout} of every browsed context, in browse order
     */
    public static List<Long> getQueueTimeoutList(JmsTemplate jmsTemplate, String queue)
    {
        List<Long> timeoutList = new ArrayList<Long>();
        for (Context context : browseContexts(jmsTemplate, queue)) {
            timeoutList.add(context.getTimeout());
        }
        return timeoutList;
    }

    /**
     * Browses the queue and collects the non-null {@link Context} payloads.
     * Messages that are not {@link ObjectMessage}s, or whose payload is not a
     * {@code Context}, are skipped rather than failing the whole browse.
     */
    private static List<Context> browseContexts(JmsTemplate jmsTemplate, String queue)
    {
        return jmsTemplate.browse(queue, new BrowserCallback<List<Context>>() {
            @Override
            public List<Context> doInJms(Session session, QueueBrowser browser) throws JMSException {
                List<Context> contexts = new ArrayList<Context>();
                Enumeration<?> messages = browser.getEnumeration();
                while (messages.hasMoreElements()) {
                    Object message = messages.nextElement();
                    if (!(message instanceof ObjectMessage)) {
                        continue;
                    }
                    Object payload = ((ObjectMessage) message).getObject();
                    // instanceof also filters out null payloads.
                    if (payload instanceof Context) {
                        contexts.add((Context) payload);
                    }
                }
                return contexts;
            }
        });
    }
}