java代码:
package com.yanzhi.system; import com.yanzhi.tools.C; import com.yanzhi.tools.Global; import com.yanzhi.tools.StringUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.pool.PooledConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.connection.SingleConnectionFactory; import javax.jms.*; import java.sql.Timestamp; import java.util.List; /** * Created by xiaoyunlian on 2016/2/14. * activemq消费者实例 */ public class ConsumerApp implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class); public static void start(){ try { //连接 ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory(); targetConnectionFactory.setBrokerURL(Global.getBrokerURL()); targetConnectionFactory.setTrustAllPackages(true); SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接 Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //队列 ActiveMQQueue yanzhiQueueDestination = (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3"); ConsumerApp consumerMessageListener = new ConsumerApp(); MessageConsumer consumer = session.createConsumer(yanzhiQueueDestination); consumer.setMessageListener(consumerMessageListener); System.err.println("~~~~~~~~~~~~~~~~~~~~~~ active MQ 消费者监听器 启动成功~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); }catch (Exception e){ e.printStackTrace(); } } @Override public void onMessage(Message msg) { try { Destination dest = msg.getJMSDestination(); if (msg instanceof TextMessage) { ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)msg; String name = activeMQTextMessage.getDestination().getPhysicalName(); TextMessage message = (TextMessage) msg; System.err.println("队列:"+name+"接收者接到一个String消息:"+message.getText()); } else if (msg instanceof MapMessage) { ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) msg; String destinationName = activeMQMapMessage.getDestination().getPhysicalName(); } else if (msg instanceof StreamMessage) { StreamMessage message = (StreamMessage) msg; System.out.println("------Received StreamMessage------"); System.out.println(message.readString()); System.out.println(message.readBoolean()); System.out.println(message.readLong()); } else if (msg instanceof ObjectMessage) { ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)msg; String destinationName = activeMQObjectMessage.getDestination().getPhysicalName(); if (Global.getPkQuene().equals(destinationName)){ ObjectMessage objectMessage = (ObjectMessage) msg; Object object = objectMessage.getObject(); if (object instanceof List){ // do something } } if (Global.getFaceValueReportQuene().equals(destinationName)){ // do something } if (Global.getRegUserQuene().equals(destinationName)){ // do something } } else if (msg instanceof BytesMessage) { System.out.println("------Received BytesMessage------"); BytesMessage message = (BytesMessage) msg; byte[] byteContent = new byte[1024]; int length = -1; StringBuffer content = new StringBuffer(); while ((length = message.readBytes(byteContent)) != -1) { content.append(new String(byteContent, 0, length)); } System.out.println(content.toString()); } else { System.out.println(msg); } } catch (JMSException e) { LOGGER.error("error {}", e); } } }
调用上面的start()方法,即可启动消息队列的消费者。