• redis中stream数据结构使用详解——redis最适合做消息队列的数据结构


    写在前面

    Redis Stream 是 Redis 5.0 版本新增加的数据结构。

    Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

    简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

    而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

    Redis Stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
    在这里插入图片描述
    每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

    上图解析:

    • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
    • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
    • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

    stream的使用

    消息队列相关命令:

    • XADD - 添加消息到末尾
    • XTRIM - 对流进行修剪,限制长度
    • XDEL - 删除消息
    • XLEN - 获取流包含的元素数量,即消息长度
    • XRANGE - 获取消息列表,会自动过滤已经删除的消息
    • XREVRANGE - 反向获取消息列表,ID 从大到小
    • XREAD - 以阻塞或非阻塞方式获取消息列表

    消费者组相关命令:

    • XGROUP CREATE - 创建消费者组
    • XREADGROUP GROUP - 读取消费者组中的消息
    • XACK - 将消息标记为"已处理"
    • XGROUP SETID - 为消费者组设置新的最后递送消息ID
    • XGROUP DELCONSUMER - 删除消费者
    • XGROUP DESTROY - 删除消费者组
    • XPENDING - 显示待处理消息的相关信息
    • XCLAIM - 转移消息的归属权
    • XINFO - 查看流和消费者组的相关信息;
    • XINFO GROUPS - 打印消费者组的信息;
    • XINFO STREAM - 打印流信息

    XADD命令

    使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。

    基本语法:

    XADD key ID field value [field value ...]
    
    • 1

    key :队列名称,如果不存在就创建
    ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
    field value : 记录,类似key-value的形式,每次add可以添加多个属性。

    基本用法:

    # 往streamkey1 中添加一个id为1,name为zhangsan,age为15的值,如果消息的ID我们设置为*,redis会自动生成一个ID并返回
    127.0.0.1:6379> xadd streamkey1 * id 1 name zhangsan age 15
    "1662703935644-0"
    # 往streamkey1 中添加一个id为2,name为lisi,address为qingdao的值
    127.0.0.1:6379> xadd streamkey1 * id 2 name lisi address qingdao
    "1662703953898-0"
    # 我们查看一下streamkey1中有多少个元素了
    127.0.0.1:6379> xlen streamkey1
    (integer) 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    XTRIM命令

    XTRIM将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。此命令被设想为接受多种修整策略,但目前只实现了一种,即MAXLEN,并且与XADD中的MAXLEN选项完全相同。

    该命令返回从流中删除的条目数。

    基本语法:

    XTRIM key MAXLEN [~] count
    
    • 1

    key :队列名称
    MAXLEN :长度
    count :数量
    在选项MAXLEN和实际计数中间的参数~的意思是,用户不是真的需要精确的count个项目。它可以多几十个条目,但决不能少于count个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。

    基本用法:

    # 再往streamkey1中 添加一个元素
    127.0.0.1:6379> xadd streamkey1 * id 3 name wangwu
    "1662704343522-0"
    # 现在有三个元素了
    127.0.0.1:6379> xlen streamkey1
    (integer) 3
    # 指定streamkey1 元素最大长度为2,返回值1为删除的元素个数
    127.0.0.1:6379> xtrim streamkey1 MAXLEN 2
    (integer) 1
    # 现在只有两个元素了
    127.0.0.1:6379> xlen streamkey1
    (integer) 2
    # 我们发现,删除了第一个元素
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662703953898-0"
       2) 1) "id"
          2) "2"
          3) "name"
          4) "lisi"
          5) "address"
          6) "qingdao"
    2) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    
    # 此时继续往里放元素,还是能够正常放的
    127.0.0.1:6379> xadd streamkey1 * id 4
    "1662704516937-0"
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662703953898-0"
       2) 1) "id"
          2) "2"
          3) "name"
          4) "lisi"
          5) "address"
          6) "qingdao"
    2) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    3) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    127.0.0.1:6379> xlen streamkey1
    (integer) 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    XDEL命令

    使用 XDEL 删除消息。

    基本语法:

    XDEL key ID [ID ...]
    
    • 1

    key:队列名称
    ID :消息 ID

    基本用法:

    # streamkey1中有三个元素
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662703953898-0"
       2) 1) "id"
          2) "2"
          3) "name"
          4) "lisi"
          5) "address"
          6) "qingdao"
    2) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    3) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    # 删除id为1662703953898-0的元素
    127.0.0.1:6379> xdel streamkey1 1662703953898-0
    (integer) 1
    # 此时还剩下俩元素了
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    XLEN命令

    使用 XLEN 获取流包含的元素数量,即消息长度。

    基本语法:

    XLEN key
    
    • 1

    基本用法:

    # 查看streamkey1中元素个数
    127.0.0.1:6379> xlen streamkey1
    (integer) 2
    
    • 1
    • 2
    • 3

    XRANGE命令

    使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。

    基本语法:

    XRANGE key start end [COUNT count]
    
    • 1

    key :队列名
    start :开始值, - 表示最小值
    end :结束值, + 表示最大值
    count :数量

    基本用法:

    # 查询streamkey1中所有元素
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    # 查询streamkey1中第一个元素
    127.0.0.1:6379> xrange streamkey1 - + COUNT 1
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    XREVRANGE命令

    使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。
    跟XRANGE命令恰好相反,显示的结果是倒叙排列的。

    基本语法:

    XREVRANGE key end start [COUNT count]
    
    • 1

    key :队列名
    end :结束值, + 表示最大值
    start :开始值, - 表示最小值
    count :数量

    基本用法:

    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    127.0.0.1:6379> xrevrange streamkey1 + -
    1) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    2) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    XREAD命令

    使用 XREAD 以阻塞或非阻塞方式获取消息列表。

    基本语法:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    
    • 1

    count :数量
    milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
    key :队列名
    id :消息 ID,如果从头读的话,id写0即可

    其中STREAMS是必须要输入的,后面的key和id可以输入多个,但是一定要一一对应。

    基本用法:

    # streamkey1中有两个元素
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    # 读取一个
    127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
    1) 1) "streamkey1"
       2) 1) 1) "1662704343522-0"
             2) 1) "id"
                2) "3"
                3) "name"
                4) "wangwu"
    # 还剩两个
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    # 此时再读的话,还是会读到重复的数据
    127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
    1) 1) "streamkey1"
       2) 1) 1) "1662704343522-0"
             2) 1) "id"
                2) "3"
                3) "name"
                4) "wangwu"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    XGROUP命令

    使用 XGROUP 创建、销毁、管理消费者组。

    基本语法:

    XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
    
    • 1

    key :队列名称,如果不存在就创建
    groupname :组名。
    $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

    基本用法:

    # 创建streamkey1 的streamgroup1,从头开始消费
    127.0.0.1:6379> xgroup create streamkey1 streamgroup1 0-0
    OK
    # 创建streamkey1 的streamgroup1,从尾开始消费
    127.0.0.1:6379> xgroup create streamkey1 streamgroup1 $
    OK
    # 销毁一个消费者组
    127.0.0.1:6379> xgroup destroy streamkey1 streamgroup1
    (integer) 1
    # 移除一个消费者
    XGROUP DELCONSUMER streamkey1 streamgroup1 myconsumer123
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    XREADGROUP GROUP命令

    使用 XREADGROUP GROUP 读取消费组中的消息。
    消费的消息会记录在pending列表里,等待xack确认。

    基本语法:

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    
    • 1

    group :消费组名
    consumer :消费者名。
    count : 读取数量。
    milliseconds : 阻塞毫秒数。
    key : 队列名。
    ID : 消息 ID。

    基本用法:

    # 队列中有两条消息
    127.0.0.1:6379> xrange streamkey1 - +
    1) 1) "1662704343522-0"
       2) 1) "id"
          2) "3"
          3) "name"
          4) "wangwu"
    2) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    # 最后的ID我们不能指定为0,需要使用>符号来消费
    127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 0
    1) 1) "streamkey1"
       2) (empty array)
    # 消费key为streamkey1 、消费者组为streamgroup1 、消费者为streamconsumer1 ,消费一条消息
    127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
    1) 1) "streamkey1"
       2) 1) 1) "1662704343522-0"
             2) 1) "id"
                2) "3"
                3) "name"
                4) "wangwu"
    127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
    1) 1) "streamkey1"
       2) 1) 1) "1662704516937-0"
             2) 1) "id"
                2) "4"
    127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
    (nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    XINFO命令

    用于检索关于流和关联的消费者组的不同的信息。

    基本语法:

    XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
    
    • 1

    基本用法:

    # 查询消费者信息
    127.0.0.1:6379> xinfo consumers streamkey1 streamgroup1
    1) 1) "name"
       2) "streamconsumer1"
       3) "pending"
       4) (integer) 2
       5) "idle"
       6) (integer) 504511
    # 查询组信息
    127.0.0.1:6379> xinfo groups streamkey1
    1) 1) "name"
       2) "streamgroup1"
       3) "consumers"
       4) (integer) 1
       5) "pending"
       6) (integer) 2
       7) "last-delivered-id"
       8) "1662704516937-0"
    # 查询key的完整信息,包括所有条目、组、消费者信息
    127.0.0.1:6379> xinfo stream streamkey1
     1) "length"
     2) (integer) 2
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "last-generated-id"
     8) "1662704516937-0"
     9) "groups"
    10) (integer) 1
    11) "first-entry"
    12) 1) "1662704343522-0"
        2) 1) "id"
           2) "3"
           3) "name"
           4) "wangwu"
    13) "last-entry"
    14) 1) "1662704516937-0"
        2) 1) "id"
           2) "4"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    XPENDING命令

    通过XREADGROUP命令消费的消息,其实并没有实际消费成功,之后使用XACK命令做出应答之后才算真正消费成功。
    读取了但是没有应答的消息,存放在XPENDING列表中。

    基本语法:

    XPENDING key group [start end count] [consumer]
    
    • 1

    基本用法:

    # 读取streamkey1中streamgroup1组的两条pending数据
    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2
    1) 1) "1662704343522-0"
       2) "streamconsumer1"
       3) (integer) 1179060
       4) (integer) 1
    2) 1) "1662704516937-0"
       2) "streamconsumer1"
       3) (integer) 1173716
       4) (integer) 1
    # 读取streamkey1中streamgroup1组的3条pending数据(只有两条所以显示了两条)
    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 3
    1) 1) "1662704343522-0"
       2) "streamconsumer1"
       3) (integer) 1181652
       4) (integer) 1
    2) 1) "1662704516937-0"
       2) "streamconsumer1"
       3) (integer) 1176308
       4) (integer) 1
    # 读取streamkey1中streamgroup1组的两条pending数据,可以指定消费者
    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
    1) 1) "1662704343522-0"
       2) "streamconsumer1"
       3) (integer) 1199490
       4) (integer) 1
    2) 1) "1662704516937-0"
       2) "streamconsumer1"
       3) (integer) 1194146
       4) (integer) 1
    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer2
    (empty array)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    XCLAIM命令

    在流的消费者组上下文中,此命令改变待处理消息的所有权, 因此新的所有者是在命令参数中指定的消费者。通常是这样的:

    • 假设有一个具有关联消费者组的流。
    • 某个消费者A在消费者组的上下文中通过XREADGROUP从流中读取一条消息。
    • 作为读取消息的副作用,消费者组的待处理条目列表(PEL)中创建了一个待处理消息条目:这意味着这条消息已传递给给定的消费者,但是尚未通过XACK确认。
    • 突然这个消费者出现故障,且永远无法恢复。
    • 其他消费者可以使用XPENDING检查已经过时很长时间的待处理消息列表,为了继续处理这些消息,他们使用XCLAIM来获得消息的所有权,并继续处理。

    命令选项
    该命令有多个选项,但是大部分主要用于内部使用,以便将XCLAIM或其他命令的结果传递到AOF文件, 以及传递相同的结果到从节点,并且不太可能对普通用户有用:

    IDLE : 设置消息的空闲时间(自最后一次交付到目前的时间)。如果没有指定IDLE,则假设IDLE值为0,即时间计数被重置,因为消息现在有新的所有者来尝试处理它。
    TIME : 这个命令与IDLE相同,但它不是设置相对的毫秒数,而是将空闲时间设置为一个指定的Unix时间(以毫秒为单位)。这对于重写生成XCLAIM命令的AOF文件很有用。
    RETRYCOUNT : 将重试计数器设置为指定的值。这个计数器在每一次消息被交付的时候递增。通常,XCLAIM不会更改这个计数器,它只在调用XPENDING命令时提供给客户端:这样客户端可以检测到异常,例如在大量传递尝试后由于某种原因从未处理过的消息。
    FORCE: 在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中。但是消息必须存在于流中,否则不存在的消息ID将会被忽略。
    JUSTID: 只返回成功认领的消息ID数组,不返回实际的消息。

    基本语法:

    XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
    
    • 1

    基本用法:

    # 10秒后未应答,将该消息的所有权给streamconsumer1 
    127.0.0.1:6379> xclaim streamkey1 streamgroup1 streamconsumer1 10000 1662704516937-0
    1) 1) "1662704516937-0"
       2) 1) "id"
          2) "4"
    
    • 1
    • 2
    • 3
    • 4
    • 5

    XACK命令

    将挂起的消息标记为已正确处理,从而有效地将其从使用者组的挂起条目列表中删除。命令的返回值是成功确认的消息数,也就是我们实际上能够在PEL中解析的id。

    基本语法:

    XACK key group ID [ID ...]
    
    • 1

    基本用法:

    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
    1) 1) "1662704516937-0"
       2) "streamconsumer1"
       3) (integer) 468271
       4) (integer) 2
    2) 1) "1662709302078-0"
       2) "streamconsumer1"
       3) (integer) 416375
       4) (integer) 1
    # id为1662704516937-0的消息,我确认处理完毕了,此时就会从pending列表删掉
    127.0.0.1:6379> xack streamkey1 streamgroup1 1662704516937-0
    (integer) 1
    127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
    1) 1) "1662709302078-0"
       2) "streamconsumer1"
       3) (integer) 442613
       4) (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    DVWA之SQL注入(盲注)
    springboot整合es
    【3D视觉】PointNet解析
    基于SpringBoot的水果销售网站
    Java 定时器
    【Rust 日报】2022-09-04 Rust基金会招募
    SpringCloud Gateway--网关服务基本介绍和基本原理
    paper 阅读: An introduction to ROC analysis
    【收藏】使用jieba 进行基于Paddle的词性标注
    13.1.X:ByteScout PDF Extractor SDK
  • 原文地址:https://blog.csdn.net/A_art_xiang/article/details/126781423