参考官网文档
前言
ActiveMQ是基于JMS接口标准的消息队列中间件。JMS的了解可以参考这篇博文 ,深入了解可以去看一看《Java消息服务》这本书。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.4</version>
</dependency>
简单实例
package com.frame.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Create by Administrator on 2018/7/27
*/
public class App {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public static class HelloWorldProducer implements Runnable {
@Override
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover://tcp://localhost:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
// Clean up
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
@Override
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover://tcp://localhost:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
@Override
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
}
基本流程 :
ConnectionFactory -> Connection -> Session -> Destination -> MessageProducer/MessageCustomer -> send/receive
与spring整合 --点到点通讯模式
spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">
<context:component-scan base-package="com.frame.spring.activemq"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616?trace=true</value>
</property>
</bean>
</property>
<!--最大的connectionsPool数量-->
<property name="maxConnections" value="100"></property>
</bean>
<!--使用缓存可以提升效率-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="10"/>
<!-- 设置session缓存个数(示例值),如果连接工厂是生产者使用,那么那么sessionSize的数量影响到并发的性能,如果是消费者使用,那么这个设置没有影响-->
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<!-- 配置转换 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
<!--测试Queue,队列的名字是spring-queue-->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!--<constructor-arg index="0" value="spring-queue"/>-->
<constructor-arg name="name" value="spring-queue"/>
</bean>
<!--测试Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<!--消息接收者容器 匹配对应的destination -->
<!--<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="amqTestMessageListener"/>
</bean>-->
<!--
<bean id="jmsContainer2"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="amqTestMessageListener2"/>
</bean> -->
<!-- topic -->
<jms:listener-container
connection-factory="cachingConnectionFactory"
destination-type="queue">
<jms:listener destination="spring-queue" ref="amqTestMessageListener"/>
</jms:listener-container>
<!--消息监听器-->
<bean id="amqTestMessageListener" class="com.frame.spring.activemq.AMQTestMessageListener"/>
<bean id="amqTestMessageListener2" class="com.frame.spring.activemq.AMQTestMessageListener2"/>
</beans>
配置消息监听器(消费者)
package com.frame.spring.activemq;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 消费者1
* @author Administrator
*
*/
public class AMQTestMessageListener implements MessageListener{
@Override
public void onMessage(Message msg) {
if(msg instanceof TextMessage){
TextMessage txtMsg = (TextMessage) msg;
String message;
try {
message = txtMsg.getText();
//实际项目中拿到String类型的message(通常是JSON字符串)之后,
//会进行反序列化成对象,做进一步的处理
System.out.println(new Date() + "consumer1 receive txt msg===" + message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
配置生成者服务
package com.frame.spring.activemq;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
/**
* 生产者服务
* @author Administrator
*
*/
@Service("amqSenderService")
public class AMQSenderServiceImpl {
/**
* jms模板
*/
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
/**
* 目的地队列
*/
@Resource(name="destinationQueue")
private Destination destinationQueue;
/**
*
*/
@Resource(name="destinationTopic")
private Destination destinationTopic;
/**
* 发送字符串消息
*/
public void sendTextMsg(String message){
//或调用jmsTemplate配置的messageConverter,这里我们配置的SimpleMessageConverter
jmsTemplate.convertAndSend(destinationQueue, message);
//手动实现转换
/*jmsTemplate.send(destinationQueue,new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});*/
}
public void sendTopicTextMsg(String message){
jmsTemplate.send(destinationTopic,new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
测试
package com.frame.spring.activemq;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.junit.*;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring/spring-activemq.xml"})
public class ActivemqTest {
@Autowired
private AMQSenderServiceImpl amqSenderServiceImpl;
@Test
public void test() throws Exception{
amqSenderServiceImpl.sendTextMsg("向消息队列发送消息" + new Date());
TimeUnit.SECONDS.sleep(2);
amqSenderServiceImpl.sendTextMsg("向消息队列发送消息" + new Date());
TimeUnit.SECONDS.sleep(2);
amqSenderServiceImpl.sendTextMsg("向消息队列发送消息" + new Date());
TimeUnit.SECONDS.sleep(2);
amqSenderServiceImpl.sendTextMsg("向消息队列发送消息" + new Date());
}
}
结果
Wed Aug 01 17:22:48 CST 2018consumer1 receive txt msg===向消息队列发送消息Wed Aug 01 17:22:48 CST 2018
Wed Aug 01 17:22:50 CST 2018consumer1 receive txt msg===向消息队列发送消息Wed Aug 01 17:22:50 CST 2018
Wed Aug 01 17:22:52 CST 2018consumer1 receive txt msg===向消息队列发送消息Wed Aug 01 17:22:52 CST 2018
Wed Aug 01 17:22:54 CST 2018consumer1 receive txt msg===向消息队列发送消息Wed Aug 01 17:22:54 CST 2018