1、什么是JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。
ActiveMQ 有如下特点
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
在接下来的这篇博客中,我会和大家一起来整合Spring 和ActiveMq,这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat
spring整合active MQ需要使用如下jar
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.6</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>
1.jar 引用完后 需要配置active MQ相关的首先配置连接工厂
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.25.132:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
2.配置JmsTemplate ,他有queue 和topic两种模式,主要通过pubSubDomain 属性来指定,true是topic,false是queue,默认是false
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 配置JMS模板(Topic),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出true -->
<property name="pubSubDomain" value="true" />
</bean>
3.定义队列-----由于send方法是重载方法,支持多种定义目的地的方式,在这里简单直接的点的话 ,在定义queue 或者topic的对列名字的时候我们可以直接写字符串的,也可以在配置文件中配置,具体如下
jmsTemplate.send()方法用于发送消息,可以设置消息的送达地,可以是字符串指定,也可以通过配置文件指定Destination 然后注入使用。在使用字符串的时候queue和topic都可以,但是通过Destination 配置时候有点差别
、
<!-- 定义消息队列(Queue) 队列名字summerQueue-->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>summerQueue</value>
</constructor-arg>
</bean>
<!-- 定义消息队列(topic) 队列名字summerTopic-->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>summerTopic</value>
</constructor-arg>
</bean>
public void sendMessage(Destination destination,final String msg){
System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
jmsQueueTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
消息的接收
对于消息的接收配置方式也有两种,一种是直接配置bean,一种是使用jms标签
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="com.summer.smart.listener.QueueMessageListener" />
<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueAaaMessageListener" class="com.summer.smart.listener.QueueAaaMessageListener" />
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<jms:listener destination="aaa" ref="queueAaaMessageListener" />
</jms:listener-container>
<!-- 配置消息队列监听者(topic) -->
<bean id="messageListener1" class="com.summer.smart.listener.MessageListener1" />
<!-- 定义topic监听器 -->
<jms:listener-container destination-type="topic"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<jms:listener destination="aaa" ref="messageListener1" />
</jms:listener-container>
监听器需要实现MessageListener 接口
package com.summer.smart.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class QueueAaaMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println("aaa这个queue监听到了:"+text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}