这个示例演示如何使用 JMS 进行事务回滚(rollback)和消息重发(redelivery)。
JMS 报文在事务中被处理时,流程会抛出一个异常。
该异常随即被回滚异常策略组件(Rollback Exception Strategy Component)捕获,事务回滚,同一条报文被重新投递;回滚四次之后,报文在第五次投递时处理成功。
- 右键运行示例
- 启动 ActiveMQ,访问 http://localhost:8161/admin/send.jsp ,向队列 in 发送测试数据
- 运行过程中事务会回滚四次,在第五次运行时成功。
- 访问 http://localhost:8161/admin/topics.jsp
会发现主题 topic1 的 Messages Enqueued 列增加了相应的消息数目。
<?xml version="1.0" encoding="UTF-8"?>
<mule version="EE-3.7.3" xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:https="http://www.mulesoft.org/schema/mule/https" xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts"
xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:ss="http://www.springframework.org/schema/security" xmlns:test="http://www.mulesoft.org/schema/mule/test"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:xm="http://www.mulesoft.org/schema/mule/xml" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/current/mule-https.xsd http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
<spring:beans>
  <!-- ActiveMQ redelivery settings; injected into the amqFactory bean below. -->
  <spring:bean id="redeliveryPolicy"
               class="org.apache.activemq.RedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="5000"/>
    <spring:property name="redeliveryDelay" value="2000"/>
    <!-- NOTE(review): exponential back-off is switched off, so backOffMultiplier
         presumably has no effect here. Confirm against the ActiveMQ documentation. -->
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="2"/>
  </spring:bean>

  <!-- ActiveMQ connection factory; referenced by the Mule JMS connector as "amqFactory". -->
  <spring:bean id="amqFactory"
               class="org.apache.activemq.ActiveMQConnectionFactory"
               lazy-init="true">
    <!-- Broker URL -->
    <spring:property name="brokerURL" value="failover:(tcp://localhost:61616)"/>
    <spring:property name="redeliveryPolicy" ref="redeliveryPolicy"/>
  </spring:bean>
</spring:beans>
<!-- JMS connector used to reach the JMS server; its connection factory is the amqFactory bean. -->
<jms:activemq-connector name="jmsConnector"
                        doc:name="Active MQ"
                        connectionFactory-ref="amqFactory"
                        specification="1.1"
                        maxRedelivery="5"
                        persistentDelivery="true"
                        validateConnections="true"/>
<!--
  JMSRedeliver: reads a message from the JMS queue "in" inside a new JMS transaction,
  logs its inbound properties, and throws org.exceptions.MyException unless
  JMSXDeliveryCount equals 5. When no exception is thrown the message is published
  to the JMS topic "topic1" within the same transaction.
  Exceptions caused by MyException go to the rollback strategy; all others go to
  the catch strategy.
-->
<flow name="JMSRedeliver" doc:name="JMSRedeliver">

  <!-- JMS inbound endpoint: reads messages from the JMS server. -->
  <jms:inbound-endpoint queue="in"
                        connector-ref="jmsConnector"
                        doc:name="Listen to JMS queue">
    <!-- queue: name of the JMS queue for point-to-point messaging; cannot be combined with topic. -->
    <jms:transaction action="ALWAYS_BEGIN"/>
  </jms:inbound-endpoint>

  <!-- Log every inbound property, then the delivery counter on its own. -->
  <logger level="INFO"
          message="********: #[message.inboundProperties]"
          doc:name="log message"/>
  <logger level="INFO"
          message="********: #[message.inboundProperties['JMSXDeliveryCount']]"
          doc:name="log message"/>

  <!-- Only the delivery whose JMSXDeliveryCount equals 5 passes; every other one throws. -->
  <choice doc:name="Choice">
    <when expression="#[message.inboundProperties['JMSXDeliveryCount']==5]">
      <logger level="INFO"
              message="Transaction without errors"
              doc:name="No exception thrown"/>
    </when>
    <otherwise>
      <test:component throwException="true"
                      exceptionToThrow="org.exceptions.MyException"
                      doc:name="Throws custom exception"/>
    </otherwise>
  </choice>

  <!-- JMS outbound endpoint: sends the message to the JMS server. -->
  <jms:outbound-endpoint topic="topic1"
                         connector-ref="jmsConnector"
                         doc:name="Send to JMS topic">
    <!-- topic: name of the JMS topic for publish/subscribe messaging; cannot be combined with queue. -->
    <jms:transaction action="ALWAYS_JOIN"/>
  </jms:outbound-endpoint>

  <!-- Route by exception cause: anything other than MyException is caught, MyException is rolled back. -->
  <choice-exception-strategy doc:name="Choice Exception Strategy">
    <catch-exception-strategy when="#[!exception.causedBy(org.exceptions.MyException)]"
                              doc:name="Catch Exception Strategy">
      <logger level="INFO"
              message="Entered catch exception strategy. The transaction is commited."
              doc:name="Commit transaction"/>
    </catch-exception-strategy>
    <rollback-exception-strategy when="#[exception.causedBy(org.exceptions.MyException)]"
                                 doc:name="Rollback Exception Strategy">
      <logger level="INFO"
              message="Entered rollback exception strategy. The message rolls back to its original state for reprocessing."
              doc:name="Rollback transaction"/>
    </rollback-exception-strategy>
  </choice-exception-strategy>

</flow>
</mule>