相信经过了解,基本使用activemq后,大家一定知道队列里面有ACK模式,他的类型有以下几种
1、AUTO_ACKNOWLEDGE = 1 自动确认
2、CLIENT_ACKNOWLEDGE = 2 客户端手动确认
3、DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
4、SESSION_TRANSACTED = 0 事务提交并确认
5、INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认(自定义ACK_MODE)
使用方式一般有
1、使用连接创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
2、spring中配置
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="defaultDestination" ref="destination"></property>
<property name="messageConverter" ref="jmsMessageConverter"></property>
<property name="sessionAcknowledgeMode" value="2" />
</bean>
注意:当我们只配置了这些,以为在消费者出获取消息调用message.acknowledge();方法就能控制消息的消费,很明显,不行!
查阅资料你会发现,当你获取消息的时候,默认的获取方法里面没有走你的这些配置直接就消费了消息,所以就达不到我们想要的效果,手动控制消费。所以,我们需要重写一个方法手动获取,但是带上自己的参数。
- /**
- * 接收消息
- * @param session
- * @param consumer
- * @param autoAcknowledge 是否开启手动消费
- * @return
- * @throws JMSException
- */
- protected Message doReceive(Session session, MessageConsumer consumer,Boolean autoAcknowledge) throws JMSException {
- try {
- this.consumer = consumer;
- //Use transaction timeout (if available).
- long timeout = 1000;
- Message message = doReceive(consumer, timeout);
- if (session.getTransacted()) {
- // Commit necessary - but avoid commit call within a JTA transaction.
- // 如果开启了jta事物,那么不会进行提交,jta事物会直接覆盖掉session事物
- if (isSessionLocallyTransacted(session)) {
- // Transacted session created by this template -> commit.
- //创建事物回话
- JmsUtils.commitIfNecessary(session);
- }
- }
- //autoAcknowledge如果为真,不进行自动确认
- else if (isClientAcknowledge(session) && !autoAcknowledge) {
- // Manually acknowledge message, if any.
- if (message != null) {
- message.acknowledge();
- }
- }
- return message;
- }
- finally {
- consumer = null;
- }
- }
- /**
- * 由于上面的doReceive(Session session, MessageConsumer consumer,Boolean autoAcknowledge)需要调用这个方法,
- * 而在父类里面这个方法是私有的,所以直接拷贝下来了
- * @param consumer
- * @param timeout
- * @return
- * @throws JMSException
- */
- private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
- if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
- return consumer.receiveNoWait();
- }
- else if (timeout > 0) {
- return consumer.receive(timeout);
- }
- else {
- return consumer.receive();
- }
- }
原文地址:http://blog.csdn.net/yueding_h/article/details/54944254