Kafka重试机制实现
这篇文章主要是记录一下,在做头条的红包支付系统的过程中,用kafka做消息队列时,如何设计一个可以重试的消费机制
这个方式已经在线上运行了很长的时间,但是后续总觉得这种实现方式不算很好,所以就在网上找到了一篇文章,也是介绍如何构造一个重试队列,而且正好跟我的想法相同,所以就打算把我自己的想法写下来
背景
今年春节的时候,今日头条旗下的产品进行了分别进行了抢红包、百万英雄等活动,用户在活动中获取到的钱能够体现到各自的银行卡中,其中有一个重要的环节就是如何知道用户在我们部门代付出去的钱是否已经成功或失败。在项目中,我们采用了 回调 的方式进行通知。即收到付款完成的回调通知后,我们会再通过回调的方式通知业务方,由于前期考虑到数据量会很大,特别是大年三十的时候回出现高峰期,所以在整个项目中使用了kafka作为消息队列来使用(其实从我的理解来看,kafka作为消息队列并不是一个完美的解决方案,但是考虑到公司内部kafka比较稳定,而且不想在项目中引入更多的依赖项目,所以选用了kafka)
问题
在使用kafka的时候,由于我们需要通知业务方,所以考虑到业务方可能会出现大问题,所以我们暂定了一个规则,重试三次通知业务方,每次间隔十分钟,如果三次都失败则认为失败,中间任意一次成功都可以认为是成功。这个规则就引入了我们今天要讨论的问题,由于kafka不支持延迟消息,那么怎么保证能够做到间隔十分钟通知一次用户呢?
解决方案
kafka简介
kafka是一个分布式流式平台,它通常被用作构建一个实时的数据流平台,但是得益于topic的持久化,它也被用作消息流存储来处理历史数据,为了改进kafka的扩展能力,kafka持久化一个topic的时候,会用一个或者多个分区(partition)来进行存储。kafka的partition的底层是一个只能增加(append-only)的文件,使用这样一个简单的结构可以极大的增加系统的吞吐能力。
简单的重试机制
最简单的重试机制是我们可以在一个线程中处理,如果没有到需要处理的时候,则进行sleep,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
func consumeMessageWithSimpleRetry() { for { message, ok := <- kafkaConsumer.Message() if !ok { return } processed := false for !processed { err := process(message) if !err { time.Sleep(waitPeriod) processed = true } } } } |
非阻塞的重试逻辑
在类似kafka这种流式系统中,我们不能跳过当前的消息,然后去消费后面的消息,一旦我们移动了消息的指针(kafka中称之为offset)的时候,我们就不能再回去消费之前的消息了(强制移动指针是可取的,在这里不做讨论)。简单来说就是一旦我们消费掉了这条消息,我们就记住了这个指针的位置,由于这种情况的存在,如果当前消息没有消费成功,我们就不能继续消费下一条消息。按照这种逻辑,如果我们消费消息失败,我们就不能移动指针,这是一种不可取的方式,因为消息消息失败的情况很多,我们不能因为这条消息消费失败了,就不再继续消息后续的消息了。一个简单的比如,如果我们的消息在处理的过程中失败了,而我们设定的重试时间时一个小时,按照上面的简单重试机制,我们在接下来的一个小时只能,就不能做其他的事了,这在实时系统中是不可能存在的方案,所以我们需要想办法改进一下我们的实现方案。
下面有两种解决方案,但是本质上其实是相同的,只是实现的逻辑不通,下面简单分析和对比一下实现方式:
使用后台线程来处理重试消息
当我们处理消息失败的时候,我们把消息放到后台线程中去处理,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 |
func consumeMessageNonBlock() { for { ... err := process(message) if err != nil { go waitAndRetryConsumeMessage(message) } ... } } |
这样做的好处是,我们不需要外加一些依赖来处理这些逻辑,直接新建一个routine
来处理,但是坏处就比较明显,比如,如果在某一段时间内,服务出现不稳定,消息消费可能会出现大面积失败,这个时候,我们就会新建太多的routine
。
使用一个带buffer的队列来处理
带buffer的队列有两种方式:
- 用
go
自带的chan
来做缓冲区,然后用多个routine来消费这个缓冲区,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
retryMessageChan := make(chan RetryMessage, bufferSize) for i:= 0; i < retryParallesim; i++ { go consumeRetryChan() } func consumeMessageNonBlockByChan() { for { ... err := process(message) if err != nil { retryMessageChan <- message } ... } } |
这种方案实现起来简单,而且很高效,可以简单的控制并发度来避免routine
数量太多,但是这种方案也有问题,一是chan
的bufferSize不好控制,如果太大,很容易造成资源浪费,如果比较小,然后偶遇失败高峰期的时候,chan
容易写满,从而造成消费不能正常继续了
- 申请一个新的kafka topic作为重试队列,然后新起任务消费重试队列即刻,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
func consumeMessageNonBlockByKafka() { for { ... err := process(message) if err != nil { sendToRetryKafka(message) } ... } } func consumeRetryMessage() { for { message := getRetryMessage() err := processRetry(message) if err != nil { ... } } }
可以在sendToKafka这种封装中新增retryTimes
和nextRetryTime
两个字段,分别表示重试次数和下次重试时间,这样就能够很方便的进行重试机制的实现了。
总结
由于Kafka的特性所致,目前没有想到更好的办法能够实现重试机制,如果有,请大家不吝赐教。但是目前NSQ支持Deferred Message,这个可以很方便的实现重试机制,大家可以一试。