package cn.itcats.activeMQTest;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class ActiveMQTest {
    /**
     * Sends one text message to a queue (point-to-point messaging).
     *
     * <p>Connects to the broker at {@code tcp://10.211.55.9:61616}, sends the text
     * {@code "message!!!"} to the queue named {@code "queueName"}, and then releases
     * the connection even if sending fails.
     *
     * @throws JMSException if connecting, creating the JMS objects, sending or closing fails
     */
    @Test
    public void queue_ProducerTest() throws JMSException {
        // Create the connection factory.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.211.55.9:61616");
        // Obtain a connection.
        Connection conn = connectionFactory.createConnection();
        try {
            // Start the connection.
            conn.start();
            // Create the session.
            // Argument 1: whether the session is transacted; when true, argument 2 is ignored.
            // Argument 2: acknowledgement mode (automatic or client acknowledgement).
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination through the session; it can be a queue or a topic.
            Queue queue = session.createQueue("queueName");
            // Create the producer through the session.
            MessageProducer producer = session.createProducer(queue);
            // Build the message. Alternatively an ActiveMQTextMessage can be instantiated
            // directly and filled with setText(...).
            TextMessage textMessage = session.createTextMessage("message!!!");
            // Send the message.
            producer.send(textMessage);
        } finally {
            // Closing the connection also closes its sessions and producers, so a single
            // close in finally releases everything, including on the failure path.
            conn.close();
        }
    }
}
/**
 * Consumes text messages from the queue named {@code "queueName"}.
 *
 * <p>Registers an asynchronous listener that prints the body of every text message,
 * then blocks on {@code System.in.read()} so the listener keeps running until a key
 * is pressed. The connection is released even if setup or the wait fails.
 *
 * @throws JMSException if connecting, creating the JMS objects or closing fails
 * @throws IOException if reading from standard input fails
 */
@Test
public void queue_consumerTest() throws JMSException, IOException {
    // Create the connection factory.
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.211.55.9:61616");
    // Obtain a connection.
    Connection conn = connectionFactory.createConnection();
    try {
        // Start the connection.
        conn.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // The queue name must match the one used by the producer, otherwise nothing is received.
        Queue queue = session.createQueue("queueName");
        // Create the consumer.
        MessageConsumer consumer = session.createConsumer(queue);
        // Receive messages asynchronously.
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // Guard the cast: a non-text message would otherwise raise
                // ClassCastException inside the listener.
                if (!(message instanceof TextMessage)) {
                    System.err.println("Ignoring non-text message: " + message);
                    return;
                }
                try {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // Block so the resources are not closed while waiting for messages.
        System.in.read();
    } finally {
        // Closing the connection also closes its sessions and consumers; doing it in
        // finally avoids leaking the connection when anything above throws.
        conn.close();
    }
}
package cn.itcats.activeMQTest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class ActiveMQTopicTest {
/**
* topic模式下的 producer方代码
*
* @throws JMSException
*/
@Test
public void topic_producerTest() throws JMSException {
// 创建一个连接工厂对象
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.211.55.9:61616");
// 获取连接对象
Connection conn = factory.createConnection();
// 开启连接
conn.start();
// 通过连接对象获取一个session对象
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象(子接口为Queue和Topic),此处使用Topic
Topic topic = session.createTopic("myTopicName");
// 创建一个producer对象
MessageProducer producer = session.createProducer(topic);
// 封装消息,并发送消息
TextMessage text = session.createTextMessage("message content");
producer.send(text);
// 关闭资源
producer.close();
session.close();
conn.close();
}
/**
* topic模式下的 consumer方代码
*
* @throws JMSException
*/
@Test
public void topic_consumerTest() throws Exception {
// 创建一个连接工厂对象
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://10.211.55.9:61616");
// 获取连接对象
Connection conn = factory.createConnection();
// 开启连接
conn.start();
// 通过连接对象获取一个session对象
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象(子接口为Queue和Topic),此处使用Topic
Topic topic = session.createTopic("myTopicName");
// 创建一个producer对象
MessageConsumer consumer = session.createConsumer(topic);
// 接收信息(监听器)
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
String text = "";
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("consumer3 is loading...");
// 线程等待,不让程序结束,一直处于监听状态
System.in.read();
// 关闭资源
consumer.close();
session.close();
conn.close();
}
}
ActiveMQ 整合 Spring
第一步:引用相关的jar包。
<!-- Spring modules referenced by this ActiveMQ/Spring integration example. -->
<!-- NOTE(review): no <version> is given; presumably managed by a parent POM / dependencyManagement - confirm. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>
第二步:配置 ActiveMQ 整合 Spring,首先配置 ConnectionFactory。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    <!-- The real ConnectionFactory that actually creates connections; supplied by the JMS provider. -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://10.211.55.9:61616" />
    </bean>
    <!-- Spring's ConnectionFactory, which wraps and manages the real ConnectionFactory. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- Points at the real ConnectionFactory that creates the JMS connections. -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
</beans>
第三步:配置生产者。
使用 JmsTemplate 对象发送消息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- Producer-side Spring configuration: connection factories, a JmsTemplate, and one queue and one topic destination. -->
<!-- The real ConnectionFactory that actually creates connections; supplied by the JMS provider. -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://10.211.55.9:61616" />
</bean>
<!-- Spring's ConnectionFactory, which wraps and manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- Points at the real ConnectionFactory that creates the JMS connections. -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Producer configuration -->
<!-- JMS helper class provided by Spring; it can send and receive messages. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- References the Spring-managed connectionFactory bean defined above. -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- Queue destination (point-to-point). -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!-- Topic destination (publish/subscribe, one-to-many). -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddToSolr" />
</bean>
</beans>
第四步:配置消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- Consumer-side Spring configuration: connection factories, destinations, a custom listener and its listener container. -->
<!-- The real ConnectionFactory that actually creates connections; supplied by the JMS provider. -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://10.211.55.9:61616" />
</bean>
<!-- Spring's ConnectionFactory, which wraps and manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- Points at the real ConnectionFactory that creates the JMS connections. -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Queue destination (point-to-point). -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!-- Topic destination (publish/subscribe, one-to-many). -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddToSolr" />
</bean>
<!-- Custom message listener; it implements the MessageListener interface. -->
<!-- NOTE(review): the class name looks like a placeholder; replace it with the real listener class - confirm. -->
<bean id="xxxMessageListener" class="cn.xxxx.xxxMessageListener" />
<!-- Receiving messages -->
<!-- Listener configuration -->
<!-- Message listener container: delivers messages from topicDestination to xxxMessageListener. -->
<bean
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="xxxMessageListener" />
</bean>
</beans>
自定义的监听器,用于接收消息
package cn.xxx.xxx.message;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
String text = textMessage.getText();
System.out.println(text);
// 书写后续业务,若消息生产者存在事务中,则考虑接收后等待事务提交
// 如对数据库进行插入后发消息,则需等待,防止事务未提交而对传递到消费者出的text进行查询操作
// thread.sleep(500);
}catch(Exception e) {
}
}
}