生产者配置:
<?
xml version
="1.0"
encoding
="UTF-8"
?>
<beans
xmlns
="http://www.springframework.org/schema/beans"
xmlns:
xsi
="http://www.w3.org/2001/XMLSchema-instance"
xmlns:
rabbit
="http://www.springframework.org/schema/rabbit"
xsi
:schemaLocation
="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"
>
<
rabbit
:connection-factory
id
="connectionFactory"
host
="10.153.25.15"
username
="insurance"
password
="insurance"
port
="5672"
/>
<
rabbit
:admin
connection-factory
="connectionFactory"
/>
<
rabbit
:queue
id
="queue_insurance"
durable
="true"
auto-delete
="false"
exclusive
="false"
name
="queue_insurance"
>正常队列当中指向死信
<
rabbit
:queue-arguments>
<entry
key
="x-message-ttl"
>设置超时
<value
type
="java.lang.Long"
>
30000
</value>
</entry>
<entry
key
="x-dead-letter-exchange"
>指定交换机
<value
type
="java.lang.String"
>
alter
</value>
</entry>
</
rabbit
:queue-arguments>
</
rabbit
:queue>
<
rabbit
:queue
id
="alter_queue"
durable
="true"
auto-delete
="false"
exclusive
="false"
name
="alter_queue"
/>死信队列
<
rabbit
:direct-exchange
name
="alter"
durable
="true"
auto-delete
="false"
id
="alter"
>死信交换机
<
rabbit
:bindings>
<
rabbit
:binding
queue
="alter_queue"
key
="queue_key_insurance"
/>
</
rabbit
:bindings>
</
rabbit
:direct-exchange>
<
rabbit
:direct-exchange
name
="exchange_insurance"
durable
="true"
auto-delete
="false"
id
="exchange_insurance"
>正常交换机
<
rabbit
:bindings>
<
rabbit
:binding
queue
="queue_insurance"
key
="queue_key_insurance"
/>
</
rabbit
:bindings>
</
rabbit
:direct-exchange>
<!-- (5)客户端投递消息到exchange。 -->
<
rabbit
:template
id
="amqpTemplate"
exchange
="exchange_insurance"
connection-factory
="connectionFactory"
/>
</beans>
消费者配置:
<?
xml version
="1.0"
encoding
="UTF-8"
?>
<beans
xmlns
="http://www.springframework.org/schema/beans"
xmlns:
xsi
="http://www.w3.org/2001/XMLSchema-instance"
xmlns:
rabbit
="http://www.springframework.org/schema/rabbit"
xsi
:schemaLocation
="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"
>
<!-- 连接服务配置 -->
<
rabbit
:connection-factory
id
="connectionFactory2"
host
="10.153.25.15"
username
="insurance"
password
="insurance"
port
="5672"
/>
<
rabbit
:admin
connection-factory
="connectionFactory2"
/>
<!-- queue 队列声明 -->
<!-- queue 队列声明 name 队里的额name 是关联生产表和消费表的为唯一线索 -->
<
rabbit
:queue
id
="queue_insurance"
name
="queue_insurance"
>
<
rabbit
:queue-arguments
value-type
="java.lang.Long"
>
<entry
key
="x-message-ttl"
value
="30000"
/>
</
rabbit
:queue-arguments>
</
rabbit
:queue>
<!-- 定义消费者监听器 -->
<!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
<bean
id
="consumerLitener2"
class
="com.insurance.mq.CommissionController"
></bean>
<
rabbit
:listener-container
connection-factory
="connectionFactory2"
acknowledge
="auto"
concurrency
="8"
>
<!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
<
rabbit
:listener
queues
="queue_insurance"
ref
="consumerLitener2"
/>
</
rabbit
:listener-container>
</beans>
rabbitMq三种模式
一. Fanout Exchange 广播
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
二. Direct Exchange 点对点
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
三. Topic Exchange 模糊匹配
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,
Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。
所以,Topic Exchange 使用非常灵活。
mq也支持重发机制:
rabbitmq的消息确认机制分两部分
一部分是生产端,一部分是消费端
生产端
有两种选择,transaction 和 confirm。
confirm 的性能要好于transaction
- //transaction 机制
- channel.txSelect();
- String msg = "msg test !!!";
- for(int i=0;i<10000;i++){
- msg = i+" : msg test !!!";
- channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());
- System.out.println("publish msg "+msg);
- if (i>0&&i%100==0){
- //批量提交
- channel.txCommit();
- }
- } // 若出现异常 进行 channel.txRollback(),对相应批次的msg进行重发或记录
- channel.txCommit();
延迟队列
AMQP和RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
- A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
- B: 对消息进行单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
- x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
- x-dead-letter-routing-key:指定routing-key发送
队列出现dead letter的情况有:
- 消息或者队列的TTL过期
- 队列达到最大长度
- 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。
设置方法:
- Per-Message TTL(官方文档)
java client发送一条只能驻留60秒的消息到队列:
byte[] messageBodyBytes =
"Hello, world!"
.getBytes
()
;
AMQP
.BasicProperties
properties = new AMQP
.BasicProperties
()
;
properties
.setExpiration
(
"60000"
)
;
channel
.basicPublish
(
"my-exchange"
,
"routing-key"
, properties, messageBodyBytes)
;
- Queue TTL
创建一个队列,队列的消息过期时间为30分钟
java client方式:Map<
String
,
Object
> args =
new
HashMap<
String
,
Object
>();args.put(
"x-expires"
,
1800000
);channel.queueDeclare(
"myqueue"
,
false
,
false
,
false
, args);rabbitmqctl命令方式rabbitmqctl set_policy expiry
".*"
'{"expires":1800000}' --apply-to queues
rabbitmqctl (Windows) rabbitmqctl set_policy expiry
".*"
"{""expires"":1800000}"
--apply-
to
queues
11
- Dead Letter Exchanges设置方法(官方文档)
channel.exchangeDeclare(
"some.exchange.name"
,
"direct"
);Map<
String
,
Object
> args =
new
HashMap<
String
,
Object
>();args.put(
"x-dead-letter-exchange"
,
"some.exchange.name"
);args.put(
"x-dead-letter-routing-key"
,
"some-routing-key"
);channel.queueDeclare(
"myqueue"
,
false
,
false
,
false
, args);