简介
在前面的 一些文章里,我曾经对消息队列编程,以及消息队列通信方式做了一些总结。总的来说,那些示例是采用activemq服务器并且使用原生的代码来访问它们。在实际的代码实现中,采用原生api访问会显得非常的繁琐。因为我们要和里面一堆复杂的东西打交道,比如说Connection, ConnectionFactory, Session, Producer, Destination等等。这种复杂的代码结构非常容易出错而且也不容易关注于具体的业务逻辑。于是spring提供了一个jmsTemplate,可以在很大程度上简化它们。
比较
在使用spring开发具体示例前,我们先看一下一个简单发送消息到消息队列的示例:
public class Producer { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private static int id = 1000000; private String jobs[] = new String[]{"suspend", "delete"}; public Producer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Producer producer = new Producer(); producer.sendMessage(); producer.close(); } public void sendMessage() throws JMSException { Destination destination = session.createQueue("JOBS"); Message message = session.createObjectMessage(id); producer.send(destination, message); } }
在上述的代码里,我们需要定义消息队列服务器的连接地址,然后创建ConnectionFactory, connection, session等等。而真正发消息的动作呢,在sendMessage方法里就一个producer.send()方法。从真正使用者的角度来说,我们是希望将brokerURL, connection, session的创建等东西都封装和隐藏起来,通过提供参数让它们可以很好的配置。这样才能让我们的代码更加简洁。于是在这一点上,spring jmstemplate确实帮了不少忙。
依赖定义
Activemq服务器
为了运行示例,我们需要有一个activemq的服务器。activemq的下载地址如下:http://activemq.apache.org/download.html 下载到本地解压到某个目录。然后进入到bin目录下,运行命令:
./activemq start
这样activemq服务器就运行起来了。这个时候,服务器的默认brokerURL是: tcp://localhost:61616。在服务器运行起来之后,如果我们想要了解它的详细情况,可以通过一个web console来查看。在浏览器里输入如下地址: http://localhost:8161/admin 则可以看到如下的页面。
这是系统提示的登录页面,默认的用户名和密码都是admin。输入之后则可以看到如下的页面:
在图中的Queues和Topics这一栏中我们还可以看到运行时具体的Queue和Topic有哪些。针对具体activemq的配置和管理可以参考activemq的官方文档。这里就不再赘述。
工程依赖定义
在我们使用maven创建的工程里,主要依赖的类库内容如下:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency>
其中activemq-all主要用来提供访问所有activemq的api,只是提供的都是原生的api。而引入的spring-jms类库则是在它们的基础上提供了的封装。在我们后面的示例里,因为对ConnectionFactory作了连接池封装,所以还额外引入了如下的两个依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.10.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.3</version> </dependency>
定义好了基础的依赖之后,剩下的就是针对消息的发送和接收进行讨论了。
发送消息
既然在前面的讨论里已经提到过,对于spring来说,它收发消息的核心就是jmsTemplate。那么首先应该看看它的配置是怎么样的。下面是一个典型的jmsTemplate配置文件内容:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location" value="classpath:datasource.properties"/> </bean> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.brokerURL}"/> <property name="userName" value="${jms.userName}"/> <property name="password" value="${jms.password}"/> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${jms.messageTopic}" /> </bean> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="${jms.messageTopic}" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="destination"/> </bean> <bean id="messageSender" class="com.yunzero.activemq.ActivemqMessageSender"> <constructor-arg index="0" ref="jmsTemplate"/> </bean> </beans>
我们针对前面的内容做一个解释。首先我们需要建立一个类型为org.apache.activemq.ActiveMQConnectionFactory的connectionFactory。这里它定义了我们需要访问的activemq服务器的url以及访问它需要的用户名和密码。
接着我们定义了两个bean, 一个是org.apache.activemq.command.ActiveMQQueue的destination。它表示对应的一个点对点通信的queue。而后面定义的destinationTopic对应的是采用广播方式通信的topic。我们知道,在jms通信的规范里有两种发送消息的方式,一种是基于点对点的queue的方式,主要用于一个发送者发消息给一个接收者的情况。另外一种则是基于广播的topic的方式,主要用于一个发送者发消息给若干个接收者。
接着就是我们要用到的关键部分,spring里面预先定义好了的jmsTemplate,它的类型是org.springframework.jms.core.JmsTemplate。它需要配置的两个主要属性分别就是connectionFactory和destination。这样,通过这个模板就已经解决了往哪个服务器的哪个地方发的问题了。剩下的就是我们定义的一个bean,它封装了jmsTemplate来发送消息。
有了这些配置,我们实际上使用它们的代码则非常简单。我们定义的ActivemqMessageSender的实现代码如下:
package com.yunzero.activemq; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component public class ActivemqMessageSender implements MessageSender { private JmsTemplate jmsTemplate; public ActivemqMessageSender(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Override public void sendMessage(String message) { jmsTemplate.convertAndSend(message); } }
在上述的代码里,实际上jmsTemplate发送消息的方法有若干个, 我们可以参照文档针对发送的不同类型消息来处理。在示例里我们仅仅是发送一个简单的字符串。
这里为了保证一定程度的松耦合,专门定义了一个接口MessageSender:
package com.yunzero.activemq; public interface MessageSender { void sendMessage(String message); }
接下来,我们尝试发送一个简单的消息出去:
public class App { public static void main( String[] args ) { ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("sample1.xml"); MessageSender sender = ctx.getBean(MessageSender.class); sender.sendMessage("Hello"); ctx.close(); } }
在执行完上面这部分代码之后,我们看消息队列服务器里面的消息,则会发现里面多了一个名字为test的队列。因为我们前面对应的配置文件里指定了连接配置消息服务器的信息:
jms.brokerURL=tcp://localhost:61616 jms.userName= jms.password= jms.messageTopic=test
在页面里,显示的内容如下图:
这里表示我们发送成功的一条消息已经保存在服务器了。我们剩下的就是需要有一个接收端来处理这个消息。
接收消息
同步接收消息
我们接收消息的方式其实也有两种方式。一种相当于同步接收消息的方式。这种方式就是利用jmsTemplate提供的receive方法。采用这种方式的一种最简单接收消息的方式如下:
public class ActivemqMessageReceiver implements MessageReceiver { private JmsTemplate jmsTemplate; public ActivemqMessageReceiver(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Override public String receiveMessage() { String result = (String)jmsTemplate.receiveAndConvert(); return result; } }
上述代码中接收消息的方法除了receiveAndConvert,还要其他的方法。具体可以查阅相关的文档。该类实现的接口如下:
public interface MessageReceiver { String receiveMessage(); }
同时,我们需要在配置文件里定义该bean对象,里面需要增加如下部分:
<bean id="messageReceiver" class="com.yunzero.activemq.ActivemqMessageReceiver"> <constructor-arg index="0" ref="jmsTemplate"/> </bean>
当然,我们可以根据文档提供的方法来对接收到的消息做各种处理和类型转换。
这种同步接收消息的方式比较有意思,因为我们在这里每调用它一次,它就接收一条消息。如果我们希望它保持监听某个队列的状态时,一般需要定义一个无限循环。
除了这种方式,还要一种就是消息监听的方式。相当于一个异步的消息处理。
异步接收消息
异步接收消息的方式在用纯jms的api时,我们也可以找到官方的介绍。无非就是定义一个实现MessageListener接口的类。在onMessage方法里实现接收到消息后的处理办法。在spring jms里也很简单。它需要做的改动有两个。一个是定义配置文件,一个典型的配置如下:
<jms:listener-container container-type="default" connection-factory="connectionFactory" acknowledge="auto" concurrency="10"> <jms:listener destination="${jms.messageTopic}" ref="messageListener" method="onMessage" /> </jms:listener-container> <bean id="messageListener" class="com.yunzero.activemq.ActivemqMessageListener"/>
在这里,我们引入了一个jms的命名空间。所以在定义文档的开头,我们需要引入它们的空间定义。文件定义头如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">
在这里引入了相关的命名空间定义之后。我们需要的就是做一个接收消息的实现。这个实现和纯jms的基本上一样。也是实现MessageListener:
public class ActivemqMessageListener implements MessageListener { @Override public void onMessage(Message msg) { TextMessage textMsg = (TextMessage)msg; try { String message = textMsg.getText(); System.out.println(message); } catch (JMSException e) { e.printStackTrace(); } } }
而执行这部分的代码很简单。只要用applicationContext创建起来,它就相当于被注册了在保持一个运行的状态。完整的实现可以参看附件。
总结
Spring集成jms的过程其实并不复杂。它们交互的核心就是jmsTemplate。对于jmsTemplate的配置需要指定至少两个参数,一个是连接服务器的connectionFactory,另外一个则是发送的目的主题,比如是哪个quue还是topic。另外,消息交互的方式有两种,分别是基于quue和topic的。而接收消息也有两种方式,一种是基于jmsTemplate本身的同步接收消息方法,还要一种是注册事件通知的方式。它是一种异步的接收消息方式。在实际的情况我们可以根据具体业务的需要来选择。
参考材料
http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html