ActiveMQ 2 订阅者模式实现

一个发布者对多个订阅者的消息传递


Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。


首先我们如第一章类似,为订阅者1,2分别创建两个监听器MyMessageListener,MyMessageListener2 实现MessageListener接口

[java]  view plain  copy
  1. package cn.com.evan.Jms.activemq;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.   
  8. public class MyMessageListener implements MessageListener{  
  9.   
  10.     @Override  
  11.     public void onMessage(Message msg) {  
  12.           
  13.         try {  
  14.             System.out.println("订阅者1接受消息:"+((TextMessage)msg).getText());  
  15.         } catch (JMSException e) {  
  16.             // TODO Auto-generated catch block  
  17.             e.printStackTrace();  
  18.         }  
  19.           
  20.     }  
  21.       
  22.       
  23.   
  24. }  

[java]  view plain  copy
  1. package cn.com.evan.Jms.activemq;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.   
  8. public class MyMessageListener2 implements MessageListener{  
  9.   
  10.     @Override  
  11.     public void onMessage(Message msg) {  
  12.           
  13.         try {  
  14.             System.out.println("订阅者2接受消息:"+((TextMessage)msg).getText());  
  15.         } catch (JMSException e) {  
  16.             // TODO Auto-generated catch block  
  17.             e.printStackTrace();  
  18.         }  
  19.           
  20.     }  
  21.       
  22.       
  23.   
  24. }  

然后我们分别创建订阅者1,2,并创建一个Topic “MyTopic1”用于消息的订阅,订阅者1,2分别设置对应的监听器

[java]  view plain  copy
  1. package cn.com.evan.Jms.activemq;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MessageConsumer;  
  8. import javax.jms.MessageProducer;  
  9. import javax.jms.Session;  
  10. import javax.jms.TextMessage;  
  11.   
  12. import org.apache.activemq.ActiveMQConnection;  
  13. import org.apache.activemq.ActiveMQConnectionFactory;  
  14.   
  15. public class JmsComsumer {  
  16.     private static String USERNAME = ActiveMQConnection.DEFAULT_USER;  
  17.     private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;  
  18.     private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;  
  19.     private static Integer SENDNUM = 10;  
  20.   
  21.     public static void main(String[] args) {  
  22.         ConnectionFactory connectionFactory;  
  23.         Connection connection = null;  
  24.         Session session;  
  25.         Destination destination;  
  26.         MessageConsumer messageConsumer;  
  27.   
  28.         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,  
  29.                 BROKERURL);  
  30.         try {  
  31.             connection = connectionFactory.createConnection();  
  32.             connection.start();  
  33.             session = connection.createSession(Boolean.FALSE,  
  34.                     Session.AUTO_ACKNOWLEDGE);  
  35.             destination = session.createTopic("MyTopic1");//create topic  
  36.             messageConsumer = session.createConsumer(destination);// Create  
  37.                                                                     // producer  
  38.   
  39.             messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before  
  40.         } catch (Exception e) {  
  41.             // TODO Auto-generated catch block  
  42.             e.printStackTrace();  
  43.         }  
  44.   
  45.     }  
  46.   
  47. }  
[java]  view plain  copy
  1. package cn.com.evan.Jms.activemq;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MessageConsumer;  
  8. import javax.jms.MessageProducer;  
  9. import javax.jms.Session;  
  10. import javax.jms.TextMessage;  
  11.   
  12. import org.apache.activemq.ActiveMQConnection;  
  13. import org.apache.activemq.ActiveMQConnectionFactory;  
  14.   
  15. public class JmsComsumer2 {  
  16.     private static String USERNAME = ActiveMQConnection.DEFAULT_USER;  
  17.     private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;  
  18.     private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;  
  19.     private static Integer SENDNUM = 10;  
  20.   
  21.     public static void main(String[] args) {  
  22.         ConnectionFactory connectionFactory;  
  23.         Connection connection = null;  
  24.         Session session;  
  25.         Destination destination;  
  26.         MessageConsumer messageConsumer;  
  27.   
  28.         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,  
  29.                 BROKERURL);  
  30.         try {  
  31.             connection = connectionFactory.createConnection();  
  32.             connection.start();  
  33.             session = connection.createSession(Boolean.FALSE,  
  34.                     Session.AUTO_ACKNOWLEDGE);  
  35.             destination = session.createTopic("MyTopic1");//create topic  
  36.             messageConsumer = session.createConsumer(destination);// Create  
  37.                                                                     // producer  
  38.   
  39.             messageConsumer.setMessageListener(new MyMessageListener2());// set listener which we created before  
  40.         } catch (Exception e) {  
  41.             // TODO Auto-generated catch block  
  42.             e.printStackTrace();  
  43.         }  
  44.   
  45.     }  
  46.   
  47. }  

接着我们创建发布者,发布10条消息

[java]  view plain  copy
  1. package cn.com.evan.Jms.activemq;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MessageProducer;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10.   
  11. import org.apache.activemq.ActiveMQConnection;  
  12. import org.apache.activemq.ActiveMQConnectionFactory;  
  13.   
  14. public class JmsProducer {  
  15.   
  16.     private static String USERNAME = ActiveMQConnection.DEFAULT_USER;  
  17.     private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;  
  18.     private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;  
  19.     private static Integer SENDNUM = 10;  
  20.   
  21.     public static void main(String[] args) {  
  22.         ConnectionFactory connectionFactory;  
  23.         Connection connection = null;  
  24.         Session session;  
  25.         Destination destination;  
  26.         MessageProducer messageProducer;  
  27.   
  28.         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,  
  29.                 BROKERURL);  
  30.         try {  
  31.             connection = connectionFactory.createConnection();  
  32.             connection.start();  
  33.             session = connection.createSession(Boolean.TRUE,  
  34.                     Session.AUTO_ACKNOWLEDGE);  
  35.             destination = session.createTopic("MyTopic1");// Create topic  
  36.             messageProducer = session.createProducer(destination);// Create producer  
  37.             sendMessage(session,messageProducer);  
  38.             session.commit();  
  39.         } catch (Exception e) {  
  40.             // TODO Auto-generated catch block  
  41.             e.printStackTrace();  
  42.         }finally{  
  43.             if(connection!=null){  
  44.                 try {  
  45.                     connection.close();  
  46.                 } catch (JMSException e) {  
  47.                     // TODO Auto-generated catch block  
  48.                     e.printStackTrace();  
  49.                 }  
  50.             }  
  51.         }  
  52.   
  53.     }  
  54.       
  55.     public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{  
  56.         for(int i=0;i<JmsProducer.SENDNUM;i++){  
  57.             TextMessage message = session.createTextMessage("ActiveMQ"+i);  
  58.             messageProducer.send(message);  
  59.             System.out.println("发布者发布消息:"+message.getText());  
  60.         }  
  61.     }  
  62.   
  63. }  
以上发布者和2个订阅者都已经创建完毕,我们先运行订阅者1,2,再运行发布者(就像我们平时要接收到什么新闻,都得先订阅那频道,才可以接收到相关频道发布的消息),运行结果如下





我们打开activeMq 控制台可以看到有2位消费者,消息被消费了20次(每位订阅者分别消费了10条消息)

猜你喜欢

转载自blog.csdn.net/clz1314521/article/details/51854698