ActiveMQ入门示例
ActiveMQ有两种模式,点对点和发布/订阅模式,点对点中消息只能被一个消费者消费,而发布订阅中,消息可以被一群消费者消费,很好理解。下面的例子是点对点的
安装ActiveMQ很简单就不说了,客户端使用API只需添加以下依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
代码:
Sender.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Sender { //发送间隔 private long interval = 1 * 1000; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageProducer producer; // 单线程池负责发送 private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private static final Logger logger = Logger.getLogger(Sender.class); public Sender(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); producer = session.createProducer(dest); } public void setInterval(long l) { interval = l; } public void start() { worker.submit(new SendTask()); } public synchronized void stop() { stop = true; worker.shutdown(); } private TextMessage randomMsg() { String uuid = UUID.randomUUID().toString(); TextMessage msg = null; try { msg = session.createTextMessage(uuid);//把uuid作为消息 msg.setJMSCorrelationID(uuid); } catch (JMSException e) { e.printStackTrace(); } return msg; } private class SendTask implements Runnable { public void run() { logger.info("Send task begin..."); while (!stop) { try { // 发送 TextMessage msg = randomMsg(); producer.send(msg); session.commit(); // commit后消息才会发送到服务端 logger.info("Send text message : " + msg.getText()); // 间隔 Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Send task finished..."); } } public static void main(String[] args) throws JMSException { Sender sender = new Sender(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); sender.start(); } }
Reciever.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Reciever { public static final int RECIEVE_MODE_SYNC = 0; public static final int RECIEVE_MODE_ASYNC = 1; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageConsumer consumer; private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private long interval = 3 * 1000; private static final Logger logger = Logger.getLogger(Reciever.class); // 同步/异步接收模式,默认同步 private int mode = RECIEVE_MODE_SYNC; public Reciever(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); consumer = session.createConsumer(dest); } public void setInterval(long l) { interval = l; } public void setMode(int mode) { this.mode = mode; } public void start() throws JMSException { if (mode == RECIEVE_MODE_ASYNC) {// 异步 logger.info("Recieved task begin in async mode..."); // 由activemq组件回调 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { handleRecievedMessage(message); try { session.commit(); } catch (JMSException e) { e.printStackTrace(); stop = true; } } }); } else if (mode == RECIEVE_MODE_SYNC) {// 同步, 由于另起线程, 这里也不阻塞 worker.submit(new RecieveTask()); } } private void handleRecievedMessage(Message recievedMsg) { if (recievedMsg instanceof TextMessage) { TextMessage msg = (TextMessage) recievedMsg; try { logger.info("Recieved message : " + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public void stop() { stop = true; } private class RecieveTask implements Runnable { public void run() { logger.info("Recieved task begin in sync mode..."); while (!stop) { try { Message msg = consumer.receive(); handleRecievedMessage(msg); session.commit(); Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Recieve task finished..."); } } public static void main(String[] args) throws JMSException { Reciever reciever = new Reciever(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); reciever.setMode(RECIEVE_MODE_ASYNC); reciever.start(); } }
测试结果:
1.Reciever在ASYNC模式,注意看日志时间,可以看到Sender一旦了消息,Reciever就会接受到消息(忽略网络)
2.Reciever在ASYNC模式下,Sender每秒发一个消息,而接收者每3秒接收一个消息:可以看到Reciever接受的时间是和Sender发送的时间是无联系的。