调度平台YYSchedule-细节-activemq的使用

一、简单介绍:略

二、环境搭建:略

三、本章核心技术关键点

1、使用spring整合activemq

2、使用activemq实现队列中按优先级排序

3、使用JmsTemplate.browse()方法获取队列具体信息

四、spring整合activemq并实现任务优先级的简单demo

步骤:

1、首先需要在pom.xml中配置以下依赖

 	<!-- Spring JMS support; the code in this article imports JmsTemplate, MessageCreator and BrowserCallback from org.springframework.jms.core. -->
 	<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.0.0.RELEASE</version>
	</dependency>
	<!-- NOTE(review): the Spring configuration shown in this article wraps the broker factory in Spring's CachingConnectionFactory and references no pooling class; confirm this artifact is actually needed. -->
	<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.9.0</version>
	</dependency>
	<!-- ActiveMQ client; presumably the source of org.apache.activemq.ActiveMQConnectionFactory used in the Spring configuration (TODO confirm). Kept at the same version as activemq-pool. -->
	<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
	</dependency>

2、编写发送者

/**
 * Contract for the sending side of the demo: publishes a text payload together
 * with a caller-chosen priority.
 */
public interface ProducerService {

    /**
     * Sends one message.
     *
     * @param msg      the text payload to send
     * @param priority the priority to attach to the message
     */
    void sendMessage(String msg, int priority);
}
@Service
public class ProducerServiceImpl implements ProducerService{


    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;


    @Value("#{config['queueName']}")
    private String queueName;
    
    public void sendMessage(final String msg, int priority){
        String destination = queueName;
        System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
        
        //设置队列优先级
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setPriority(priority);
        
        jmsTemplate.send(destination,new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}

3、编写消费者

public interface ConsumerService {
    /**
     * Receives a message.
     *
     * <p>The interface does not say where the message comes from or what is
     * done with it; that is left to the implementing class.
     */
    void receive();
}
@Service("consumerService")
public class ConsumerServiceImpl implements ConsumerService {
    @Resource
    private JmsTemplate jmsTemplate;


    @Value("#{config['queueName']}")
    private String queueName;
    /**
     * 接受消息
     */
    public void receive() {
        TextMessage tm = (TextMessage) jmsTemplate.receive(queueName);
        try {
            System.out.println("从队列" + queueName.toString() + "收到了消息:\t"
                    + tm.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

4、配置文件

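applicationContext.xml(注意:下文测试类是按 classpath:applicationContext-*.xml 通配加载配置的,实际文件名需要形如 applicationContext-activemq.xml 才能被匹配到):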

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the ActiveMQ demo: broker connection factory, caching wrapper, JmsTemplate and the "config" properties bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
        http://www.springframework.org/schema/util 
        http://www.springframework.org/schema/util/spring-util-4.0.xsd">

    <!-- NOTE(review): the test classes in this article live in com.activemq.spring, and com.activemq is scanned again further down; confirm whether this com.winner.spring scan is still needed. -->
    <context:component-scan base-package="com.winner.spring"/>

    <!-- Connection factory that talks to the ActiveMQ broker. The broker URL and credentials are hard-coded here. -->
    <!-- NOTE(review): config.properties defines activemq_url, but nothing in this file references it; confirm which one is intended. -->
	 <bean id="amqConnectionFactory" 
        class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.1.184:61616"></property>
        <property name="userName" value="admin"></property>
        <property name="password" value="admin"></property>
    </bean>

    <!-- Spring caching wrapper around the broker connection factory; this is the factory handed to JmsTemplate. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target ConnectionFactory is the real factory that is able to produce JMS Connections. -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- JmsTemplate injected into the producer and consumer services (looked up by the name "jmsTemplate" in the producer). -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
    
    <!-- Component scan for the com.activemq package tree. -->
    <context:component-scan base-package="com.activemq"></context:component-scan>

    <!-- Exposes config.properties as the "config" bean that the services read with #{config['queueName']}. -->
	<util:properties id="config" location="classpath:properties/config.properties"/>
</beans>

config.properties:

activemq_url 							= 	failover:(tcp://192.168.1.184:61616)
queueName									=	test

5、测试类

package com.activemq.spring;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Demo entry point for the sending side: loads the Spring context, sends ten
 * messages with mixed priorities and then shuts the context down.
 */
public class TestSpringSend
{
	/**
	 * Sends the demo messages. Each message's text is its own priority, so the
	 * order in which the receiver prints them shows whether priorities were honoured.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-*.xml");
		try
		{
			ProducerServiceImpl bean = applicationContext.getBean(ProducerServiceImpl.class);

			// Same sequence as the original hand-written calls: 0,2,0,6,2,9,4,9,6,4.
			int[] priorities = {0, 2, 0, 6, 2, 9, 4, 9, 6, 4};
			for (int priority : priorities)
			{
				bean.sendMessage(String.valueOf(priority), priority);
			}
		}
		finally
		{
			// Close the context (and with it the cached JMS connection) even when
			// the bean lookup or a send fails.
			applicationContext.close();
		}
	}
}
package com.activemq.spring;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Demo entry point for the receiving side: loads the Spring context and polls
 * the consumer service every three seconds until the thread is interrupted.
 */
public class TestSpringReceive
{
	/**
	 * Runs the polling loop. A failed receive is reported and the loop goes on;
	 * an interrupt ends the loop and the context is closed on the way out.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-*.xml");
		try
		{
			ConsumerService bean = applicationContext.getBean(ConsumerService.class);

			while (!Thread.currentThread().isInterrupted())
			{
				try
				{
					bean.receive();
					Thread.sleep(3000);
				}
				catch (InterruptedException e)
				{
					// Restore the flag so the loop condition sees it and the demo stops cleanly.
					Thread.currentThread().interrupt();
				}
				catch (RuntimeException e)
				{
					// Keep polling as before, but no longer hide the failure.
					e.printStackTrace();
				}
			}
		}
		finally
		{
			applicationContext.close();
		}
	}
}

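  经过上述代码,我们便实现了最基础的同步收/发操作,同时也实现了优先级的功能。需要注意:ActiveMQ默认并不保证严格按优先级投递,需要在broker的activemq.xml中为对应队列的policyEntry设置prioritizedMessages="true",优先级才会稳定生效。其结果如下: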

  发送信息:

  接收信息:从结果中我们可以看到,消息确实按照优先级顺序被读取

五、ActiveMQUtils的编写

    为了方便,我们编写了ActiveMQUtils.java类,该类实现YYSchedule所需要的activemq的基本功能,比如消息的发送、消息的接收、消息的浏览、查看队列的长度等。凡是需要用到activemq的地方,都可以直接引用该类进行操作。

  在下面的代码中,sendContext以及receiveContext方法是基于上一节的简单demo修改而来,因此在这里便不再讲解。

实际上本节需要关注的,是剩余的三个方法,它们都采用了jmsTemplate的browse方法。

  以getQueueSize()方法为例:browse()方法实际上就是浏览队列中的所有消息,与receive()等方法的不同之处在于,它不会消费这些消息,因此可以用于查看队列长度。在getQueueSize()方法中,我们通过browser.getEnumeration()得到队列中消息的Enumeration,然后逐个遍历并统计消息的个数,从而得到队列当前的长度。

/**
 * 
 */
package com.YYSchedule.store.util;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.jms.core.BrowserCallback;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.YYSchedule.common.rpc.domain.container.Context;

/**
 * Static helpers around a Spring {@link JmsTemplate} for moving {@link Context}
 * objects through queues: sending (with or without an explicit priority),
 * receiving, and non-destructive browsing (queue size, task ids, timeouts).
 *
 * <p>Both {@code sendContext} overloads synchronize on the template they are
 * given. The priority overload has to change the template's QoS settings for
 * the duration of one send; the lock keeps concurrent sends made through this
 * class from picking up each other's priority, and the settings are restored
 * afterwards. Code that uses the same template without going through this class
 * is not covered by that lock.
 *
 * @author ybt
 *
 * @date 2018-07-12
 * @version 1.0
 */
public class ActiveMQUtils
{
	private static final Logger LOGGER = Logger.getLogger(ActiveMQUtils.class);

	/**
	 * Sends a context to the given queue with an explicit priority.
	 *
	 * <p>The template's explicit-QoS flag and priority are set only for this
	 * send and put back to their previous values afterwards, so later sends
	 * without a priority do not silently reuse this one.
	 *
	 * @param jmsTemplate template used for sending
	 * @param queue name of the destination queue
	 * @param context payload, sent as an {@link ObjectMessage}
	 * @param priority priority passed to {@link JmsTemplate#setPriority(int)}
	 */
	public static void sendContext(JmsTemplate jmsTemplate, String queue, final Context context, int priority)
	{
		LOGGER.info(Thread.currentThread().getName() + " 向队列" + queue + "发送Context[" + context.getTaskId() + "]");

		synchronized (jmsTemplate)
		{
			boolean previousQosEnabled = jmsTemplate.isExplicitQosEnabled();
			int previousPriority = jmsTemplate.getPriority();
			try
			{
				// The template only applies its own priority when explicit QoS is on.
				jmsTemplate.setExplicitQosEnabled(true);
				jmsTemplate.setPriority(priority);

				jmsTemplate.send(queue, contextMessageCreator(context));
			}
			finally
			{
				// Leave the shared template exactly as the caller configured it.
				jmsTemplate.setPriority(previousPriority);
				jmsTemplate.setExplicitQosEnabled(previousQosEnabled);
			}
		}
	}

	/**
	 * Sends a context to the given queue without setting a priority; whatever
	 * QoS settings the template is configured with apply.
	 *
	 * @param jmsTemplate template used for sending
	 * @param queue name of the destination queue
	 * @param context payload, sent as an {@link ObjectMessage}
	 */
	public static void sendContext(JmsTemplate jmsTemplate, String queue, final Context context)
	{
		LOGGER.info(Thread.currentThread().getName() + " 向队列" + queue + "发送Context[" + context.getTaskId() + "]");

		// Same lock as the priority overload, so this send can never run while
		// another thread has temporarily switched the template to its priority.
		synchronized (jmsTemplate)
		{
			jmsTemplate.send(queue, contextMessageCreator(context));
		}
	}

	/**
	 * Receives one message from the given queue and returns the context it carries.
	 *
	 * @param jmsTemplate template used for receiving
	 * @param queue name of the queue to read from
	 * @return the received context, or {@code null} when the template returned
	 *         no message (for example because a receive timeout elapsed)
	 * @throws JMSException if the message is not an {@link ObjectMessage} holding
	 *         a {@link Context}, or if its payload cannot be read; in the latter
	 *         case the original exception is attached as cause and linked exception
	 */
	public static Context receiveContext(JmsTemplate jmsTemplate, String queue) throws JMSException
	{
		Message message = jmsTemplate.receive(queue);
		if (message == null)
		{
			return null;
		}

		String failure = "从队列" + queue + "中获取Context失败!";
		if (!(message instanceof ObjectMessage))
		{
			LOGGER.error(failure + " unexpected message type: " + message.getClass().getName());
			throw new JMSException(failure);
		}

		Object payload;
		try
		{
			payload = ((ObjectMessage) message).getObject();
		}
		catch (JMSException e)
		{
			LOGGER.error(failure, e);
			JMSException wrapped = new JMSException(failure);
			// Keep the original failure reachable for whoever handles this.
			wrapped.setLinkedException(e);
			wrapped.initCause(e);
			throw wrapped;
		}

		if (payload != null && !(payload instanceof Context))
		{
			LOGGER.error(failure + " unexpected payload type: " + payload.getClass().getName());
			throw new JMSException(failure);
		}
		return (Context) payload;
	}

	/**
	 * Counts the messages currently visible to a browser on the given queue,
	 * without consuming them.
	 *
	 * <p>NOTE(review): a broker may limit how many messages a browser is shown
	 * (ActiveMQ has a {@code maxBrowsePageSize} policy), in which case this is a
	 * lower bound rather than the exact depth; confirm against the broker setup.
	 *
	 * @param jmsTemplate template used for browsing
	 * @param queue name of the queue to browse
	 * @return number of messages enumerated by the browser
	 */
	public static int getQueueSize(JmsTemplate jmsTemplate, String queue)
	{
		Integer queueSize = jmsTemplate.browse(queue, new BrowserCallback<Integer>() {

			@Override
			public Integer doInJms(Session session, QueueBrowser browser) throws JMSException
			{
				Enumeration<?> messages = browser.getEnumeration();
				int count = 0;
				while (messages.hasMoreElements())
				{
					messages.nextElement();
					count++;
				}
				return count;
			}

		});

		return queueSize == null ? 0 : queueSize.intValue();
	}

	/**
	 * Lists the task id of every context currently browsable on the given queue.
	 *
	 * @param jmsTemplate template used for browsing
	 * @param queue name of the queue to browse
	 * @return task ids in browse order; messages that do not carry a
	 *         {@link Context} are skipped
	 */
	public static List<Long> getQueueTaskIdList(JmsTemplate jmsTemplate, String queue)
	{
		List<Long> taskIdList = new ArrayList<Long>();
		for (Context context : browseContexts(jmsTemplate, queue))
		{
			taskIdList.add(context.getTaskId());
		}
		return taskIdList;
	}

	/**
	 * Lists the timeout of every context currently browsable on the given queue.
	 *
	 * @param jmsTemplate template used for browsing
	 * @param queue name of the queue to browse
	 * @return timeouts in browse order; messages that do not carry a
	 *         {@link Context} are skipped
	 */
	public static List<Long> getQueueTimeoutList(JmsTemplate jmsTemplate, String queue)
	{
		List<Long> timeoutList = new ArrayList<Long>();
		for (Context context : browseContexts(jmsTemplate, queue))
		{
			timeoutList.add(context.getTimeout());
		}
		return timeoutList;
	}

	/** Builds the creator that wraps a context in an ObjectMessage. */
	private static MessageCreator contextMessageCreator(final Context context)
	{
		return new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException
			{
				return session.createObjectMessage(context);
			}
		};
	}

	/** Browses the queue and collects the Context payload of every ObjectMessage that has one. */
	private static List<Context> browseContexts(JmsTemplate jmsTemplate, String queue)
	{
		return jmsTemplate.browse(queue, new BrowserCallback<List<Context>>() {

			@Override
			public List<Context> doInJms(Session session, QueueBrowser browser) throws JMSException
			{
				List<Context> contexts = new ArrayList<Context>();
				Enumeration<?> messages = browser.getEnumeration();
				while (messages.hasMoreElements())
				{
					Object element = messages.nextElement();
					if (!(element instanceof ObjectMessage))
					{
						// Not a message this class produced; ignore it rather than
						// failing the whole browse with a ClassCastException.
						continue;
					}
					Object payload = ((ObjectMessage) element).getObject();
					if (payload instanceof Context)
					{
						contexts.add((Context) payload);
					}
				}
				return contexts;
			}

		});
	}
}

    

猜你喜欢

转载自blog.csdn.net/bintoYu/article/details/80825665