特点
1.通过一个queue(队列)的通道传递。
2.队列可以被多个消费者申请监听,但是只有一个获取消息,获取后,消息会从队列去除。
3.消息是有顺序的(先进先出),但是设置优先级的除外。
4.消费者和发送者之间是无耦合的,两者之间的先后运行顺序没有关系。
在监听模式(实现onMessage)是异步的,而调用receive方法则是同步,会占用本线程的资源。
General API | Point-to-point API |
ConnectionFactory | QueueConnectionFactory |
Destination | Queue |
Connection | QueueConnection |
Session | QueueSession |
MessageConsumer | QueueReceiver |
MessageProducer | QueueSender |
以下例子(来源Java Message Service_2nd),是一个关于借贷的例子
package lyx.money;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Console client that sends loan requests to a request queue and waits
 * synchronously for the matching reply on a separate response queue.
 *
 * <p>Usage: {@code java QBorrower factory requestQueue responseQueue}
 */
public class QBorrower {

    /** Maximum time to wait for a reply, in milliseconds (value unchanged from the original example). */
    private static final long RESPONSE_TIMEOUT_MS = 3000000L;

    private QueueConnection qConnect = null;
    private QueueSession qSession = null;
    private Queue responseQ = null;
    private Queue requestQ = null;

    /**
     * Looks up the connection factory and both queues through JNDI, creates a
     * non-transacted, auto-acknowledge session and starts the connection.
     * Any JMS or naming failure prints the stack trace and terminates the JVM
     * with status 1.
     *
     * @param queuecf       JNDI name of the queue connection factory
     * @param requestQueue  JNDI name of the queue that loan requests are sent to
     * @param responseQueue JNDI name of the queue that replies are read from
     */
    public QBorrower(String queuecf, String requestQueue, String responseQueue) {
        try {
            // Connect to the provider and get the JMS connection
            Context ctx = new InitialContext();
            QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queuecf);
            qConnect = qFactory.createQueueConnection();

            // Create the JMS Session
            qSession = qConnect.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // Lookup the request and response queues
            requestQ = (Queue) ctx.lookup(requestQueue);
            responseQ = (Queue) ctx.lookup(responseQueue);

            // Now that setup is complete, start the Connection.
            // A client that only sends and never receives could skip this
            // step, but that situation is rare.
            qConnect.start();
        } catch (JMSException jmse) {
            jmse.printStackTrace();
            System.exit(1);
        } catch (NamingException jne) {
            jne.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Sends one loan request and blocks until the correlated reply arrives or
     * the timeout elapses, then prints the outcome. The sender and receiver
     * created for the request are closed before returning. A JMS failure
     * prints the stack trace and terminates the JVM with status 1.
     *
     * @param salary  value stored under the "Salary" key of the request
     * @param loanAmt value stored under the "LoanAmount" key of the request
     */
    private void sendLoanRequest(double salary, double loanAmt) {
        QueueSender qSender = null;
        QueueReceiver qReceiver = null;
        try {
            // Create JMS message
            MapMessage msg = qSession.createMapMessage();
            msg.setDouble("Salary", salary);
            msg.setDouble("LoanAmount", loanAmt);

            // Tell the receiver where to send its reply. A separate queue is
            // used because the receiver on the request queue applies no
            // filter and accepts any message; a dedicated response queue
            // keeps the reply from being consumed by another receiver.
            msg.setJMSReplyTo(responseQ);

            // Create the sender and send the message.
            // Sender and receiver share one session because the imagined
            // business flow is synchronous: submit a request, then wait for
            // somebody to process it and for the result.
            qSender = qSession.createSender(requestQ);
            qSender.send(msg);

            // Wait to see if the loan request was accepted or declined.
            // The selector makes sure only the reply to this request is
            // received, not a reply meant for somebody else.
            String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
            qReceiver = qSession.createReceiver(responseQ, filter);
            TextMessage tmsg = (TextMessage) qReceiver.receive(RESPONSE_TIMEOUT_MS);
            if (tmsg == null) {
                System.out.println("QLender not responding");
            } else {
                System.out.println("Loan request was " + tmsg.getText());
            }
        } catch (JMSException jmse) {
            jmse.printStackTrace();
            System.exit(1);
        } finally {
            // Release the per-request receiver and sender; each is closed
            // independently so one failure does not skip the other.
            if (qReceiver != null) {
                try {
                    qReceiver.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
            if (qSender != null) {
                try {
                    qSender.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }
    }

    /** Closes the connection (printing any JMS error) and terminates the JVM with status 0. */
    private void exit() {
        try {
            qConnect.close();
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
        System.exit(0);
    }

    /**
     * Reads "salary, loanAmount" lines from standard input and sends each as
     * a loan request; an empty line or end of input quits the application.
     *
     * @param argv factory name, request queue name, response queue name
     */
    public static void main(String argv[]) {
        String queuecf = null;
        String requestq = null;
        String responseq = null;
        if (argv.length == 3) {
            queuecf = argv[0];
            requestq = argv[1];
            responseq = argv[2];
        } else {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println("java QBorrower factory requestQueue responseQueue");
            System.exit(0);
        }

        QBorrower borrower = new QBorrower(queuecf, requestq, responseq);
        try {
            // Read all standard input and send it as a message
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            System.out.println("QBorrower Application Started");
            System.out.println("Press enter to quit application");
            System.out.println("Enter: Salary, Loan_Amount");
            System.out.println("\ne.g. 50000, 120000");

            while (true) {
                System.out.print("> ");
                String loanRequest = stdin.readLine();
                if (loanRequest == null || loanRequest.trim().length() <= 0) {
                    borrower.exit();
                    return;
                }

                // Parse the deal description; reject malformed lines instead
                // of letting a parsing exception kill the client.
                StringTokenizer st = new StringTokenizer(loanRequest, ",");
                if (st.countTokens() < 2) {
                    System.out.println("Invalid input. Enter: Salary, Loan_Amount");
                    continue;
                }
                double salary;
                double loanAmt;
                try {
                    salary = Double.parseDouble(st.nextToken().trim());
                    loanAmt = Double.parseDouble(st.nextToken().trim());
                } catch (NumberFormatException nfe) {
                    System.out.println("Invalid number. Enter: Salary, Loan_Amount");
                    continue;
                }
                borrower.sendLoanRequest(salary, loanAmt);
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}
jndi.properties
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames=lyxcf
topic.topic1=jms.topic1
queue.qs=jms.qs
queue.qr=jms.qr
通过connection的ConnectionMetaData metadata = qConnect.getMetaData();方法,可以获取到连接的元数据,里面包含了JMS版本、提供者名称等信息。
// Print the JMS version and provider name reported by the connection
// metadata, followed by every JMSX property name it lists.
int majorVersion = metadata.getJMSMajorVersion();
int minorVersion = metadata.getJMSMinorVersion();
System.out.println("JMS Version: " + majorVersion + "." + minorVersion);

String providerName = metadata.getJMSProviderName();
System.out.println("JMS Provider: " + providerName);

System.out.println("JMSX Properties Supported: ");
for (Enumeration names = metadata.getJMSXPropertyNames(); names.hasMoreElements();) {
    Object propertyName = names.nextElement();
    System.out.println(" " + propertyName);
}
另外message必须通过session创建,以便完成对应的初始化,而不是通过new创建;new创建出来的对象会缺失很多信息。
package lyx.money;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Listens on a request queue for loan requests, decides each one from the
 * salary/loan ratio and sends the decision to the reply destination named in
 * the request.
 *
 * <p>Usage: {@code java QLender factory request_queue}
 */
public class QLender implements MessageListener {

    private QueueConnection qConnect = null;
    private QueueSession qSession = null;
    private Queue requestQ = null;

    /**
     * Looks up the connection factory and request queue through JNDI, starts
     * the connection and registers this object as the queue's message
     * listener. Any JMS or naming failure prints the stack trace and
     * terminates the JVM with status 1.
     *
     * @param queuecf      JNDI name of the queue connection factory
     * @param requestQueue JNDI name of the queue that loan requests arrive on
     */
    public QLender(String queuecf, String requestQueue) {
        try {
            // Connect to the provider and get the JMS connection
            Context ctx = new InitialContext();
            QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queuecf);
            qConnect = qFactory.createQueueConnection();

            // Create the JMS Session
            qSession = qConnect.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // Lookup the request queue
            requestQ = (Queue) ctx.lookup(requestQueue);

            // Now that setup is complete, start the Connection
            qConnect.start();

            // Create the message listener
            QueueReceiver qReceiver = qSession.createReceiver(requestQ);
            qReceiver.setMessageListener(this);

            System.out.println("Waiting for loan requests...");
        } catch (JMSException jmse) {
            jmse.printStackTrace();
            System.exit(1);
        } catch (NamingException jne) {
            jne.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Listener callback: reads "Salary" and "LoanAmount" from the request,
     * accepts the loan when salary/loan exceeds 0.25 (loans under 200000) or
     * 0.33 (otherwise), and sends "Accepted!" or "Declined" to the request's
     * reply-to queue. The sender created for the reply is closed before
     * returning. Any failure prints the stack trace and terminates the JVM
     * with status 1.
     *
     * @param message the incoming request; cast to {@link MapMessage}
     */
    @Override
    public void onMessage(Message message) {
        QueueSender qSender = null;
        try {
            // Get the data from the message
            MapMessage msg = (MapMessage) message;
            double salary = msg.getDouble("Salary");
            double loanAmt = msg.getDouble("LoanAmount");

            // Determine whether to accept or decline the loan
            boolean accepted;
            if (loanAmt < 200000) {
                accepted = (salary / loanAmt) > .25;
            } else {
                accepted = (salary / loanAmt) > .33;
            }
            System.out.println("Percent = " + (salary / loanAmt) + ", loan is "
                    + (accepted ? "Accepted!" : "Declined"));

            // Send the results back to the borrower
            TextMessage tmsg = qSession.createTextMessage();
            tmsg.setText(accepted ? "Accepted!" : "Declined");

            // Echo the request's message ID as the correlation ID so the
            // reply matches the sender's selector (this couples the two
            // sides to the same convention).
            tmsg.setJMSCorrelationID(message.getJMSMessageID());

            // Create the sender and send the message, replying on the
            // destination the requester specified.
            qSender = qSession.createSender((Queue) message.getJMSReplyTo());
            qSender.send(tmsg);

            System.out.println("\nWaiting for loan requests...");
        } catch (JMSException jmse) {
            jmse.printStackTrace();
            System.exit(1);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            // Release the per-message sender so senders do not accumulate
            // on the session.
            if (qSender != null) {
                try {
                    qSender.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }
    }

    /** Closes the connection (printing any JMS error) and terminates the JVM with status 0. */
    private void exit() {
        try {
            qConnect.close();
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
        System.exit(0);
    }

    /**
     * Starts the lender and keeps it running until a line is read from
     * standard input.
     *
     * @param argv factory name, request queue name
     */
    public static void main(String argv[]) {
        String queuecf = null;
        String requestq = null;
        if (argv.length == 2) {
            queuecf = argv[0];
            requestq = argv[1];
        } else {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println("java QLender factory request_queue");
            System.exit(0);
        }

        QLender lender = new QLender(queuecf, requestq);
        try {
            // Run until enter is pressed
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            System.out.println("QLender application started");
            System.out.println("Press enter to quit application");
            stdin.readLine();
            lender.exit();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}
QueueBrowser
通过它可以访问队列中的快照
// Browse the messages currently on the queue and print each one's text.
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
try {
    Enumeration e = browser.getEnumeration();
    while (e.hasMoreElements()) {
        // Every browsed message is cast to TextMessage, as in the original snippet.
        TextMessage msg = (TextMessage) e.nextElement();
        System.out.println("Browsing: " + msg.getText());
    }
} finally {
    // Close the browser even when the cast or getText() fails.
    browser.close();
}
动态队列
假设一种情况,有1000个发送者,都要使用不同的队列,并且队列存活期很短,那么可能想到是每个队列命名XXX1-XXXN这种方式,其实还有更好的方式
QueueSession.createTemporaryQueue()
这样创建的队列是临时的,并且只是针对一个connection而存在的。