RabbitMQ(六)谈一谈RabbitMQ的消息成功确认机制
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
- 事务机制
- 发布确认机制
事务机制
-
AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
-
并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
- channel.txSelect(): 开启事务
- channel.txCommit() :提交事务
- channel.txRollback() :回滚事务
-
Spring已经对上面三个方法进行了封装,所以只能使用原始的代码玩
生产者
public class Sender {
public static void main(String[] args) {
//获得连接
Connection connection = ConnectionUtil.getConnection();
try {
//在连接中创建通道
Channel channel = connection.createChannel();
//声明路由
/**
* String var1: 路由名
* String var2: 路由类型
* topic : 模糊匹配的定向分发
*/
channel.exchangeDeclare("test_transaction","topic");
//开启事务
channel.txSelect();
try {
/**
* 发送消息有四个参数
* String var1: (路由)交换机名称,当前是简单模式,也就是P2P模式,无交换机,所以名称为""
* String var2: 目标队列名称
* BasicProperties var3: 设置消息的属性(没有属性则为null)
* byte[] var4: 消息内容(只接受字节数组)
*/
channel.basicPublish("test_transaction", "product.price", null, "商品 1 降价".getBytes());
System.out.println(1 / 0);
channel.basicPublish("test_transaction", "product.price", null, "商品 2 降价".getBytes());
//提交事务
channel.txCommit();
System.out.println("生产者 : 消息已发送!");
} catch (IOException e) {
System.out.println("消息全部撤销!");
e.printStackTrace();
//回滚事务
channel.txRollback();
} finally {
channel.close();
//释放资源
ConnectionUtil.close(connection);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者
public class Recer {
public static void main(String[] args) {
//获得连接
Connection connection = ConnectionUtil.getConnection();
try {
//获得通道(信道)
Channel channel = connection.createChannel();
//声明消息队列
//queueDeclare : 此方法有双重作用,如果队列不存在则创建队列,若队列存在则获取
channel.queueDeclare("test_transaction_queue", false, false, false, null);
//绑定路由(关注)
/**
* String var1: 队列名
* String var2: 路由名
* String var3:
* 匹配用户开头的所有操作
*/
channel.queueBind("test_transaction_queue","test_transaction","product.#");
//从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 交付处理,重写方法
* @param consumerTag 收件人信息
* @param envelope 信封,包裹上的快递标签
* @param properties 协议配置
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//将消息字节构建成字符串
//boby就是在队列中获取的消息
String string = new String(body);
System.out.println("消费者 1 : " + string);
}
};
//监听队列
/**
* String var1: 队列名称
* boolean var2: 自动消息确认
* Consumer var3: 从管道中获取信息
*/
channel.basicConsume("test_transaction_queue",true,consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Confirm发布确认机制
- RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量
- 我本机SSD硬盘测试结果10w条消息未开启事务,大约8s发送完毕;而开启了事务后,需要将近310s,差了30多倍。
Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are nnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
关键性译文:开启事务性能最大损失超过250倍
- 那么有没有更加高效的解决方式呢?答案就是采用Confirm模式。
- 事务效率为什么会这么低呢?试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚。太太太浪费
- 而confirm模式则采用补发第10条的措施来完成10条消息的送达
在spring中应用
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
https://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--配置RabbitMQ连接-->
<!--publisher-confirms="true" : 开启生产者确认机制-->
<rabbit:connection-factory id="connectionFactory" host="106.75.245.83" port="5672" username="admin" password="admin"
virtual-host="/szx" publisher-confirms="true"></rabbit:connection-factory>
<!--配置队列-->
<rabbit:queue name="test_spring_queue_1"></rabbit:queue>
<!--配置RabbitMQAdmin,主要用于在Java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等操作-->
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!--配置路由,topic类型-->
<rabbit:topic-exchange name="spring_topic_exchange">
<!--绑定队列-->
<rabbit:bindings>
<rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--配置json转换工具,将消息转换为json-->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>
<!--配置RabbitMQ的模板-->
<!--confirm-callback="messsageConfirm" : 添加确认回调处理类-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange"
message-converter="jsonMessageConverter" confirm-callback="messsageConfirm"></rabbit:template>
<!--配置确认机制的处理类-->
<bean id="messsageConfirm" class="com.szx.confirm.MesssageConfirm"></bean>
</beans>
public class MesssageConfirm implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData 消息相关的数据对象(封装了消息的唯一 ID )
* @param b 消息是否确认成功
* @param s 异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("消息确认成功!");
} else {
System.out.println("消息确认失败!");
System.out.println(s);
// 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发
// 采用递归(固定次数,不可无限)或 redis+定时任务
}
}
}
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l%m%n
log4j.rootLogger=debug, stdout,file
public class Sender {
public static void main(String[] args) {
//获取spring的配置文件
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
//与容器中获取RabbitMQ模板类
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//发送消息
Map<String,String> map = new HashMap<String, String>();
map.put("name","小星");
map.put("email","[email protected]");
/**
* String routingKey: 指定路由键
* Object object: 发送的数据
*/
rabbitTemplate.convertAndSend("msg.user",map);
System.out.println("消息已发送!");
context.close();
}
}