Copyright notice: This is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/qq_22271479/article/details/79604986
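ActiveMQ Basics
Preface:
This article shows two ways to implement ActiveMQ's queue mode and topic mode: directly in a Java program, and through Spring. In queue mode, the consumers connected to the same destination queue share its messages: each message goes to only one consumer, so n consumers split the messages in the queue evenly between them. In topic mode, every consumer that has subscribed to the destination receives all of the messages in the topic.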
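The difference between the two modes can be illustrated without a broker. The sketch below is a plain-Java model, not ActiveMQ code: the class DeliveryModel and its methods are hypothetical names made up for this illustration, and the round-robin split is a simplifying assumption standing in for "n consumers share the queue evenly".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model of the two delivery modes (no broker involved).
 * Hypothetical helper for illustration only; not part of ActiveMQ.
 */
final class DeliveryModel {

    /** Queue mode: each message goes to exactly one consumer (round-robin is assumed here). */
    static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic mode: every subscribed consumer gets its own copy of every message. */
    static List<List<String>> deliverAsTopic(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Creates one empty inbox per consumer. */
    private static List<List<String>> emptyInboxes(int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("jms-test0", "jms-test1", "jms-test2", "jms-test3");
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

With two consumers and four messages, the queue model gives each consumer two messages, while the topic model gives each consumer all four.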
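1. Installing ActiveMQ on Linux
- Download the Linux archive from the official website and extract it into any directory on the Linux machine.
- Change to the bin directory and run ./activemq start
- Run ps -ef|grep activemq to check whether ActiveMQ is running.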
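Besides looking at the process list, a client can check whether the broker's port accepts connections before running the examples. The sketch below is only an illustration: BrokerPortCheck and isReachable are hypothetical names, the default host localhost is an assumption, and 61616 is taken from the broker URL used later in this article. It confirms only that something is listening on the port, not that the listener is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass host and port as arguments to override them
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 61616;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```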
2. Integrating ActiveMQ into a Java program: queue and topic modes
- pom.xml of the Maven project
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>
- Using ActiveMQ generally follows the same sequence: obtain a Connection, start the connection, create a Session from it, and then create the consumer or producer.
- Producer code in queue mode
package com.yzz.jms.queen;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 * Message producer
 */
public class AppProducer {
    public static final String url = "tcp://192.168.1.2:61616";
    public static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // Create the connection
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create the session (single-threaded): not transacted, auto-acknowledge
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination
        Destination destination = session.createQueue(queueName);
        // Create the producer
        MessageProducer producer = session.createProducer(destination);
        // Send the messages
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = session.createTextMessage("jms-test" + i);
            producer.send(textMessage);
            System.out.println(textMessage.getText() + " sent");
        }
        // Close the connection
        connection.close();
    }
}
- Consumer code in queue mode
package com.yzz.jms.queen;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 */
public class AppConsumer {
    public static final String url = "tcp://192.168.1.2:61616";
    public static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // Create the connection
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create the session (single-threaded): not transacted, auto-acknowledge
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination
        Destination destination = session.createQueue(queueName);
        // Create the consumer
        MessageConsumer consumer = session.createConsumer(destination);
        // Register a listener; the connection is left open so it keeps receiving messages
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage.getText() + "i");
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
- Producer code in topic mode
package com.yzz.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 * Message producer
 */
public class AppProducer {
    public static final String url = "tcp://192.168.1.2:61616";
    public static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // Create the connection
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create the session (single-threaded): not transacted, auto-acknowledge
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination
        Destination destination = session.createTopic(topicName);
        // Create the producer
        MessageProducer producer = session.createProducer(destination);
        // Publish the messages
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = session.createTextMessage("jms-test" + i);
            producer.send(textMessage);
            System.out.println(textMessage.getText() + " sent");
        }
        // Close the connection
        connection.close();
    }
}
- Consumer code in topic mode
package com.yzz.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 */
public class AppConsumer {
    public static final String url = "tcp://192.168.1.2:61616";
    public static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // Create the connection
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create the session (single-threaded): not transacted, auto-acknowledge
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination
        Destination destination = session.createTopic(topicName);
        // Create the consumer
        MessageConsumer consumer = session.createConsumer(destination);
        // Register a listener; the connection is left open so it keeps receiving messages
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println(textMessage.getText() + "i");
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
3. Integrating ActiveMQ with Spring: queue and topic modes
- First add the jars that Spring requires
- Add spring-jms and activemq-core (with spring-context excluded)
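In a Maven project, the two dependencies above might be declared roughly as follows. This is a sketch under assumptions: the article does not state the versions, so ${spring.version} is a hypothetical property you would define yourself and 5.7.0 is only an example activemq-core version. Reading "with spring-context excluded" as a Maven exclusion on activemq-core is also an interpretation.

```xml
<!-- Spring's JMS support (JmsTemplate, listener containers) -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <!-- assumption: a property defined elsewhere in the pom -->
    <version>${spring.version}</version>
</dependency>
<!-- ActiveMQ client classes, without the spring-context it would otherwise pull in -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <!-- assumption: example version only -->
    <version>5.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

The exclusion is meant to keep a single spring-context version on the classpath, namely the one that comes with the project's own Spring dependencies.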
queue mode
Create a ProducerServiceImpl class that is responsible for sending messages; it sends them through JmsTemplate.
package com.yzz.jms.queue.producer.impl;

import com.yzz.jms.queue.producer.IProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 */
public class ProducerServiceImpl implements IProducerService {
    @Autowired
    JmsTemplate jmsTemplate;

    @Resource(name = "queueDestination")
    Destination destination;

    public void sendMessage(final String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}
- Create the Spring configuration files common.xml, consumer.xml and producer.xml (shown below in that order)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Enable annotation support -->
    <context:annotation-config/>
    <!-- Connection factory provided by ActiveMQ -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.1.2:61616"></property>
    </bean>
    <!-- Connection factory provided by Spring JMS; it wraps the ActiveMQ factory and reuses a single connection -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"/>
    </bean>
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <import resource="common.xml"/>
    <!-- Message listener -->
    <bean id="consumerMessageListener" class="com.yzz.jms.queue.consumer.ConsumerMessageListener"/>
    <!-- Message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- Connection factory -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- Destination -->
        <property name="destination" ref="queueDestination"/>
        <!-- Message listener class -->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <import resource="common.xml"/>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
    </bean>
    <bean class="com.yzz.jms.queue.producer.impl.ProducerServiceImpl"/>
</beans>
- Producer main method
package com.yzz.jms.topic.producer.impl;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Created by yzz on 2018/3/18.
 * mail:[email protected]
 */
public class AppProducerLaunch {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("topicProducer.xml");
        ProducerServiceImpl producerService = context.getBean(ProducerServiceImpl.class);
        for (int i = 0; i < 100; i++) {
            producerService.sendMessage("哈哈" + i);
            System.out.println("============" + i);
        }
        context.close();
    }
}
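- Consumer main method
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AppConsumerLaunch {
    public static void main(String[] args) {
        // Loading the context starts the listener container, which then receives messages
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("topicConsumer.xml");
    }
}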
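- Code changes for topic mode
- Add a bean for the topic destination to common.xml; its constructor argument is topic and its class is org.apache.activemq.command.ActiveMQTopic
- Switch the producer's destination to the topic destination
- Switch the destination of the consumer's listenerContainer to the topic destination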
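For the consumer, the switch could look like the fragment below. It is a sketch based on the bean ids that appear in the configuration files earlier in this article (connectionFactory, topicDestination, consumerMessageListener); it assumes the topic consumer reuses the same listener bean.

```xml
<!-- Listener container that now subscribes to the topic instead of the queue -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- the only change: reference topicDestination instead of queueDestination -->
    <property name="destination" ref="topicDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
</bean>
```

On the producer side, the equivalent change would be to inject the other bean into ProducerServiceImpl, for example with @Resource(name="topicDestination") in place of @Resource(name="queueDestination").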