初学 ActiveMQ(下文简称 AMQ),根据网上学到的一些资料,整理了一下在 Java 代码中嵌入 AMQ 的用法,如有错误请指出。
首先需要导入两个jar包:
activemq-all-5.5.1.jar
slf4j-nop-1.4.3.jar
import java.net.URI; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; public class AmqManager { public static BrokerService broker = new BrokerService(); private final static AmqManager amq = new AmqManager(); /** 连接器 */ private static Connection connection = null; /** 按队列名获取session */ private static Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>(); /** 按队列名称获取生产者对象 */ private static Map<String, MessageProducer> producerMap = new ConcurrentHashMap<String, MessageProducer>(); /** 按队列名称获取消费者对象 */ private static Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>(); private AmqManager(){}; public static synchronized AmqManager getAMQ() throws Exception { return amq; } /** * 获取连接器 * @param brokerUri * @param clientID * @return * @throws Exception */ public synchronized Connection initConnection(String brokerUri, String clientID) throws Exception { if (null == connection) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri); connection = connectionFactory.createConnection(); connection.setClientID(clientID); connection.start(); } return connection; } /** * 初始化生产者 * @param queueName * @param acknowledgeMode * @param deliveryMode * @return * @throws JMSException */ public MessageProducer initProducer(String queueName, int acknowledgeMode, int deliveryMode) throws JMSException { MessageProducer producer = null; if (connection != null) { Session session = connection.createSession(false, acknowledgeMode); Destination destination = 
session.createQueue(queueName); producer = session.createProducer(destination); producer.setDeliveryMode(deliveryMode); sessionMap.put(queueName, session); producerMap.put(queueName, producer); } return producer; } /** * 初始化消费者 * @param queueName * @param acknowledgeMode * @param deliveryMode * @return * @throws JMSException */ public MessageConsumer initConsumer(String queueName, int acknowledgeMode) throws JMSException { MessageConsumer consumer = null; if (connection != null) { Session session = connection.createSession(false, acknowledgeMode); Destination destination = session.createQueue(queueName); consumer = session.createConsumer(destination); consumerMap.put(queueName, consumer); } return consumer; } /** * 接收消息 * @param consumer * @throws JMSException */ public void getMessage(String queueName) throws JMSException { MessageConsumer consumer = consumerMap.get(queueName); while (true) { TextMessage textMessage = (TextMessage) consumer.receive(100000); if(textMessage != null){ System.out.println("收到消息:" + textMessage.getText()); }else { System.out.println("接收消息异常"); break; } } } /** * 发送消息 * @param queueName * @param message * @throws JMSException */ public void sendMessage(String queueName, String message) throws JMSException { TextMessage msg = sessionMap.get(queueName).createTextMessage(message); //msg.setStringProperty("sqlId", "comstar-market-data-feedhub"); producerMap.get(queueName).send(msg); } /** * 发送消息 * @param queueName * @param message * @param headName * @param headValue * @throws JMSException */ public void sendMessage(String queueName, String message, String headName, String headValue) throws JMSException { TextMessage msg = sessionMap.get(queueName).createTextMessage(message); msg.setStringProperty(headName, headValue); producerMap.get(queueName).send(msg); } /** * 关闭session * @param queueName * @throws JMSException */ public void close(String queueName) throws JMSException { Session session = sessionMap.get(queueName); if (null != session) { 
session.close(); sessionMap.remove(queueName); destroy(); } } /** * 销毁连接 * note:如果session都没有了,则销毁连接 * @throws JMSException */ private synchronized void destroy() throws JMSException { if (connection != null) { if (0 == sessionMap.size()) { connection.close(); connection = null; } } } public synchronized static void initAMQ() { try { TransportConnector connector = new TransportConnector(); connector.setUri(new URI("tcp://localhost:61616")); broker.addConnector(connector); broker.start(); } catch (JMSException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } public void destoryAMQ() { try { //close(""); broker.stop(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { try { getAMQ(); //BrokerService broker = new BrokerService(); TransportConnector connector = new TransportConnector(); connector.setUri(new URI("tcp://localhost:61616")); broker.addConnector(connector); broker.start(); amq.initConnection("failover://tcp://localhost:61616", "cning"); amq.initProducer("queue.test", Session.AUTO_ACKNOWLEDGE, DeliveryMode.NON_PERSISTENT); amq.initConsumer("queue.test", Session.AUTO_ACKNOWLEDGE); amq.sendMessage("queue.test", "hahahahahahahhahaha"); amq.getMessage("queue.test"); amq.destoryAMQ(); } catch (JMSException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }