ActiveMQ有支持两种事务,
- JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
- XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.
在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。
如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在
org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID
public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { Message message = copy ? originalMessage.copy() : originalMessage; message.setOriginalDestination(message.getDestination()); message.setOriginalTransactionId(message.getTransactionId()); message.setDestination(deadLetterDestination); message.setTransactionId(null); message.setMemoryUsage(null); message.setRedeliveryCounter(0); boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); ProducerInfo info = new ProducerInfo(); ProducerState state = new ProducerState(info); ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); producerExchange.setProducerState(state); producerExchange.setMutable(true); producerExchange.setConnectionContext(context); context.getBroker().send(producerExchange, message); } finally { context.setProducerFlowControl(originalFlowControl); }