有道无术,术尚可求,有术无道,止于术。
本系列Redis 版本 7.2.5
源码地址:https://gitee.com/pearl-organization/study-redis-demo
消息队列:是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。
Redis
也支持消息队列功能,在 5.0
版本之前,基于以下两种方式实现:
Pub/Sub
List
Pub/Sub
发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:
Pub/Sub
中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack
机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。
Redis List
也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。 将需要延后处理的任务结构体序列化成字符串塞进 Redis
的列表,另一个线程从这个列表中轮询数据进行处理:
Redis List
同样存在诸多问题,比如,不支持多消费者模式,不支持延时消息,不支持优先级,不支持消息确认机制等等。
Redis Stream
是 5.0
版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID
,并且可以被多个消费者订阅和消费。是 Redis
实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D
、支持 Ack
确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。
其结构图如下:
各部分解释:
Message Content
:消息内容Consumer group
:消费组,通过 XGROUP CREATE
命令创建,同一个消费组可以有多个消费者Last_delivered_id
:游标,每个消费组会有个游标 Last_delivered_id
,任意一个消费者读取了消息都会使游标往前移动。Consumer
:消费者,消费组中的消费者Pending_ ids
:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID
会越来越多,一旦某个消息被 ack
它就开始减少。这个 pending_ids
变量在 Redis
官方被称之为 PEL
(Pending Entries List
),记录了当前已经被客户端读取的消息,但是还没有 ack
(Acknowledge character
:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理Stream
相关所有命令:
命名 | 描述 |
---|---|
XACK | 确认消费者已经成功处理从 Stream 中获取的消息 |
XADD | 添加消息到队列末尾 |
XAUTOCLAIM | 转移符合指定条件的待处理流条目的所有权 |
XCLAIM | 改变待处理消息的所有权 |
XDEL | 删除消息 |
XGROUP CREATE | 为存储在 key 的流创建一个新的消费者组 |
XGROUP CREATECONSUMER | 要在存储在key 的流的消费者组中创建一个消费者 |
XGROUP DELCONSUMER | 消费者组中删除一个消费者 |
XGROUP DESTROY | 删除一个已存在的消费者组 |
XGROUP SETID | 为消费者组设置最后传递的ID |
XINFO CONSUMERS | 返回消费者组中的消费者列表 |
XINFO GROUPS | 返回消费者组列表 |
XINFO STREAM | 存储在的key 流的相关信息 |
XLEN | 获取 Stream 中的消息长度 |
XPENDING | 通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目 |
XRANGE | 获取消息列表(可以指定范围) |
XREAD | 获取消息(阻塞/非阻塞),返回大于指定 ID 的消息 |
XREADGROUP | XREAD 命令的一个特殊版本,支持消费者组 |
XREVRANGE | 和 XRANGE 相比区别在于反向获取,ID 从大到小 |
XSETID | 内部命令。它用于主节点来复制流的最后传递的ID |
XTRIM | 限制 Stream 的长度,如果已经超长会进行截取 |
XADD
命令用于向 Stream
(流)数据结构末尾添加消息。
语法格式:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
参数说明:
key
:指定要添加消息的 Stream
的名称。[NOMKSTREAM]
:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD
命令会创建。如果使用NOMKSTREAM
选项,则流不存在时命令会失败。[MAXLEN|MINID [=|~] threshold [LIMIT count]]
:这组选项用于控制流的最大长度或最小消息 ID
。
MAXLEN maxlen
:限制 Stream
的最大长度。当长度达到maxlen
时,旧的消息会被自动删除。MINID minid
:指定最旧的消息ID
。当插入新消息时,如果已经存在比minid
更旧的消息,则会将这些消息删除。[=|~]
:操作符,=
表示精确匹配,~
表示小于等于(对于MINID
而言)或大于等于(对于MAXLEN
而言)。[LIMIT count]
:当使用MAXLEN
和~
时,指定需要保留的消息数量的最小值。*|ID
:消息的ID
。使用*
表示自动生成一个唯一的ID
。如果不使用*
,则需要提供一个有效的消息ID
,它必须大于流中所有已存在的消息的ID
。field value [field value ...]
:消息的字段和值。可以指定一个或多个字段及其对应的值。示例,插入消息:
localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"
示例, 插入消息,并限制长度不超过 1000
条:
localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"
查看控制台:
XRANGE
命令用于获取指定范围内的消息。
命令格式:
XRANGE key start end [COUNT count]
参数说明:
key
:指定 Stream
的 key
。start
:指定要检索的消息范围的起始 ID
。可以使用特殊值-来表示最小值。end
:指定要检索的消息范围的结束 ID
。可以使用特殊值+来表示最大值。[COUNT count]
:可选参数,用于限制返回的消息数量。注意事项:
Stream
的消息 ID
由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream
的时间,而序列号则用于在同一时间戳内区分不同的消息。XRANGE
命令返回的消息是按照它们在 Stream
中的顺序排列的,即按照消息 ID
的顺序。COUNT
参数,但指定的范围内的消息数量少于 COUNT
指定的数量,那么只会返回范围内的所有消息。示例,检索所有消息:
localhost:0>XRANGE mystream - +
1) 1) "1719279960591-0"
2) 1) "msg_1"
2) "100"
3) "msg_2"
4) "38"
2) 1) "1719279971749-0"
2) 1) "msg_3"
2) "100"
3) "msg_4"
4) "38"
示例,检索特定范围内的消息:
localhost:0>XRANGE mystream 1719279960591-0 1719279960600-0
1) 1) "1719279960591-0"
2) 1) "msg_1"
2) "100"
3) "msg_2"
4) "38"
示例,限制返回的消息数量:
localhost:0>XRANGE mystream - + COUNT 1
1) 1) "1719279960591-0"
2) 1) "msg_1"
2) "100"
3) "msg_2"
4)
XREVRANGE
命令与 XRANGE
命令类似,但它是按照消息 ID
的递减顺序(用于反向)获取指定范围内的消息。
命令格式:
XREVRANGE key end start [COUNT count]
示例,检索最后两个时间序列的消息:
localhost:0>XREVRANGE mystream + - COUNT 2
1) 1) "1719279971749-0"
2) 1) "msg_3"
2) "100"
3) "msg_4"
4) "38"
2) 1) "1719279960591-0"
2) 1) "msg_1"
2) "100"
3) "msg_2"
4) "38"
XDEL
命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。
命令格式:
XDEL key ID [ID ...]
参数说明:
key
:指定 Stream
的 key
。ID
:一个或多个要删除的消息的 ID
。注意事项:
XDEL
命令时,你需要确保提供的消息 ID
是存在的,否则命令将不会删除任何消息,并返回0。XDEL
命令删除多个消息,只需在命令中提供多个消息 ID
即可。XDEL
命令不会改变 Stream
的其他消息的顺序或 ID
。示例,删除消息:
localhost:0>XDEL mystream 1719280747405-0
"1"
XLEN
命令用于获取指定 Stream
中包含的消息数量,即流的长度。如果 Stream
不存在或为空,则返回 0
。
命令格式:
XLEN key
示例:
localhost:0>XLEN mystream
"1"
XREAD
命令是用于从 Stream
独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key
和消息列表的数组。消息列表是一个包含消息 ID
和消息数据的数组。
命令格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
参数说明:
COUNT count
:指定一次读取的最大消息数量。如果未指定,则默认为 1
。BLOCK milliseconds
:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream
中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0
,则表示非阻塞模式,即如果没有消息可消费,则立即返回。STREAMS key [key ...]
:指定要从中读取消息的 Stream
的 key
。可以指定一个或多个。ID [ID ...]
:对于每个指定的 key
,可以提供一个或多个消息 ID
。这些 ID
用于指定从哪个位置开始读取消息。如果某个 key
后面没有指定 ID
,则默认为从该 Stream
的最新消息开始读取。示例,非阻塞模式读取最新消息:
XREAD COUNT 1 STREAMS mystream $
示例,阻塞模式,读取最新消息并等待新消息:
XREAD COUNT 1 BLOCK 10000 STREAMS mystream $
XGROUP CREATE
命令用于在已存在的流(stream
)上创建一个新的消费者组(consumer group
)。消费者组允许多个消费者(consumer
)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。
命令格式:
XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]
参数说明:
:流的名称。
:消费者组的名称。
:消费者组初始的最后一个条目 ID
,即消费者组开始读取的起始点。可以使用$
表示流的最新条目,或者使用0
表示流的起始点,或者使用任何其他有效的 ID
。[MKSTREAM]
:可选参数,如果流不存在,则创建它。[MKTABLE]
:在 Redis 6.2
及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table
)。这通常用于支持基于特定字段的查询。[CREATECONSUMER ]
:在 Redis 6.2
及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。示例,创建一个新的消费者组,从流的最新条目开始读取:
localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"
XACK
命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream
)中读取的,并存储在消费者组的待处理条目列表(Pending Entry List
,PEL
)中。通过发送 XACK
命令,消费者通知 Redis
服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL
中移除。
命令格式:
XACK <key> <groupname> <consumername> <ID> [<ID> ...]
参数说明:
:流的名称。
:消费者组的名称。
:消费者的名称。
:要确认的消息的ID
,可以指定一个或多个。示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:
XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0
在这个例子中,消费者确认了两个消息,它们的 ID
分别是 1526569900000-0
和 1526569900002-0
。
一旦消息被确认,它们将从该消费者组的 PEL
中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL
中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。
如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK
命令,这样消息将保持在 PEL
中,直到消费者准备好确认它们或它们因超时而被自动从 PEL
中移除(取决于消费者组的配置)。
XPENDING
命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。
命令格式:
XPENDING <key> <groupname> [start end count] [consumername]
参数说明:
:流的名称。
:消费者组的名称。[start end count]
:这三个参数是可选的,用于限制查询结果的范围。start
:查询的开始消息ID
。end
:查询的结束消息ID
。count
:要返回的消息数量。[consumername]
:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。XPENDING
命令返回一个数组,其中包含以下信息:
ID
:字符串,表示在指定范围内未确认消息的最小ID
。ID
:字符串,表示在指定范围内未确认消息的最大ID
。注意事项:
XPENDING
是一个只读命令,它不会修改任何数据。consumername
参数,则只返回该消费者的未确认消息信息。[start end count]
参数,则只返回指定范围内的未确认消息信息。XPENDING
命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。示例:
XPENDING mystream mygroup
2) "1526569900000-0" # 最小消息ID
3) "1526569900002-0" # 最大消息ID
4) 1) 1) "myconsumer" # 消费者名称
2) (integer) 2 # 该消费者拥有的未确认消息数
Redis Stream
主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQ
、RocketMQ
等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream
。