Java代码:
package com.yanzhi.system;

import com.yanzhi.test.TestObject;
import com.yanzhi.tools.C;
import com.yanzhi.tools.Global;
import com.yanzhi.tools.StringUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.*;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.*;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Demo JMS producer for ActiveMQ.
 *
 * <p>Lazily opens one connection (shared through the static {@link #connection} field),
 * one session per instance, and one {@link MessageProducer} per requested queue name.
 * The queues are the members of a single composite queue
 * ({@code testQueue1,testQueue2,testQueue3}).
 *
 * <p>No synchronization is performed anywhere in this class, so an instance should be
 * used from one thread at a time.
 *
 * Created by xiaoyunlian on 2016/2/24.
 */
public class MQProducer {

    private static final Logger LOGGER = Logger.getLogger(MQProducer.class.getName());

    /**
     * Comma-separated queue names; ActiveMQ parses such a name as a composite destination
     * whose members are later looked up individually by {@link #getDestination}.
     */
    private static final String COMPOSITE_QUEUE_NAME = "testQueue1,testQueue2,testQueue3";

    /** Shared, lazily created connection; stays {@code null} until a connection succeeds. */
    public static Connection connection;

    /** Lazily created session of this instance; {@code null} until {@link #getSession()} succeeds. */
    public Session session;

    /** The producer most recently returned by {@link #getMessageProducer(String)}. */
    public MessageProducer messageProducer;

    /** Producers already created by this instance, keyed by the queue name they are bound to. */
    private final Map<String, MessageProducer> producersByDestination = new HashMap<String, MessageProducer>();

    /**
     * Returns the shared connection, creating it on first use.
     *
     * @return the started connection, or {@code null} if it could not be created
     */
    public Connection getConnection() {
        if (connection == null) {
            connection = getConnectionObject();
        }
        return connection;
    }

    /**
     * Creates and starts a new connection to the broker at {@code Global.getBrokerURL()}.
     *
     * @return the started connection, or {@code null} if creating or starting it failed
     */
    public static Connection getConnectionObject() {
        try {
            ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory();
            targetConnectionFactory.setBrokerURL(Global.getBrokerURL());
            targetConnectionFactory.setTrustAllPackages(true);

            // Original author's note: the connection is set up according to applicationContext.xml.
            SingleConnectionFactory connectionFactory = new SingleConnectionFactory();
            connectionFactory.setTargetConnectionFactory(targetConnectionFactory);

            // Named differently from the static field so the field is not shadowed.
            Connection created = connectionFactory.createConnection();
            created.start();
            return created;
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to create or start the ActiveMQ connection", e);
        }
        return null;
    }

    /**
     * Returns this instance's session, creating a non-transacted, auto-acknowledge
     * session on first use.
     *
     * @return the session, or {@code null} if no connection is available or the session
     *         could not be created
     */
    public Session getSession() {
        if (session != null) {
            return session;
        }
        Connection currentConnection = getConnection();
        if (currentConnection == null) {
            // getConnectionObject() already logged the cause; avoid an NPE here.
            LOGGER.warning("Cannot create a JMS session: no connection available");
            return null;
        }
        try {
            session = currentConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.log(Level.SEVERE, "Failed to create the JMS session", e);
        }
        return session;
    }

    /**
     * Creates the composite queue {@code testQueue1,testQueue2,testQueue3} on the current session.
     *
     * @return the composite queue, or {@code null} if no session is available or creation failed
     */
    public ActiveMQQueue getActiveMQQueue() {
        Session currentSession = getSession();
        if (currentSession == null) {
            return null;
        }
        try {
            return (ActiveMQQueue) currentSession.createQueue(COMPOSITE_QUEUE_NAME);
        } catch (JMSException e) {
            LOGGER.log(Level.SEVERE, "Failed to create queue " + COMPOSITE_QUEUE_NAME, e);
        }
        return null;
    }

    /**
     * Looks up one member of the composite queue by its physical name.
     *
     * @param destinationName physical name of the wanted queue
     * @return the matching member destination, or {@code null} if there is none
     */
    public ActiveMQDestination getActiveMQDestination(String destinationName) {
        return getDestination(destinationName, getActiveMQQueue());
    }

    /**
     * Returns a producer bound to the named queue, creating it on first request.
     *
     * <p>Producers are cached per queue name, so asking for a different queue yields a
     * producer for that queue rather than the one created for an earlier name.
     *
     * @param destinationName physical name of a member of the composite queue
     * @return the producer, or {@code null} if the session is unavailable, the name does
     *         not match any member queue, or the producer could not be created
     */
    public MessageProducer getMessageProducer(String destinationName) {
        MessageProducer cached = producersByDestination.get(destinationName);
        if (cached != null) {
            messageProducer = cached;
            return cached;
        }

        Session currentSession = getSession();
        if (currentSession == null) {
            return null;
        }

        ActiveMQDestination target = getActiveMQDestination(destinationName);
        if (target == null) {
            // Creating a producer with a null destination would give an unbound producer
            // whose send(message) cannot work, so report the problem instead.
            LOGGER.warning("No destination named '" + destinationName + "' in " + COMPOSITE_QUEUE_NAME);
            return null;
        }

        try {
            MessageProducer created = currentSession.createProducer(target);
            producersByDestination.put(destinationName, created);
            messageProducer = created;
            return created;
        } catch (JMSException e) {
            LOGGER.log(Level.SEVERE, "Failed to create a producer for " + destinationName, e);
        }
        return null;
    }

    /**
     * Finds the member of a composite destination whose physical name equals
     * {@code destinationName}.
     *
     * @param destinationName        physical name to look for
     * @param yanzhiQueueDestination composite destination to search
     * @return the matching member, or {@code null} if the name is blank, the destination is
     *         {@code null} or not an {@link ActiveMQDestination}, it has no member
     *         destinations, or no member has that name
     */
    public static ActiveMQDestination getDestination(String destinationName, Destination yanzhiQueueDestination) {
        if (StringUtils.isBlank(destinationName) || !(yanzhiQueueDestination instanceof ActiveMQDestination)) {
            return null;
        }
        ActiveMQDestination[] members = ((ActiveMQDestination) yanzhiQueueDestination).getCompositeDestinations();
        if (members == null) {
            // Guard against a missing member array (e.g. a destination that is not composite).
            return null;
        }
        for (ActiveMQDestination member : members) {
            if (destinationName.equals(member.getPhysicalName())) {
                return member;
            }
        }
        return null;
    }

    /**
     * Sends one hard-coded sample message of the requested type.
     *
     * <p>The type is compared case-insensitively against the {@code C.ACTIVEMQ_MSG_TYPE_*}
     * constants; an unrecognized type sends nothing. Failures are logged, not thrown.
     *
     * @param msgType  message type selector
     * @param session  session used to create the message
     * @param producer producer used to send the message
     */
    public static void sendMessage(String msgType, Session session, MessageProducer producer) {
        try {
            // Text message
            if (C.ACTIVEMQ_MSG_TYPE_TEXT.equalsIgnoreCase(msgType)) {
                String textMsg = "~~~~~~~~~~~~~~测试消息 ActiveMQ Text Message!~~~~~~~~~~~~~~" + new Date() + "," + AppicationManager.getServerIP();
                TextMessage msg = session.createTextMessage();
                msg.setText(textMsg);
                producer.send(msg);
            }
            // Map message
            if (C.ACTIVEMQ_MSG_TYPE_MAP.equalsIgnoreCase(msgType)) {
                MapMessage msg = session.createMapMessage();
                msg.setBoolean("boolean", true);
                msg.setShort("short", (short) 0);
                msg.setLong("long", 123456);
                msg.setString("MapMessage", "ActiveMQ Map Message!");
                producer.send(msg);
            }
            // Stream message
            if (C.ACTIVEMQ_MSG_TYPE_STREAM.equalsIgnoreCase(msgType)) {
                String streamValue = "ActiveMQ stream Message!";
                StreamMessage msg = session.createStreamMessage();
                msg.writeString(streamValue);
                msg.writeBoolean(false);
                msg.writeLong(1234567890);
                producer.send(msg);
            }
            // Object message
            if (C.ACTIVEMQ_MSG_TYPE_OBJECT.equalsIgnoreCase(msgType)) {
                TestObject object = new TestObject();
                object.setName("对象名称");
                object.setType(1);
                object.setFaceValue(45678);
                ObjectMessage msg = session.createObjectMessage();
                msg.setObject(object);
                producer.send(msg);
            }
            // Bytes message
            if (C.ACTIVEMQ_MSG_TYPE_BYTES.equalsIgnoreCase(msgType)) {
                String byteValue = "字节消息";
                BytesMessage msg = session.createBytesMessage();
                // Explicit charset: the payload is non-ASCII, so the platform default
                // would make the bytes differ from machine to machine.
                msg.writeBytes(byteValue.getBytes(StandardCharsets.UTF_8));
                producer.send(msg);
            }
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to send sample message of type " + msgType, e);
        }
    }

    /**
     * Sends {@code object} as a JMS {@link ObjectMessage}.
     *
     * <p>Failures are logged, not thrown.
     *
     * @param session  session used to create the message
     * @param producer producer used to send the message
     * @param object   payload; must be {@link Serializable} (or {@code null})
     */
    public static void sendObjectMessage(Session session, MessageProducer producer, Object object) {
        if (object != null && !(object instanceof Serializable)) {
            // Report the real problem instead of surfacing a ClassCastException.
            LOGGER.warning("Cannot send object message: " + object.getClass().getName() + " is not Serializable");
            return;
        }
        try {
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject((Serializable) object);
            producer.send(msg);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to send object message", e);
        }
    }
}
测试代码:
在单元测试中放入如下代码:
// Usage example: obtain a producer for the queue "testQueue1", then send recordList as a JMS ObjectMessage.
// recordList is defined elsewhere in the unit test; sendObjectMessage casts it to Serializable before sending.
MQProducer mqProducer = new MQProducer(); MessageProducer producer = mqProducer.getMessageProducer("testQueue1"); MQProducer.sendObjectMessage(mqProducer.getSession(),producer,recordList);
其中,Global.getBrokerURL()的值是:tcp://192.168.199.149:61616?wireFormat.maxInactivityDuration=0&connectionTimeout=0&keepAlive=true