• redis 队列


     ads

    关注以下公众号查看更多文章

    list类型

    最原始的redis队列是基于list实现的,

    入队列

    lpush mq_old "fanghailiang"

    出队列

    rpop mq_old

    有三个缺点:

    1. 即使队列中没有数据,也要不断的rpop出队列,造成程序空转

    2. 数据进入队列后只能被消费一次,业务上在插入队列时就在队列的其他字段指明了消费方式,调用哪个类的什么函数进行消费

    3. 队列被消费中途发生错误,造成队列数据丢失。

    针对第一个问题,改进方法是,使用brpop

    brpop mq_old 60

    阻塞队列60秒,直到收到数据或者时间到了才结束等待

    针对第三个问题,可以在消费原始队列的时候把数据移入处理中的队列,处理结束后从处理中队列移除

    BRPOPLPUSH mq_old mq_old_ing 60

    数据先进入进行中的队列,处理完成后再从进行中队列移除数据

    rpop mq_old_ing

    stream类型

    针对第二个问题,需要引入消费组 的概念,list数据类型已经无法适用,改用 stream 数据类型

    放入队列

    xadd mq_new * name fanghailiang

    创建消费组

    XGROUP create mq_new group1 0

    用消费组里的一个消费者消费消息

    XREADGROUP group group1 consumer1 streams mq_new >

    ps:为了防止空转,可以使用阻塞消费,命令是

    XREADGROUP block 1000 group group1 consumer1 streams mq_new >

    在没有确认消费时,消息会被放入处理中的队列

    XPENDING mq_new group1

    返回内容如下:

    1. 1) (integer) 7
    2. 2) "1669618113591-0"
    3. 3) "1669618690472-0"
    4. 4) 1) 1) "consumer1"
    5. 2) "7"

    确认消息消费

    XACK mq_new group1 1599274912765-0

  • 相关阅读:
    Laravel 框架资源嵌套.浅嵌套.自定义&表单伪造.CSRF 保护 ④
    Redis特殊“三巨头”
    PMP_第3章章节试题
    关于Django
    12.2 实现键盘模拟按键
    vscode的窗口下拉显示行数不够
    第七章集合与字典作业
    小程序实现语音识别功能
    Sqoop基本操作
    一、Flink 1.13 源码解析前导——Akka通信模型
  • 原文地址:https://blog.csdn.net/fanghailiang2016/article/details/128078709