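Create a cluster named CLUSTER1 that contains three queue managers: QMGR1, QMGR2, and QMGRI. Define a receiving queue named INPUTQ on both QMGR1 and QMGR2, and share it in the cluster.
The configuration steps are as follows (the corresponding MQSC commands are given with each step):
1) Create the three queue managers.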
2) Make QMGR1 and QMGR2 the two full repositories of the cluster. On both queue managers, run the MQSC command:
ALTER QMGR REPOS(CLUSTER1)
3) Define the cluster-sender and cluster-receiver channels on the three queue managers. In this example all three queue managers run on the same host (9.68.58.228) and use ports 1414, 1415, and 1416 respectively.
· On QMGR1, run the MQSC commands:
DEFINE CHANNEL(TO.QMGR1) CHLTYPE(CLUSRCVR) TRPTYPE(TCP) CONNAME('9.68.58.228(1414)') CLUSTER(CLUSTER1)
DEFINE CHANNEL(TO.QMGR2) CHLTYPE(CLUSSDR) TRPTYPE(TCP) CONNAME('9.68.58.228(1415)') CLUSTER(CLUSTER1)
· On QMGR2, run the MQSC commands:
DEFINE CHANNEL(TO.QMGR2) CHLTYPE(CLUSRCVR) TRPTYPE(TCP) CONNAME('9.68.58.228(1415)') CLUSTER(CLUSTER1)
DEFINE CHANNEL(TO.QMGR1) CHLTYPE(CLUSSDR) TRPTYPE(TCP) CONNAME('9.68.58.228(1414)') CLUSTER(CLUSTER1)
· On QMGRI, run the MQSC commands:
DEFINE CHANNEL(TO.QMGRI) CHLTYPE(CLUSRCVR) TRPTYPE(TCP) CONNAME('9.68.58.228(1416)') CLUSTER(CLUSTER1)
DEFINE CHANNEL(TO.QMGR1) CHLTYPE(CLUSSDR) TRPTYPE(TCP) CONNAME('9.68.58.228(1414)') CLUSTER(CLUSTER1)
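4) Define the cluster-shared queue INPUTQ on QMGR1 and QMGR2. On each of the two queue managers, run the MQSC command:
DEFINE QLOCAL(INPUTQ) CLUSTER(CLUSTER1)
Both shared instances of the queue are now visible from QMGRI.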
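5) Verify the configuration. On QMGRI, use the API Exerciser sample program provided in MQ First Steps to send messages to INPUTQ. Note that when you open the queue you must select the MQOO_BIND_NOT_FIXED option. With bind-on-open behavior, every message put through the same open handle goes to a single instance of the queue, so the distribution across QMGR1 and QMGR2 would not be visible.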
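To illustrate why step 5 insists on MQOO_BIND_NOT_FIXED, here is a minimal simulation in plain Java. It does not call MQ at all: the class `BindOptionSketch` and its `distribute` method are hypothetical names, and the simple round-robin choice is only an assumed stand-in for the real cluster workload algorithm, which considers more factors.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BindOptionSketch {

    /**
     * Counts how many messages each queue instance would receive.
     * Hypothetical model, NOT the real MQ workload algorithm:
     * - bindNotFixed == false: the target is chosen once, at open time;
     * - bindNotFixed == true: the target is chosen again for every put (round-robin here).
     */
    public static Map<String, Integer> distribute(List<String> instances, int messages, boolean bindNotFixed) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String qmgr : instances) {
            counts.put(qmgr, 0);
        }
        String boundAtOpen = instances.get(0); // destination fixed when the queue is opened
        int next = 0;
        for (int i = 0; i < messages; i++) {
            String target;
            if (bindNotFixed) {
                target = instances.get(next);
                next = (next + 1) % instances.size();
            } else {
                target = boundAtOpen;
            }
            counts.merge(target, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("QMGR1", "QMGR2");
        System.out.println("bind on open:   " + distribute(hosts, 10, false));
        System.out.println("bind not fixed: " + distribute(hosts, 10, true));
    }
}
```

In this model, ten messages sent with the destination bound at open time all land on one queue manager, while the not-fixed case splits them five and five. That even split is the pattern to look for in the queue depths of INPUTQ on QMGR1 and QMGR2 after the test.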
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class MQMessageSender {
    public static void main(String[] args) {
        try {
            // Set the MQ client environment
            MQEnvironment.hostname = "0.0.0.0";
            //MQEnvironment.port = 1414;
            MQEnvironment.channel = "CHL.SVRCONN";
            // Connect to the queue manager
            MQQueueManager mqmanager = new MQQueueManager("TEST");
            // For a cluster queue such as INPUTQ, also add MQC.MQOO_BIND_NOT_FIXED
            // so that each message can go to a different instance of the queue
            int openOptions = MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING;
            // Open the queue
            MQQueue mq = mqmanager.accessQueue("QUE_IN", openOptions);
            // Build the message
            MQMessage putmessage = new MQMessage();
            String msgtext = "TEST2";
            /*String filename = "C:\\Test.txt";
            BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(filename)));
            StringWriter sw = new StringWriter();
            String temp = "";
            while ((temp = in.readLine()) != null) {
                sw.write(temp);
                sw.write('\n');
            }
            msgtext = sw.toString();*/
            putmessage.writeString(msgtext);
            // Send the message
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            mq.put(putmessage, pmo);
            // Close the queue and disconnect from the queue manager
            mq.close();
            mqmanager.disconnect();
            System.out.println("put message ok");
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}
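import java.math.BigInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class MQMessageReceiver {
    private static final Log log = LogFactory.getLog(MQMessageReceiver.class);

    // Connection settings, read from the "mqMessageReceiver" bean in MQSetting.xml
    private String hostname;
    private int port;
    private String channel;
    private String queueManager;
    private String queue;
    private int depth; // number of messages to read

    public static void main(String[] args) {
        log.info("-----[Start]-----");
        ClassPathXmlApplicationContext xmlContext = new ClassPathXmlApplicationContext("MQSetting.xml");
        MQMessageReceiver receiver = (MQMessageReceiver) xmlContext.getBean("mqMessageReceiver");
        MQQueueManager mqmanager = null;
        try {
            // Set the MQ client environment
            MQEnvironment.hostname = receiver.getHostname();
            MQEnvironment.port = receiver.getPort();
            MQEnvironment.channel = receiver.getChannel();
            // Connect to the queue manager
            mqmanager = new MQQueueManager(receiver.getQueueManager());
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING;
            // Open the queue
            MQQueue mq = mqmanager.accessQueue(receiver.getQueue(), openOptions);
            for (int i = 0; i < receiver.getDepth(); i++) {
                MQMessage retrievedMessage = new MQMessage();
                retrievedMessage.resizeBuffer(14436);
                // Receive one message under syncpoint
                MQGetMessageOptions gmo = new MQGetMessageOptions();
                gmo.options = MQC.MQGMO_ACCEPT_TRUNCATED_MSG | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT;
                // gmo.waitInterval = 3000;
                mq.get(retrievedMessage, gmo);
                String msgText = retrievedMessage.readString(retrievedMessage.getMessageLength());
                // messageId is a byte array, so log it as hex rather than via toString()
                log.info("Message id: " + String.format("%048x", new BigInteger(1, retrievedMessage.messageId)));
                log.info("Message: " + msgText);
            }
            // The gets used MQGMO_SYNCPOINT, so commit the unit of work explicitly
            mqmanager.commit();
            log.info("-----[Completed]-----");
        } catch (Exception e) {
            log.info(e.toString());
        } finally {
            // Disconnect from the queue manager, even after a failure
            if (mqmanager != null) {
                try {
                    mqmanager.disconnect();
                } catch (Exception ignored) {
                    // nothing more can be done here
                }
            }
        }
    }

    // Accessors; the setters are assumed to match the properties set in MQSetting.xml
    public String getHostname() { return hostname; }
    public void setHostname(String hostname) { this.hostname = hostname; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getChannel() { return channel; }
    public void setChannel(String channel) { this.channel = channel; }
    public String getQueueManager() { return queueManager; }
    public void setQueueManager(String queueManager) { this.queueManager = queueManager; }
    public String getQueue() { return queue; }
    public void setQueue(String queue) { this.queue = queue; }
    public int getDepth() { return depth; }
    public void setDepth(int depth) { this.depth = depth; }
}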
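Returning to the cluster setup in step 3: the channel definitions follow one regular pattern. Each queue manager defines a CLUSRCVR channel that advertises its own address and a CLUSSDR channel that points at a full repository. The sketch below only builds the MQSC text for that pattern. `ClusterMqscSketch` and its methods are hypothetical helper names, not part of any MQ API, and the host, ports, and names are the ones used in this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ClusterMqscSketch {

    /** Formats one DEFINE CHANNEL command; chlType is CLUSRCVR or CLUSSDR. */
    public static String channel(String targetQmgr, String chlType, String host, int port, String cluster) {
        return String.format(Locale.ROOT,
                "DEFINE CHANNEL(TO.%s) CHLTYPE(%s) TRPTYPE(TCP) CONNAME('%s(%d)') CLUSTER(%s)",
                targetQmgr, chlType, host, port, cluster);
    }

    /** Cluster-receiver channel: describes how other members reach this queue manager. */
    public static String clusterReceiver(String qmgr, String host, int port, String cluster) {
        return channel(qmgr, "CLUSRCVR", host, port, cluster);
    }

    /** Cluster-sender channel: points from this queue manager at a full repository. */
    public static String clusterSender(String repoQmgr, String host, int repoPort, String cluster) {
        return channel(repoQmgr, "CLUSSDR", host, repoPort, cluster);
    }

    /** Both definitions that one queue manager needs in order to join the cluster. */
    public static List<String> definitionsFor(String qmgr, int port, String repoQmgr, int repoPort,
                                              String host, String cluster) {
        return Arrays.asList(
                clusterReceiver(qmgr, host, port, cluster),
                clusterSender(repoQmgr, host, repoPort, cluster));
    }

    public static void main(String[] args) {
        // QMGRI listens on 1416 and joins through the full repository QMGR1 on 1414
        for (String cmd : definitionsFor("QMGRI", 1416, "QMGR1", 1414, "9.68.58.228", "CLUSTER1")) {
            System.out.println(cmd);
        }
    }
}
```

Running `main` prints the two commands listed for QMGRI in step 3. Adding a fourth queue manager to the cluster would only need another call with its own name and port.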