一个发布者对多个订阅者的消息传递
Topic和Queue的最大区别在于:Topic以广播的形式,把新消息通知给所有在线监听的客户端,没有在监听的客户端(非持久订阅时)将收不到该消息;而Queue则是以点对点的形式,只把消息交给多个处于监听状态的客户端中的一个。
首先,与第一章类似,我们为订阅者1,2分别创建监听器MyMessageListener和MyMessageListener2,它们都实现MessageListener接口
- package cn.com.evan.Jms.activemq;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- // Listener for subscriber 1: prints the text of each message it receives.
- public class MyMessageListener implements MessageListener{
- // Called once per delivered message.
- @Override
- public void onMessage(Message msg) {
- // Guard the downcast: a non-text message would otherwise raise a
- // ClassCastException out of the listener callback.
- if (!(msg instanceof TextMessage)) {
- System.out.println("订阅者1忽略非文本消息:"+msg);
- return;
- }
- try {
- System.out.println("订阅者1接受消息:"+((TextMessage)msg).getText());
- } catch (JMSException e) {
- // Reading the text failed; report it and wait for the next message.
- e.printStackTrace();
- }
- }
- }
- package cn.com.evan.Jms.activemq;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- // Listener for subscriber 2: prints the text of each message it receives.
- public class MyMessageListener2 implements MessageListener{
- // Called once per delivered message.
- @Override
- public void onMessage(Message msg) {
- // Guard the downcast: a non-text message would otherwise raise a
- // ClassCastException out of the listener callback.
- if (!(msg instanceof TextMessage)) {
- System.out.println("订阅者2忽略非文本消息:"+msg);
- return;
- }
- try {
- System.out.println("订阅者2接受消息:"+((TextMessage)msg).getText());
- } catch (JMSException e) {
- // Reading the text failed; report it and wait for the next message.
- e.printStackTrace();
- }
- }
- }
然后我们分别创建订阅者1,2,它们都订阅同一个Topic “MyTopic1”,并各自设置对应的监听器
- package cn.com.evan.Jms.activemq;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
- // Subscriber 1: connects to the broker, subscribes to topic "MyTopic1" and
- // hands every received message to MyMessageListener.
- public class JmsComsumer {
- // Connection settings taken from the ActiveMQ default constants.
- private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
- public static void main(String[] args) {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
- BROKERURL);
- Connection connection = null;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- // Non-transacted session with automatic acknowledgement.
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("MyTopic1");// create topic
- MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer
- messageConsumer.setMessageListener(new MyMessageListener());// register the listener created earlier
- // On success the connection is left open on purpose, presumably so the
- // listener can keep receiving messages.
- } catch (Exception e) {
- e.printStackTrace();
- // Setup failed: release the connection instead of leaving it open.
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException closeError) {
- closeError.printStackTrace();
- }
- }
- }
- }
- }
- package cn.com.evan.Jms.activemq;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
- // Subscriber 2: connects to the broker, subscribes to topic "MyTopic1" and
- // hands every received message to MyMessageListener2.
- public class JmsComsumer2 {
- // Connection settings taken from the ActiveMQ default constants.
- private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
- public static void main(String[] args) {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
- BROKERURL);
- Connection connection = null;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- // Non-transacted session with automatic acknowledgement.
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("MyTopic1");// create topic
- MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer
- messageConsumer.setMessageListener(new MyMessageListener2());// register the listener created earlier
- // On success the connection is left open on purpose, presumably so the
- // listener can keep receiving messages.
- } catch (Exception e) {
- e.printStackTrace();
- // Setup failed: release the connection instead of leaving it open.
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException closeError) {
- closeError.printStackTrace();
- }
- }
- }
- }
- }
接着我们创建发布者,发布10条消息(运行时需要先启动两个订阅者,再运行发布者,否则订阅者收不到这些消息)
- package cn.com.evan.Jms.activemq;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
- // Publisher: sends SENDNUM text messages to topic "MyTopic1" through one
- // transacted session, commits, and then closes the connection.
- public class JmsProducer {
- // Connection settings taken from the ActiveMQ default constants.
- private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
- private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
- private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
- // Number of messages to publish.
- private static final int SENDNUM = 10;
- public static void main(String[] args) {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
- BROKERURL);
- Connection connection = null;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- // Transacted session: the acknowledge mode argument is ignored, so
- // SESSION_TRANSACTED is passed to make the intent explicit.
- Session session = connection.createSession(true,
- Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic("MyTopic1");// create topic
- MessageProducer messageProducer = session.createProducer(destination);// create producer
- sendMessage(session,messageProducer);
- // Commit the transaction so the sent messages are actually published.
- session.commit();
- } catch (Exception e) {
- e.printStackTrace();
- }finally{
- // Always release the connection, whether or not publishing succeeded.
- if(connection!=null){
- try {
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
- // Sends SENDNUM text messages ("ActiveMQ0", "ActiveMQ1", ...) with the given
- // producer and echoes each one to standard output.
- // Throws JMSException if creating or sending a message fails.
- public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
- for(int i=0;i<SENDNUM;i++){
- TextMessage message = session.createTextMessage("ActiveMQ"+i);
- messageProducer.send(message);
- System.out.println("发布者发布消息:"+message.getText());
- }
- }
- }
我们打开ActiveMQ控制台,可以看到该Topic有2位消费者,消息共被消费了20次(每位订阅者分别消费了10条消息)