package jms;

import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Provides the single ActiveMQ connection factory shared by the JMS actors in this package.
 */
public class JMSFactory {

    /**
     * Shared factory, created once at class initialization and configured with
     * {@link ActiveMQConnectionFactory#DEFAULT_BROKER_BIND_URL}. Declared {@code final} so the
     * reference handed out to every caller can never be swapped after initialization.
     */
    private static final ActiveMQConnectionFactory CONNECTION_FACTORY =
            new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

    /**
     * Returns the shared connection factory.
     *
     * @return the same factory instance on every call
     */
    public static TopicConnectionFactory getActiveMQConnectionFactory() {
        return CONNECTION_FACTORY;
    }
}
jms.JMSMessageActor
package jms;

import javax.jms.*;

/**
 * Base class for a named JMS participant. Subclasses decide which destination they work
 * against by implementing {@link #getDestination()}; this class offers a helper that builds
 * a default topic or queue destination.
 */
public abstract class JMSMessageActor {

    /** Name of this actor, supplied at construction time. */
    protected String name = null;

    // NOTE: the spelling of the two default names is part of the runtime destination name
    // and is therefore left exactly as it is.
    /** Name reported by the default queue destination. */
    protected String defaultQueueName = "defalut-queue";

    /** Name reported by the default topic destination. */
    protected String defaultTopicName = "defalut-topic";

    /** Selector value for {@link #createDefaultDestination(int)}: build a topic. */
    protected final int DESTIONATION_TYPE_TOPIC = 1;

    /** Selector value for {@link #createDefaultDestination(int)}: build a queue. */
    protected final int DESTIONATION_TYPE_QUEUE = 2;

    /**
     * Creates an actor with the given name.
     *
     * @param name the actor name
     */
    public JMSMessageActor(String name) {
        this.name = name;
    }

    /**
     * Supplies the destination this actor sends to or receives from.
     *
     * @return the destination chosen by the subclass
     */
    public abstract Destination getDestination();

