一处产生消息,两处消费,故采取Topic模式的activemq.
二. 下载及安装
http://activemq.apache.org/下载最新版本。
解压, 进入安装目录 bin/activemq start xbean:conf/activemq.xml
http://localhost:8161/admin查看mq状态,用户名和密码在conf/jetty-realm.properties,默认是admin, admin
至此server已启动。
三. server配置
conf/activemq.xml
1. message cursors,
引用
介绍的比较详细,大部分情况下用默认的Store-based cursor就能满足需求
2. producer-flow-control
http://activemq.apache.org/producer-flow-control.html
生产者流量控制,当内存和硬盘空间到达的时候控制流量,抛出异常或挂起等待,个人觉得在有message cursors的时候可以禁用
3. slow consumer handling
对于nondurable 的consumer,要求broker未其保存未消费的message在内存中,会导致生产者阻塞,其他的快速消费者变慢,所以可以设置一个限制大小的消息队列。
constantPendingMessageLimitStrategy
4. jmx enable
未研究todo
四. ps
about durable topics, non-durable模式下,如果订阅者不在线,那么其不在线期间topic上收到的消息将不会被该订阅者收到,如果想要一个订阅者收到topic上的所有消息,请使用durable模式, 推荐使用durable模式,可以免去生产者流量控制和慢消费者处理的麻烦
about transaction mode, server 有transactionstore, 缓存所有的message和acks不执行,直到commit 和 rollback到达,另外也不把commit 和 rollback之前的message和acks放到分发的队列上去
五. durable subscriber样例代码
public class AMQConnectionFactory { private String url; private String user; private String pwd; private ActiveMQConnectionFactory activeMQConnectionFactory; public AMQConnectionFactory(String user, String pwd, String url){ this.url = url; this.user = user; this.pwd = pwd; this.activeMQConnectionFactory = new ActiveMQConnectionFactory(user, pwd, url); } public Connection getConnection(){ Connection connection = null; try { connection = activeMQConnectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } return connection; } public Connection getDurableConnection(String clientId){ Connection connection = null; try { connection = activeMQConnectionFactory.createConnection(); connection.setClientID(clientId); connection.start(); } catch (JMSException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } return connection; } public Session getACKSession(){ Session session = null; Connection connection = getConnection(); if(connection != null){ try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } return session; } public Session getDurableACKSession(String clientId){ Session session = null; Connection connection = getDurableConnection(clientId); if(connection != null){ try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } return session; } }
producer:
public class TestProducer { public void run() { ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"}); AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory"); Session session = amqConnectionFactory.getACKSession(); try{ Topic topic = session.createTopic("inc.message"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); produce(producer); }catch (JMSException jm){ jm.printStackTrace(); } System.exit(0); } public void produce(MessageProducer producer) { Message message = new ActiveMQMapMessage(); try{ message.setStringProperty("message", "hello"); producer.send(message); int i = 0; while(true){ i++; message.setStringProperty("message", "hello" +i); producer.send(message); System.out.println("produce message: " + message.getStringProperty("message")); try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } }catch (JMSException exception){ exception.printStackTrace(); } } }
consumer
public class TestConsumer { public void run(String clientId) { ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"}); AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory"); Session session2 = amqConnectionFactory.getDurableACKSession(clientId); try{ Destination destination = session2.createTopic("inc.message"); IncConsumerListener incConsumerListener = (IncConsumerListener) ctx.getBean("incConsumerListener"); MessageConsumer consumer = session2.createDurableSubscriber((Topic) destination, "bbbb"); consumer.setMessageListener(incConsumerListener); }catch (JMSException jmse){ jmse.printStackTrace(); } } public static void main (String args[]){ TestConsumer testConsumer = new TestConsumer(); testConsumer.run("iiii"); } }