JMS Api Demo

jms.JMSFactory
package jms;
import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSFactory {
	private static ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
	
	public static TopicConnectionFactory getActiveMQConnectionFactory(){
		return activeMQConnectionFactory;
	}
}

jms.JMSMessageActor
package jms;
import javax.jms.*;
public abstract class JMSMessageActor {
	protected String name=null;
	protected String defaultQueueName="defalut-queue";
	protected String defaultTopicName="defalut-topic";
	protected final int  DESTIONATION_TYPE_TOPIC=1;
	protected final int DESTIONATION_TYPE_QUEUE=2;
	public JMSMessageActor(String name) {
		this.name = name;
	}
	public abstract Destination  getDestination();
	public Destination createDefaultDestination(int type){
		Destination dest=null;
		switch(type){
			case DESTIONATION_TYPE_TOPIC:
				dest=new Topic(){
					@Override
					public String getTopicName() throws JMSException {
						return defaultTopicName;
					}};break;
			case DESTIONATION_TYPE_QUEUE:
				dest=new Queue(){
					@Override
					public String getQueueName() throws JMSException {
						return defaultQueueName;
					}};
				break;
		}
		return dest;
	}
}

jms.JMSMessageConsumer
package jms;
import javax.jms.*;
public abstract class JMSMessageConsumer extends JMSMessageActor implements Runnable,MessageListener{
	public JMSMessageConsumer(String name){
		super(name);
	}
	@Override
	public void onMessage(Message message) {
		synchronized(JMSMessageConsumer.class){
			System.out.println("##### consumer "+ name +" receive message. #####");
			System.out.println(JMSUtil.formatMessage(message));
		}
	}
	@Override
	public void run() {
		try{			
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory();
			// create connection
			Connection connection = factory.createConnection();
			// create unique client id for the connection
			connection.setClientID("consumer_connection_"+name);
			// if the connection start method is not invoked , the consumer may be not receive the message
			connection.start();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// if the destination is an instance of Queue , 
			// it will receive the message from the queue,
			// in other words the message can be consumed one time by one consumer 
			// and the message is durable.
			// if the destination is an instance of Topic , 
			// the subscribers of the Topic can receive the message,
			// but the message is non durable.
			MessageConsumer consumer  =session.createConsumer(destination,null,true);
			// if the destination is an instance of Topic, 
			// specify the clientID of the connection
			// and create MessageConsumer like this,
			// the subscribers of the Topic can receive the message
			// and the message is durable.
			//consumer =session.createDurableSubscriber((Topic)destination, "durable topic", null, true);
			consumer.setMessageListener(this);
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageProducer
package jms;
import javax.jms.*;
public abstract class JMSMessageProducer extends JMSMessageActor implements Runnable{
	public JMSMessageProducer(String name){
		super(name);
	}
	@Override
	public void run() {
		try{
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory(); 
			// create connection
			Connection connection = factory.createConnection();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// create message producer
			MessageProducer producer = session.createProducer(destination);
			// create message
			MapMessage mapMessage=session.createMapMessage();
			mapMessage.setObjectProperty("lock", "key");
			mapMessage.setObjectProperty("vegetables", "greens");
			mapMessage.setObjectProperty("fruit", "apple");
			mapMessage.setObjectProperty("meat", "pork");
			
			// producer.setTimeToLive(1000);// set the message expiration time
			
			producer.send(mapMessage);// send message
			connection.close();
			System.out.println(name+" send message success!");
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageQueueReceiver
package jms;
import javax.jms.Destination;
public class JMSMessageQueueReceiver extends JMSMessageConsumer {
	public JMSMessageQueueReceiver(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageQueueSender
package jms;
import javax.jms.Destination;
public class JMSMessageQueueSender extends JMSMessageProducer {
	public JMSMessageQueueSender(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageTopicPublisher
package jms;
import javax.jms.Destination;
public class JMSMessageTopicPublisher extends JMSMessageProducer {
	public JMSMessageTopicPublisher(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSMessageTopicSubscriber
package jms;
import javax.jms.Destination;
public class JMSMessageTopicSubscriber extends JMSMessageConsumer {
	public JMSMessageTopicSubscriber(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSUtil
package jms;
import java.text.SimpleDateFormat;
import java.util.*;
import javax.jms.*;
public class JMSUtil {
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	@SuppressWarnings("unchecked")
	public static String formatMessage(Message message) {
		StringBuffer result=new StringBuffer();
		try {
			String correlationId=message.getJMSCorrelationID();
			int deliveryMode=message.getJMSDeliveryMode();
			long expiration=message.getJMSExpiration();
			String messageId=message.getJMSMessageID();
			int priority=message.getJMSPriority();
			long timestamp=message.getJMSTimestamp();
			String type=message.getJMSType();
			result.append("##### message property #####\n")
				  .append("correlationId : "+correlationId+"\n")
				  .append("deliveryMode : "+deliveryMode+"\n")
				  .append("expiration : "+expiration+"\n")
				  .append("messageId : "+messageId+"\n")
				  .append("priority : "+priority+"\n")
				  .append("timestamp : "+sdf.format(new Date(timestamp))+"\n")
				  .append("type : "+type+"\n");
			Enumeration<String> names=message.getPropertyNames();
			result.append("##### message content #####\n");
			while(names.hasMoreElements()){
				String name=names.nextElement();
				String value=message.getStringProperty(name);
				result.append(name +" : "+value+"\n");
			}
		} catch (JMSException e) {
			throw new RuntimeException(e);
		}
		return result.toString();
	}
}

jms.Main
package jms;
public class Main {
	public static void main(String[] args) throws InterruptedException {
		sendMesasge();
		Thread.sleep(2000);
		createMessageReceiver();
		
		Thread.sleep(3000);
		System.exit(0);
	}
	/* create message receiver */
	private static void createMessageReceiver(){
		// create queue message receiver
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueReceiver("queue_receiver_"+i));
			t.start();
		}
		// create topic message subscriber
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicSubscriber("topic_subscriber_"+i));
			t.start();
		}
	}
	/* send message */
	private static void sendMesasge(){
		// create queue message sender
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueSender("queue_sender_"+i));
			t.start();
		}
		// create topic publisher
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicPublisher("topic_publisher_"+i));
			t.start();
		}
	}
}

猜你喜欢

转载自antlove.iteye.com/blog/1756213
jms