Session.AUTO_ACKNOWLEDGE
session的提交和不提交现象
package test.mq.helloword; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.ConsumerBrokerExchange; public class Receiver { public static void main(String[] args) throws JMSException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Enumeration names=connection.getMetaData().getJMSXPropertyNames(); while(names.hasMoreElements()){ String name=(String) names.nextElement(); System.out.println("jmsx name==="+name); } Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageConsumer Consumer=session.createConsumer(destination); int i=0; while(i<3){ MapMessage msg=(MapMessage) Consumer.receive(); //当session不提交时, 信息会不确认签收,信息就一直可以接受到。 session.commit(); System.out.println("接收信息:"+msg.getString("message"+i)+",property=="+msg.getStringProperty("extra"+i)); i++; } session.close(); connection.close(); } }
Session.CLIENT_ACKNOWLEDGE的使用
信息发送者
package test.mq.helloword; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageProducer Producer=session.createProducer(destination); for(int i=0;i<3;i++){ MapMessage Message=session.createMapMessage(); Message.setStringProperty("extra"+i, "okok"); Message.setString("message"+i, "my map---->"+i); Producer.send(Message); } session.commit(); session.close(); connection.close(); } }
信息消费者
package test.mq.helloword; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.ConsumerBrokerExchange; public class Receiver { public static void main(String[] args) throws JMSException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Enumeration names=connection.getMetaData().getJMSXPropertyNames(); while(names.hasMoreElements()){ String name=(String) names.nextElement(); System.out.println("jmsx name==="+name); } Session session=connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageConsumer Consumer=session.createConsumer(destination); int i=0; while(i<3){ MapMessage msg=(MapMessage) Consumer.receive(); if(i==2){ //当 msg.acknowledge()不提交时, 信息会不确认签收,信息就一直可以接受到。 msg.acknowledge(); } System.out.println("接收信息:"+msg.getString("message"+i)+",property=="+msg.getStringProperty("extra"+i)); i++; } session.close(); connection.close(); } }