开头篇
本人发表的博客,虽然是入门级文章,欢迎大家的探讨、指教和学习,但是版权所有。虽然欢迎转载,但是对于山寨本人深恶痛绝。之前先发表的文章,就被某个网站山寨,文章大部分内容和图等都一样,但是被改的很乱,错别字也很多,直接无法阅读。本人直接把山寨的网站列出来:http://www.bwxxkj.com/a/jishuzhongxin/xingyeyingyong/2013/0123/162277.html
转载学习没关系,但是山寨了,实在对不起原著。本人实在不能容忍山寨。
第一篇主要讨论了IBM MQ的安装以及调试样例遇到的几个问题。
这一篇文章主要针对点对点模式来学习一下。学习的样例来源为IBM MQ的sample中的例子。
点对点模式下有一个消息生产者,有消息消费者。一条消息只能消费一次。
JmsProducer 是消息的生产者。
package test; // SCCSID "@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsProducer.java" /* * <copyright * notice="lm-source-program" * pids="5724-H72,5655-R36,5655-L82,5724-L26," * years="2008,2012" * crc="279216363" > * Licensed Materials - Property of IBM * * 5724-H72,5655-R36,5655-L82,5724-L26, * * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved. * * US Government Users Restricted Rights - Use, duplication or * disclosure restricted by GSA ADP Schedule Contract with * IBM Corp. * </copyright> */ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import com.ibm.msg.client.jms.JmsConnectionFactory; import com.ibm.msg.client.jms.JmsFactoryFactory; import com.ibm.msg.client.wmq.WMQConstants; /** * A JMS producer (sender or publisher) application that sends a simple message to the named * destination (queue or topic). * * Notes: * * API type: IBM JMS API (v1.1, unified domain) * * Messaging domain: Point-to-point or Publish-Subscribe * * Provider type: WebSphere MQ * * Connection mode: Client connection * * JNDI in use: No * * Usage: * * JmsProducer -m queueManagerName -d destinationName [-h host -p port -l channel] * * for example: * * JmsProducer -m QM1 -d Q1 * * JmsProducer -m QM1 -d topic://foo -h localhost -p 1414 */ public class JmsProducer { private static String host = "localhost"; private static int port = 1414; private static String channel = "SYSTEM.DEF.SVRCONN"; private static String queueManagerName = null; private static String destinationName = null; //这里用来判断是不是点对点模式 private static boolean isTopic = false; // System exit status value (assume unset value to be 1) private static int status = 1; /** * Main method * * @param args */ public static void main(String[] args) { // Parse the arguments //队列管理器名称如果出现下划线的话会提示( 'MQCC_FAILED' ),原因为 '2058' ( 'MQRC_Q_MGR_NAME_ERROR' )。 args = new String[]{"-m","QMTest", "-d","testQueue"}; // args = new String[]{"-m","aaaa", "-d","topic://zhuti","-h","localhost","-p","14114"}; parseArgs(args); // Variables Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; try { // Create a connection factory //JmsFactoryFactory用来根据指定类型来创建connection factory和destination objects JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); //根据工厂工厂创建连接工厂类的实例 JmsConnectionFactory cf = ff.createConnectionFactory(); // Set the properties //封装连接信息 //使用JmsPropertyContext接口中的方法封装信息 cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host); cf.setIntProperty(WMQConstants.WMQ_PORT, port); //SYSTEM.DEF.SVRCONN是通道的连接类型 cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); //WMQ_CM_CLIENT的含义,什么时候用,目前还不清楚 cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (isTopic) { destination = session.createTopic(destinationName); } else { destination = session.createQueue(destinationName); } producer = session.createProducer(destination); long uniqueNumber = System.currentTimeMillis() % 1000; TextMessage message = session.createTextMessage("JmsProducer: Your lucky number today is " + uniqueNumber); // Start the connection connection.start(); // And, send the message producer.send(message); System.out.println("Sent message:\n" + message); recordSuccess(); } catch (JMSException jmsex) { recordFailure(jmsex); } finally { if (producer != null) { try { producer.close(); } catch (JMSException jmsex) { System.out.println("Producer could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } System.exit(status); return; } // end main() /** * Process a JMSException and any associated inner exceptions. * * @param jmsex */ private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } /** * Record this run as successful. */ private static void recordSuccess() { System.out.println("SUCCESS"); status = 0; return; } /** * Record this run as failure. * * @param ex */ private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } /** * Parse user supplied arguments. * * @param args */ private static void parseArgs(String[] args) { try { int length = args.length; if (length == 0) { throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified."); } if ((length % 2) != 0) { throw new IllegalArgumentException("Incorrect number of arguments!"); } int i = 0; while (i < length) { if ((args[i]).charAt(0) != '-') { throw new IllegalArgumentException("Expected a '-' character next: " + args[i]); } char opt = (args[i]).toLowerCase().charAt(1); switch (opt) { case 'h' : host = args[++i]; break; case 'p' : port = Integer.parseInt(args[++i]); break; case 'l' : channel = args[++i]; break; case 'm' : queueManagerName = args[++i]; break; case 'd' : destinationName = args[++i]; break; default : { throw new IllegalArgumentException("Unknown argument: " + opt); } } ++i; } if (queueManagerName == null) { throw new IllegalArgumentException("A queueManager name must be specified."); } if (destinationName == null) { throw new IllegalArgumentException("A destination name must be specified."); } // Whether the destination is a queue or a topic. Apply a simple check. if (destinationName.startsWith("topic://")) { isTopic = true; } else { // Otherwise, let's assume it is a queue. isTopic = false; } } catch (Exception e) { System.out.println(e.getMessage()); printUsage(); System.exit(-1); } return; } /** * Display usage help. */ private static void printUsage() { System.out.println("\nUsage:"); System.out .println("JmsProducer -m queueManagerName -d destinationName [-h host -p port -l channel]"); return; } } // end class
注意:如果还抛( 'MQCC_FAILED' ),原因为 '2035' ( 'MQRC_NOT_AUTHORIZED' )这个异常,请看上一篇文章。
下面JmsConsumer。JmsConsumer的连接方式和JmsProducer一样,不再赘述。
JmsProducer和JmsConsumer都需要指出主机名,端口,通道,队列名称和队列。
package test; // SCCSID "@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsConsumer.java" /* * <copyright * notice="lm-source-program" * pids="5724-H72,5655-R36,5655-L82,5724-L26," * years="2008,2012" * crc="39457954" > * Licensed Materials - Property of IBM * * 5724-H72,5655-R36,5655-L82,5724-L26, * * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved. * * US Government Users Restricted Rights - Use, duplication or * disclosure restricted by GSA ADP Schedule Contract with * IBM Corp. * </copyright> */ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import com.ibm.msg.client.jms.JmsConnectionFactory; import com.ibm.msg.client.jms.JmsFactoryFactory; import com.ibm.msg.client.wmq.WMQConstants; /** * A JMS consumer (receiver or subscriber) application that receives a message from the named * destination (queue or topic). * * Tip: A subscriber application must be started before the publisher application. * * Notes: * * API type: IBM JMS API (v1.1, unified domain) * * Messaging domain: Point-to-point or Publish-Subscribe * * Provider type: WebSphere MQ * * Connection mode: Client connection * * JNDI in use: No * * Usage: * * JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel] * * for example: * * JmsConsumer -m QM1 -d Q1 * * JmsConsumer -m QM1 -d topic://foo -h localhost -p 1414 */ public class JmsConsumer { private static String host = "localhost"; private static int port = 1414; private static String channel = "SYSTEM.DEF.SVRCONN"; private static String queueManagerName = null; private static String destinationName = null; private static boolean isTopic = false; private static int timeout = 15000; // in ms or 15 seconds // System exit status value (assume unset value to be 1) private static int status = 1; /** * Main method * * @param args */ public static void main(String[] args) { // args = new String[]{"-m","aaaa", "-d","aa"}; // args = new String[]{"-m","aaaa", "-d","topic://zhuti","-h","localhost","-p","1414"}; // Parse the arguments args = new String[]{"-m","QMTest", "-d","testQueue"}; parseArgs(args); // Variables Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { // Create a connection factory JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); JmsConnectionFactory cf = ff.createConnectionFactory(); // Set the properties cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host); cf.setIntProperty(WMQConstants.WMQ_PORT, port); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (isTopic) { destination = session.createTopic(destinationName); } else { destination = session.createQueue(destinationName); } consumer = session.createConsumer(destination); // Start the connection connection.start(); // And, receive the message //在指定的超市时间内接收下一条消息 Message message = consumer.receive(timeout); if (message != null) { // System.err.println("Received message:\n" + message); System.out.println("Received message:\n" + message); } else { System.out.println("No message received!\n"); recordFailure(null); } recordSuccess(); } catch (JMSException jmsex) { recordFailure(jmsex); } finally { if (consumer != null) { try { consumer.close(); } catch (JMSException jmsex) { System.out.println("Consumer could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } System.exit(status); return; } // end main() /** * Process a JMSException and any associated inner exceptions. * * @param jmsex */ private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } /** * Record this run as successful. */ private static void recordSuccess() { System.out.println("SUCCESS"); status = 0; return; } /** * Record this run as failure. * * @param ex */ private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } /** * Parse user supplied arguments. * * @param args */ private static void parseArgs(String[] args) { try { int length = args.length; if (length == 0) { throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified."); } if ((length % 2) != 0) { throw new IllegalArgumentException("Incorrect number of arguments!"); } int i = 0; while (i < length) { if ((args[i]).charAt(0) != '-') { throw new IllegalArgumentException("Expected a '-' character next: " + args[i]); } char opt = (args[i]).toLowerCase().charAt(1); switch (opt) { case 'h' : host = args[++i]; break; case 'p' : port = Integer.parseInt(args[++i]); break; case 'l' : channel = args[++i]; break; case 'm' : queueManagerName = args[++i]; break; case 'd' : destinationName = args[++i]; break; default : { throw new IllegalArgumentException("Unknown argument: " + opt); } } ++i; } if (queueManagerName == null) { throw new IllegalArgumentException("A queueManager name must be specified."); } if (destinationName == null) { throw new IllegalArgumentException("A destination name must be specified."); } // Whether the destination is a queue or a topic. Apply a simple check. if (destinationName.startsWith("topic://")) { isTopic = true; } else { // Otherwise, let's assume it is a queue. isTopic = false; } } catch (Exception e) { System.out.println(e.getMessage()); printUsage(); System.exit(-1); } return; } /** * Display usage help. */ private static void printUsage() { System.out.println("\nUsage:"); System.out .println("JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel]"); return; } } // end class
生产者需要由session创建message producer,消费者需要由session创建message consumer。
JMSBrowser和JMSCustomer的区别在于前者只能浏览消息,后者是消费消息。前者能够浏览所有的消息,而后者一次消费一条消息。
package test; // SCCSID "@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsBrowser.java" /* * <copyright * notice="lm-source-program" * pids="5724-H72,5655-R36,5655-L82,5724-L26," * years="2008,2012" * crc="3912865343" > * Licensed Materials - Property of IBM * * 5724-H72,5655-R36,5655-L82,5724-L26, * * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved. * * US Government Users Restricted Rights - Use, duplication or * disclosure restricted by GSA ADP Schedule Contract with * IBM Corp. * </copyright> */ import java.util.Enumeration; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import com.ibm.msg.client.jms.JmsConnectionFactory; import com.ibm.msg.client.jms.JmsFactoryFactory; import com.ibm.msg.client.wmq.WMQConstants; /** * A JMS queue browser application that looks at all available messages on the named queue, without * removing them, in the order they would be received by a consumer application. * * Tip: A browser is not applicable for topics. * * Notes: * * API type: IBM JMS API (v1.1, unified domain) * * Messaging domain: Point-to-point * * Provider type: WebSphere MQ * * Connection mode: Client connection * * JNDI in use: No * * Usage: * * JmsBrowser -m queueManagerName -d queueName [-h host -p port -l channel] * * for example: * * JmsBrowser -m QM1 -d Q1 * * JmsBrowser -m QM1 -d Q1 -h localhost -p 1414 */ public class JmsBrowser { private static String host = "localhost"; private static int port = 1414; private static String channel = "SYSTEM.DEF.SVRCONN"; private static String queueManagerName = null; private static String queueName = null; // System exit status value (assume unset value to be 1) private static int status = 1; /** * Main method * * @param args */ public static void main(String[] args) { // Parse the arguments args = new String[]{"-m","QMTest", "-d","testQueue"}; parseArgs(args); // Variables Connection connection = null; Session session = null; Queue destination = null; QueueBrowser browser = null; try { // Create a connection factory JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); JmsConnectionFactory cf = ff.createConnectionFactory(); // Set the properties cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host); cf.setIntProperty(WMQConstants.WMQ_PORT, port); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queueName); browser = session.createBrowser(destination); // Start the connection connection.start(); // And, browse the message //浏览多条消息,并放到枚举类型里 Enumeration messages = browser.getEnumeration(); int count = 0; Message current; System.out.println("Browse starts"); while (messages.hasMoreElements()) { current = (Message) messages.nextElement(); System.out.println("\nMessage " + ++count + ":\n"); System.out.println(current); } System.out.println("\nNo more messages\n"); recordSuccess(); } catch (JMSException jmsex) { recordFailure(jmsex); } finally { if (browser != null) { try { browser.close(); } catch (JMSException jmsex) { System.out.println("Browser could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } System.exit(status); return; } // end main() /** * Process a JMSException and any associated inner exceptions. * * @param jmsex */ private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } /** * Record this run as successful. */ private static void recordSuccess() { System.out.println("SUCCESS"); status = 0; return; } /** * Record this run as failure. * * @param ex */ private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } /** * Parse user supplied arguments. * * @param args */ private static void parseArgs(String[] args) { try { int length = args.length; if (length == 0) { throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified."); } if ((length % 2) != 0) { throw new IllegalArgumentException("Incorrect number of arguments!"); } int i = 0; while (i < length) { if ((args[i]).charAt(0) != '-') { throw new IllegalArgumentException("Expected a '-' character next: " + args[i]); } char opt = (args[i]).toLowerCase().charAt(1); switch (opt) { case 'h' : host = args[++i]; break; case 'p' : port = Integer.parseInt(args[++i]); break; case 'l' : channel = args[++i]; break; case 'm' : queueManagerName = args[++i]; break; case 'd' : queueName = args[++i]; break; default : { throw new IllegalArgumentException("Unknown argument: " + opt); } } ++i; } if (queueManagerName == null) { throw new IllegalArgumentException("A queueManager name must be specified."); } if (queueName == null) { throw new IllegalArgumentException("A queue name must be specified."); } } catch (Exception e) { System.out.println(e.getMessage()); printUsage(); System.exit(-1); } return; } /** * Display usage help. */ private static void printUsage() { System.out.println("\nUsage:"); System.out.println("JmsBrowser -m queueManagerName -d queueName [-h host -p port -l channel]"); return; } } // end class
运行之前当然需要先创建好IBM MQ的服务端,并且创建好队列管理器和队列。