ads:
关注以下公众号查看更多文章
最原始的redis队列是基于list实现的,
入队列
lpush mq_old "fanghailiang"
出队列
rpop mq_old
有三个缺点:
1. 即使队列中没有数据,也要不断的rpop出队列,造成程序空转
2. 数据进入队列后只能被消费一次,业务上在插入队列时就在队列的其他字段指明了消费方式,调用哪个类的什么函数进行消费
3. 队列被消费中途发生错误,造成队列数据丢失。
针对第一个问题,改进方法是,使用brpop
brpop mq_old 60
会阻塞队列60秒,直到收到数据或者时间到了才结束等待
针对第三个问题,可以在消费原始队列的时候把数据移入处理中的队列,处理结束后从处理中队列移除
BRPOPLPUSH mq_old mq_old_ing 60
数据先进入进行中的队列,处理完成后再从进行中队列移除数据
rpop mq_old_ing
针对第二个问题,需要引入消费组 的概念,list数据类型已经无法适用,改用 stream 数据类型
放入队列
xadd mq_new * name fanghailiang
创建消费组
XGROUP create mq_new group1 0
用消费组里的一个消费者消费消息
XREADGROUP group group1 consumer1 streams mq_new >
ps:为了防止空转,可以使用阻塞消费,命令是
XREADGROUP block 1000 group group1 consumer1 streams mq_new >
在没有确认消费时,消息会被放入处理中的队列
XPENDING mq_new group1
返回内容如下:
- 1) (integer) 7
- 2) "1669618113591-0"
- 3) "1669618690472-0"
- 4) 1) 1) "consumer1"
- 2) "7"
确认消息消费
XACK mq_new group1 1599274912765-0