1.为什么要用事务?
为什么commit之后,不会有持久的消息重新传送呢?
原因在于commit操作会自动将为签收确认的消息进行签收确认,如果是当前接收但未签收确认的消息,都会被确认处理。因而在commit之后不会有持久化的消息出现。
2.activeMQ支持的事务:
ActiveMQ有支持两种事务,
- JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
- XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.
在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。
如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在:org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID。
在下面的介绍中我用的是JMS transactions.
JMS transactions事务的配置:
①建立JMS事务,并引入关联链接事务。
②.设置一个jmsTamplat,并关联监听容器。
- <!-- jms事务 -->
- <bean id="jmsTransactionManager"
- class="org.springframework.jms.connection.JmsTransactionManager">
- <property name="connectionFactory" ref="connectionFactory" />
- </bean>
- <tx:annotation-driven transaction-manager="jmsTransactionManager" />
- <!-- 消息监听容器 消息接收监听器用于异步接收消息 -->
- <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="destinationOne" />
- <property name="messageListener" ref="consumerMessageListenerOfOne" />
- <!-- <property name="sessionTransacted" value="true"/> --> <!-- 给listener添加事务,只负责接收消息的回滚 (有了transactionManager就不用这个了,这个功能不全) 设置后好像并没有起作用 不知道为啥 -->
- <!-- <property name="transactionManager" ref="jtaTransactionManager"/> --> <!-- 接收消息和数据库访问处于同一事务中 jta -->
- <property name="transactionManager" ref="jmsTransactionManager" /> <!--jms事务 -->
- <property name="sessionAcknowledgeMode" value="4"></property> <!-- 应答模式是 INDIVIDUAL_ACKNOWLEDGE http://blog.csdn.net/yueding_h/article/details/54944254 -->
- <!-- ActiveMQ:设置多个并行的消费者 -->
- <property name="concurrency" value="2-3" />
- </bean>
- session.rollback();
- public void onMessage(Message message, Session session) {
- TextMessage textMsg = (TextMessage) message;
- try {
- System.out.println(1);
- String endStr = textMsg.getText();
- Integer endInt = Integer.parseInt(endStr);
- System.out.println("消息:==="+endInt);
- //只要被确认后 就会出队,接受失败没有确认成功,会在原队列里面
- textMsg.acknowledge();
- } catch (Exception e) {
- try {
- session.rollback();
- System.out.println("测试回滚");
- e.printStackTrace();
- System.out.println("异常信息是:===:" + e.getMessage());
- }
- }
待我详细了解JtaTransactionManager 后,再说吧。