    /**
     * Builds a default destination of the requested kind.
     *
     * @param type {@link #DESTIONATION_TYPE_TOPIC} or {@link #DESTIONATION_TYPE_QUEUE}
     * @return a topic named by {@link #defaultTopicName}, a queue named by
     *         {@link #defaultQueueName}, or {@code null} when {@code type} is neither constant
     */
    public Destination createDefaultDestination(int type) {
        if (type == DESTIONATION_TYPE_TOPIC) {
            // The name is read from the field on every call, not captured at creation time.
            return new Topic() {
                @Override
                public String getTopicName() throws JMSException {
                    return defaultTopicName;
                }
            };
        }
        if (type == DESTIONATION_TYPE_QUEUE) {
            return new Queue() {
                @Override
                public String getQueueName() throws JMSException {
                    return defaultQueueName;
                }
            };
        }
        return null;
    }
}
jms.JMSMessageConsumer
package jms; import javax.jms.*; public abstract class JMSMessageConsumer extends JMSMessageActor implements Runnable,MessageListener{ public JMSMessageConsumer(String name){ super(name); } @Override public void onMessage(Message message) { synchronized(JMSMessageConsumer.class){ System.out.println("##### consumer "+ name +" receive message. #####"); System.out.println(JMSUtil.formatMessage(message)); } } @Override public void run() { try{ // get topic connect factory ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory(); // create connection Connection connection = factory.createConnection(); // create unique client id for the connection connection.setClientID("consumer_connection_"+name); // if the connection start method is not invoked , the consumer may be not receive the message connection.start(); // create session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination=getDestination(); // if the destination is an instance of Queue , // it will receive the message from the queue, // in other words the message can be consumed one time by one consumer // and the message is durable. // if the destination is an instance of Topic , // the subscribers of the Topic can receive the message, // but the message is non durable. MessageConsumer consumer =session.createConsumer(destination,null,true); // if the destination is an instance of Topic, // specify the clientID of the connection // and create MessageConsumer like this, // the subscribers of the Topic can receive the message // and the message is durable. //consumer =session.createDurableSubscriber((Topic)destination, "durable topic", null, true); consumer.setMessageListener(this); }catch(Exception e){ throw new RuntimeException(e); } } }
jms.JMSMessageProducer
package jms; import javax.jms.*; public abstract class JMSMessageProducer extends JMSMessageActor implements Runnable{ public JMSMessageProducer(String name){ super(name); } @Override public void run() { try{ // get topic connect factory ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory(); // create connection Connection connection = factory.createConnection(); // create session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination=getDestination(); // create message producer MessageProducer producer = session.createProducer(destination); // create message MapMessage mapMessage=session.createMapMessage(); mapMessage.setObjectProperty("lock", "key"); mapMessage.setObjectProperty("vegetables", "greens"); mapMessage.setObjectProperty("fruit", "apple"); mapMessage.setObjectProperty("meat", "pork"); // producer.setTimeToLive(1000);// set the message expiration time producer.send(mapMessage);// send message connection.close(); System.out.println(name+" send message success!"); }catch(Exception e){ throw new RuntimeException(e); } } }
jms.JMSMessageQueueReceiver
package jms;

import javax.jms.Destination;

/**
 * Consumer bound to the default queue destination.
 */
public class JMSMessageQueueReceiver extends JMSMessageConsumer {

    /**
     * Creates a queue receiver.
     *
     * @param name the receiver name, passed through to the consumer base class
     */
    public JMSMessageQueueReceiver(String name) {
        super(name);
    }

    /**
     * Returns a freshly created default queue destination.
     *
     * @return the default queue destination
     */
    @Override
    public Destination getDestination() {
        final Destination queue = createDefaultDestination(DESTIONATION_TYPE_QUEUE);
        return queue;
    }
}
jms.JMSMessageQueueSender
package jms;

import javax.jms.Destination;

/**
 * Producer bound to the default queue destination.
 */
public class JMSMessageQueueSender extends JMSMessageProducer {

    /**
     * Creates a queue sender.
     *
     * @param name the sender name, passed through to the producer base class
     */
    public JMSMessageQueueSender(String name) {
        super(name);
    }

    /**
     * Returns a freshly created default queue destination.
     *
     * @return the default queue destination
     */
    @Override
    public Destination getDestination() {
        final Destination queue = createDefaultDestination(DESTIONATION_TYPE_QUEUE);
        return queue;
    }
}
jms.JMSMessageTopicPublisher
package jms;

import javax.jms.Destination;

/**
 * Producer bound to the default topic destination.
 */
public class JMSMessageTopicPublisher extends JMSMessageProducer {

    /**
     * Creates a topic publisher.
     *
     * @param name the publisher name, passed through to the producer base class
     */
    public JMSMessageTopicPublisher(String name) {
        super(name);
    }

    /**
     * Returns a freshly created default topic destination.
     *
     * @return the default topic destination
     */
    @Override
    public Destination getDestination() {
        final Destination topic = createDefaultDestination(DESTIONATION_TYPE_TOPIC);
        return topic;
    }
}
jms.JMSMessageTopicSubscriber
package jms;

import javax.jms.Destination;

/**
 * Consumer bound to the default topic destination.
 */
public class JMSMessageTopicSubscriber extends JMSMessageConsumer {

    /**
     * Creates a topic subscriber.
     *
     * @param name the subscriber name, passed through to the consumer base class
     */
    public JMSMessageTopicSubscriber(String name) {
        super(name);
    }

    /**
     * Returns a freshly created default topic destination.
     *
     * @return the default topic destination
     */
    @Override
    public Destination getDestination() {
        final Destination topic = createDefaultDestination(DESTIONATION_TYPE_TOPIC);
        return topic;
    }
}
jms.JMSUtil
package jms;

import java.text.SimpleDateFormat;
import java.util.*;
import javax.jms.*;

/**
 * Helpers for rendering JMS messages as text.
 */
public class JMSUtil {

    /** Pattern used to render the message timestamp. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Renders the standard headers and all properties of a message as multi-line text.
     *
     * <p>The output has a header section (correlation id, delivery mode, expiration,
     * message id, priority, timestamp, type) followed by one {@code name : value} line per
     * message property, each property being read as a string.
     *
     * @param message the message to render
     * @return the formatted text
     * @throws RuntimeException wrapping any {@link JMSException} raised while reading the message
     */
    public static String formatMessage(Message message) {
        StringBuilder result = new StringBuilder();
        try {
            String correlationId = message.getJMSCorrelationID();
            int deliveryMode = message.getJMSDeliveryMode();
            long expiration = message.getJMSExpiration();
            String messageId = message.getJMSMessageID();
            int priority = message.getJMSPriority();
            long timestamp = message.getJMSTimestamp();
            String type = message.getJMSType();

            // SimpleDateFormat is not thread-safe and this method is reachable from several
            // listener threads, so a formatter is created per call instead of being shared.
            SimpleDateFormat timestampFormat = new SimpleDateFormat(TIMESTAMP_PATTERN);

            result.append("##### message property #####\n")
                    .append("correlationId : ").append(correlationId).append("\n")
                    .append("deliveryMode : ").append(deliveryMode).append("\n")
                    .append("expiration : ").append(expiration).append("\n")
                    .append("messageId : ").append(messageId).append("\n")
                    .append("priority : ").append(priority).append("\n")
                    .append("timestamp : ").append(timestampFormat.format(new Date(timestamp))).append("\n")
                    .append("type : ").append(type).append("\n");

            // The enumeration is treated as an enumeration of property-name strings; the
            // suppression covers only this assignment.
            @SuppressWarnings("unchecked")
            Enumeration<String> names = message.getPropertyNames();

            result.append("##### message content #####\n");
            while (names.hasMoreElements()) {
                String name = names.nextElement();
                String value = message.getStringProperty(name);
                result.append(name).append(" : ").append(value).append("\n");
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        return result.toString();
    }
}
jms.Main
package jms; public class Main { public static void main(String[] args) throws InterruptedException { sendMesasge(); Thread.sleep(2000); createMessageReceiver(); Thread.sleep(3000); System.exit(0); } /* create message receiver */ private static void createMessageReceiver(){ // create queue message receiver for(int i=0;i<3;i++){ Thread t=new Thread(new JMSMessageQueueReceiver("queue_receiver_"+i)); t.start(); } // create topic message subscriber for(int i=0;i<3;i++){ Thread t=new Thread(new JMSMessageTopicSubscriber("topic_subscriber_"+i)); t.start(); } } /* send message */ private static void sendMesasge(){ // create queue message sender for(int i=0;i<3;i++){ Thread t=new Thread(new JMSMessageQueueSender("queue_sender_"+i)); t.start(); } // create topic publisher for(int i=0;i<3;i++){ Thread t=new Thread(new JMSMessageTopicPublisher("topic_publisher_"+i)); t.start(); } } }