环境/工具 | 版本 |
---|---|
RabbitMQ | 3.7.4 |
mule-standalone | 3.9.0 |
Anypoint-Studio | 6.4.0 |
AMQP-Connector(该组件在studio默认需要安装) | 3.7.7 |
这个简单例子的功能:在两个不同的 Oracle 库之间迁移表数据,即把 A 库 A 表的数据导入 B 库 B 表(A、B 两表结构一致),中间简单地配合 RabbitMQ 使用。
Flow结构图
第一个 Flow 实现:通过 HTTP GET 请求触发,然后查询 A 库目标表数据,将其转换为 JSON,通过 AMQP 连接器发送到 RabbitMQ 队列中,最后向调用方返回一条简单的提示信息
第二个 Flow 实现:通过 AMQP 连接器获取队列中的消息,使用多个 transformer 将数据依次转换(byte array -> JSON -> object),最后插入 B 库目标表
写在前面:
* 在 Anypoint-Studio 安装了 AMQP Connector 后,增加了以下 Element
* 在这个例子里面,不需要使用 AMQP-0-9 Acknowledge 来确认消息,否则会出现如下报错:
org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer:
Received shutdown signal for consumer tag:
amq.ctag-y14AdMWmEGA3Jujbk_N14Q, the message receiver will try to restart.
关于该问题的解答:click here
因此可以看出应该是重复ACK导致问题
XML文档
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:amqp="http://www.mulesoft.org/schema/mule/amqp" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/amqp http://www.mulesoft.org/schema/mule/amqp/current/mule-amqp.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd">
<!-- Global connector configurations. Flows refer to these by their "name" value. -->

<!-- HTTP listener bound to host 0.0.0.0 on port 8081. -->
<http:listener-config
    name="HTTP_Listener_Configuration"
    doc:name="HTTP Listener Configuration"
    host="0.0.0.0"
    port="8081"/>

<!-- Oracle connection "TestDB" (instance "orcl", port 1521).
     NOTE(review): host="{url}" and the user/password values look like
     placeholders to be replaced with real connection details; confirm. -->
<db:oracle-config
    name="TestDB"
    doc:name="Oracle Configuration"
    host="{url}"
    port="1521"
    instance="orcl"
    user="user"
    password="password"/>

<!-- AMQP connector with connection validation enabled. No broker host, port or
     credentials are set here, so the connector's own defaults apply. -->
<amqp:connector
    name="AMQP_0_9_Connector"
    doc:name="AMQP-0-9 Connector"
    validateConnections="true"/>

<!-- Oracle connection "LOCAL_DB" on localhost:1521, instance "orcl", user "hr". -->
<db:oracle-config
    name="LOCAL_DB"
    doc:name="Oracle Configuration"
    host="localhost"
    port="1521"
    instance="orcl"
    user="hr"
    password="hr"/>
<!-- Flow 1: triggered by an HTTP GET, selects rows from a table, converts the
     result to JSON, publishes it to the "db_import" queue, and finally sets a
     fixed string as the payload. -->
<flow name="importdatabyqueueFlow">
    <!-- Accepts GET requests on any path of the shared HTTP listener. -->
    <http:listener config-ref="HTTP_Listener_Configuration" path="/*" allowedMethods="GET" doc:name="HTTP"/>

    <!-- config-ref must match the name of a db config declared in this file.
         The original value "RMIS_DB" is not declared anywhere here, so it is
         pointed at "TestDB", the Oracle config that is not the insert target.
         YOUR_TABLE_NAME is a placeholder for the real source table. -->
    <db:select config-ref="TestDB" doc:name="Database">
        <db:parameterized-query><![CDATA[SELECT * FROM YOUR_TABLE_NAME]]></db:parameterized-query>
    </db:select>

    <!-- Serialize the query result to JSON before it goes onto the queue. -->
    <json:object-to-json-transformer doc:name="Object to JSON"/>
    <logger message="#['*****send message*****'+payload]" level="INFO" doc:name="Logger"/>

    <!-- Publish the JSON payload to the durable queue "db_import". -->
    <amqp:outbound-endpoint queueName="db_import" queueDurable="true" responseTimeout="10000" connector-ref="AMQP_0_9_Connector" doc:name="AMQP-0-9"/>

    <!-- Replace the payload with a fixed string once the message has been sent. -->
    <set-payload value="#['hello world']" doc:name="Set Payload"/>
</flow>
<!-- Flow 2: reads messages from the "db_import" queue, converts each message
     body through the transformer chain below, and bulk-inserts the result
     through the LOCAL_DB configuration. -->
<flow name="importdatabyqueueFlow1">
    <!-- Consume from the durable queue "db_import".
         NOTE(review): exchange-pattern is "request-response" although nothing
         in this file waits for a reply; confirm whether "one-way" is intended. -->
    <amqp:inbound-endpoint
        connector-ref="AMQP_0_9_Connector"
        queueName="db_import"
        queueDurable="true"
        exchange-pattern="request-response"
        responseTimeout="10000"
        doc:name="AMQP-0-9"/>

    <!-- Transformer chain: byte array to object, object to JSON, then JSON to
         a java.util.LinkedList. -->
    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
    <json:object-to-json-transformer doc:name="Object to JSON"/>
    <json:json-to-object-transformer
        returnClass="java.util.LinkedList"
        doc:name="JSON to Object"/>

    <!-- Project class, not visible in this file; presumably filters or reshapes
         the list entries before the insert. Verify against its source. -->
    <custom-transformer
        class="importdatabyqueue.FilterDataTransformer"
        doc:name="Java"/>

    <!-- Bulk insert through LOCAL_DB. {YOUR_TABLE} and TABLE_FIELD_NAME are
         placeholders for the real target table and column. -->
    <db:insert config-ref="LOCAL_DB" bulkMode="true" doc:name="Database">
        <db:parameterized-query><![CDATA[INSERT INTO {YOUR_TABLE}(TABLE_FIELD_NAME)VALUES(#[payload.TABLE_FIELD_NAME])]]></db:parameterized-query>
    </db:insert>
</flow>
</mule>