mule使用jms 进行事务回滚(rollback) 和消息重发(redelivery)

这个示例演示 使用jms 进行事务(rollback) 和消息重发(redelivery)
JMS报文 在事务中抛出一个异常
示例中报文随即被异常回滚策略组件( Rollback Exception Strategy Component)捕获,回滚并且重新发送相同的报文;回滚四次之后,报文自动发送成功。
这里写图片描述

  1. 右键运行示例
  2. 启动activMQ,访问http://localhost:8161/admin/send.jsp
    发送测试数据

    这里写图片描述

  3. 运行过程中事务会回滚四次,在第五次运行后运行成功。
  4. 访问http://localhost:8161/admin/topics.jsp
    会发现 Messages Enqueued列会增加响应的消息数目
    这里写图片描述

<?xml version="1.0" encoding="UTF-8"?>
<mule version="EE-3.7.3" xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:https="http://www.mulesoft.org/schema/mule/https" xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts"
    xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:ss="http://www.springframework.org/schema/security" xmlns:test="http://www.mulesoft.org/schema/mule/test"
    xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
    xmlns:xm="http://www.mulesoft.org/schema/mule/xml" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/current/mule-https.xsd http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">

    <spring:beans>
        <!-- ActiveMQ -->
        <spring:bean class="org.apache.activemq.RedeliveryPolicy"
            id="redeliveryPolicy">
            <spring:property name="maximumRedeliveries" value="5" />
            <spring:property name="initialRedeliveryDelay" value="5000" />
            <spring:property name="redeliveryDelay" value="2000" />
            <spring:property name="useExponentialBackOff" value="false" />
            <spring:property name="backOffMultiplier" value="2" />
        </spring:bean>
        <spring:bean class="org.apache.activemq.ActiveMQConnectionFactory"
            id="amqFactory" lazy-init="true">
            <!-- 代理url -->
            <spring:property name="brokerURL"
                value="failover:(tcp://localhost:61616)" />
            <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
        </spring:bean>
    </spring:beans>


    <!-- JMS连接器,用来连接JMS服务器 1.连接工厂 -->
    <jms:activemq-connector connectionFactory-ref="amqFactory" 
        doc:name="Active MQ" maxRedelivery="5" name="jmsConnector"
        persistentDelivery="true" specification="1.1" validateConnections="true" />

    <flow doc:name="JMSRedeliver" name="JMSRedeliver">
        <!-- 定义JMS入口端点从JMS服务器读取消息 -->
        <jms:inbound-endpoint  connector-ref="jmsConnector"
            doc:name="Listen to JMS queue" queue="in">
            <!-- queue 点到点模式时的JMS队列名称,不能和topic同时 使用 -->
            <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
     <logger doc:name="log message" level="INFO" message="********: #[message.inboundProperties]"/>
     <logger doc:name="log message" level="INFO" message="********: #[message.inboundProperties['JMSXDeliveryCount']]"/>
        <choice doc:name="Choice">
            <when expression="#[message.inboundProperties['JMSXDeliveryCount']==5]">
                <logger doc:name="No exception thrown" level="INFO"
                    message="Transaction without errors" />
            </when>
            <otherwise>
                <test:component doc:name="Throws custom exception"
                    exceptionToThrow="org.exceptions.MyException" throwException="true" />
            </otherwise>
        </choice>

        <!-- 定义JMS出口端点向JMS服务器发送消息 -->
        <jms:outbound-endpoint connector-ref="jmsConnector"
            doc:name="Send to JMS topic" topic="topic1">
            <!-- 发布/订阅模式时JMS的消息主题名称,不能和 queue同时使用 -->
            <jms:transaction action="ALWAYS_JOIN" />
        </jms:outbound-endpoint>

        <choice-exception-strategy doc:name="Choice Exception Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy"
                when="#[!exception.causedBy(org.exceptions.MyException)]">
                <logger doc:name="Commit transaction" level="INFO"
                    message="Entered catch exception strategy. The transaction is commited." />
            </catch-exception-strategy>

            <rollback-exception-strategy doc:name="Rollback Exception Strategy"
                when="#[exception.causedBy(org.exceptions.MyException)]">
                <logger doc:name="Rollback transaction" level="INFO"
                    message="Entered rollback exception strategy. The message rolls back to its original state for reprocessing." />
            </rollback-exception-strategy>
        </choice-exception-strategy>
    </flow>
</mule>

这里写图片描述

猜你喜欢

转载自blog.csdn.net/ke_weiquan/article/details/51887563