Redis Stream,是Redis 5.0版本新增加的一个高级数据结构,从字面意思看就是一个流结构,但是在功能上,Redis Stream是Redis对消息队列的一个最佳实现,几乎满足了一个典型消息队列组件应该具备的所有功能,这是Redis作者谋划多年,也是Redis 5.0版本最大的一个Feature了。
上图是我根据Redis Stream的功能和命令画的大体结构图,可以看出Redis Stream有一个Stream消息队列将所有的消息都串起来,Stream内的每个消息都有一个唯一的ID和相应的消息内容,并且消息还是持久化的,在Redis重启后,消息内容还在;Stream可以接收多个生产者的消息;每个Stream都有一个唯一标识,本质上就是Redis的一个key;Stream可以挂多个消费者组或者单消费者,它们之间是相互独立的,都可以消费到Stream内的所有消息。
在消费者组内部,每个消费者有一个组内名称,它们之间是竞争的,共同维护一个last_delivered_id变量,向前推进消费消息;在每个消费者内部会有一个状态数组变量pending_ids,记录了当前已经被Client读取,但是还没有ack的消息,如果Client没有ack,这个变量里面的消息ID会越来越多,ack之后,就开始减少了。
Redis Stream是一个新的强大的支持多播的可持久化的消息队列,作者坦言Redis Stream狠狠地借鉴了Kafka的设计,难道是想取代Kafka吗?来看一下它实现了哪些内容:包括但不限于:
-
消息ID的序列化生成:由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64);序号是在这个毫秒时间点内的消息序号,也是个64位整型。
-
消息遍历
-
消息的阻塞和非阻塞读取
-
消息的分组消费
-
未完成消息的处理
-
坏消息问题、Dead Letter、死信问题
扫描二维码关注公众号,回复: 10969959 查看本文章 -
消息队列监控、pending等待ack列表
-
持久化、主从复制、高可用
Redis Stream涉及的基本命令:
-
生产消息:
XADD key ID field string [field string ...]
- key为消息队列的名称。
-
ID为消息ID,最常使用*,表示由Redis自动生成消息ID,这也是强烈建议的方案。当然也支持自己设定。
-
field string [field string]就是当前消息内容,由1个或多个key-value构成。
-
消费消息:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- [COUNT count],用于限定获取的消息数量:COUNT 10 获取10条消息。
-
[BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待,需要设置一个等待时间。
-
key为消息队列的名称。
-
ID,用于设置由哪个消息ID开始读取。ID是单调递增,0表示从第一条消息开始,可以设置ID向后读取,使用Block模式,配合$作为ID,表示读取最新的消息。
-
消费者组管理:
XGROUP CREATE mq mqGroup 0
- 在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该消费组是从第一条消息开始消费的。
-
在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该消费组是从第一条消息开始消费的。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
- 消费者组mqGroup内的消费者consumerA在消息队列mq中消费,参数>表示未被消费者组消费的起始消息,参数count 1表示获取一条。
-
消息丢失:
XPENDING mq mqGroup [start end count] [consumerA]
XACK mq mqGroup ID
- 为了解决消息丢失问题,Stream设计了Pending列表,用于记录读取但并未处理完毕的消息,图中的pending_ids[ ]即是列表,通过这个列表保证消息不丢失。
-
命令XPENDING用来获消费者组或组内消费者的未处理完毕的消息列表;命令XACK则负责确认消费者组内消息处理已完成。
-
每个Pending的消息有4个属性,在执行命令后可以看到,分别是:消息ID、所属消费者、已读取时长IDLE、消息被读取次数delivery counter。
-
消息转移:
/*转移超过3600s的消息1553585533795-1到消费者A的Pending列表*/
XCLAIM mq mqGroup consumerA 3600000 1553585533795-1
- 当消费者宕机而且不能上线时,需要将Pending列表中的消息移交给其他消费者处理,被转移的消息的IDLE会被重置,用以保证不会被重复转移。
-
坏消息删除:
XDEL mq 1553585533795-1 /*删除队列mq中的消息*/
XRANGE mq - + /*查看队列中消息*/
- 坏消息(也叫死信,DeadLetter,无法投递的消息):长时间处于Pending列表无法被处理的消息,即使被转移也是如此,该消息的delivery counter会累加,当delivery counter达到一个阈值时,就认为是坏消息,可以删除。
-
信息监控:
XINFO STREAM mq /*消息队列信息*/
XINFO GROUPS mq /*消费组信息*/
XINFO CONSUMERS mq mqGroup /*消费组成员信息*/
- 通过XINFO命令实现信息查询。
Redis Stream命令汇总:
一个最典型的消息队列就是XADD命令配合XREAD BLOCK命令完成,XADD负责生成消息,XREAD BLOCK负责堵塞消费消息。Redis Stream可以进行组内消费的基本原理是,Stream类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。可以参考:Redis Stream消费组结构图。
Redis Stream和基于BLOCKED LIST、PUB/SUB、ZSET的消息队列比较:
Redis现有的BLOCKED LIST、PUB/SUB、ZSET数据类型,虽然也都可以在简单的场景下作为消息队列来使用,但是Redis Stream无疑要完善很多:Redis Stream提供了消息的持久化和主备复制功能、采用新的RadixTree(基数树)数据结构来支持更高效的内存使用和消息读取、甚至是类似于Kafka的Consumer Group功能。
好了,有关Redis Stream的基础知识,我们就讲到这里,由于文章篇幅有限,更深层的知识我们讲的不多,对Redis Stream感兴趣的同学可以自行百度搜索,包括RadixTree、Redis版本更新、命令等等,也可以亲自动手实践一下,说不定哪一天Redis Stream会帮上大忙。
PS:如有任何问题或疑问,请留言告诉我。
喜欢这篇文章的朋友,欢迎关注公众号,第一时间收到更新内容。