消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于数据通信来进行分布式系统的集成。
消息队列具有 低耦合、可靠投递、广播、流量控制、最终一致性 等功能。
常见的消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等。
通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等功能。
Redis的list数据结构是一个双向链表,使用出队入队即可实现消息队列。
LPUSH:将一个或多个值value插入到列表key的表头如果有多个value值,那么各个value值按从左到右的顺序依次插入到表头。
RPOP:移除并返回列表key的尾元素。
RPUSH与LPOP同理。
通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停地去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失。
Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。
数据存入格式:lpush listname v1 v2 v3
v 表示存入链表的值
阻塞等待指令格式:blpop list_name timeout
listname 为 取出内容的列表名
timeout为等待超时时间,如果为0,则可无限等待
"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。
常用的指令:
subscribe channel1 [channel...]: 订阅一个或多个频道
publish channel1 msg:向一个频道发送消息
psubscribe pattern [pattern]:订阅与pattern格式匹配的所有频道
pattern通配符:
h?llo:?可以替换为任意一个其他字母,比如hello;而hllo和hkkllo不行
hllo: 可以替换为0个或多个其他字母,比如hllo、hello、heeeello
h[ae]llo:可以替换为任意一个中括号中的字母,比如hallo、hello;而hillo不行
Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
命令格式:XADD key [NOMKSTREAM] [
NOMKSTREAM:如果队列不存在,是否自动创建队列,默认自动创建
[= | ~] threshold [LIMIT count] :设置消息队列的最大消息数量
<* | id> 消息的唯一id,* 代表由redis自动生成。* 格式是"时间戳-递增数字",例如"1644804662707-0"
field value [field value …]:发送到队列中的消息队列,称为Entry。格式就是多个key-value键值对。
示例:
# 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成id
XADD users * name jack age 21
命令格式:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
[COUNT count]:每次读取消息的最大数量
[BLOCK milliseconds]:当没有消息时,是否阻塞、阻塞时长
STREAMS key [key …]:要从哪个队列读取消息,key就是队列名
起始id,只返回大于该id的消息;0代表从第一个消息开始;$代表从最新的消息开始
读取最新的消息示例:
XREAD COUNT 1 BLOCK 1000 STREAMS users $
消费者组:将多个消费者划分到一个组里,监听同一个队列。
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其他对应指令:
# 删除指定的消费者组
XGROUP DESTROY key group
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key group consumer
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key group consumer
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] id [id ...]
group:消费者组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当前消息等待最大时长
NOACK:无需手动ACK,获取消息后自动确认
STREAMS key:指定监听一个或多个队列名称
ID:获取消息的起始ID:
- “ >” 从下一个未消费的消息开始
- 其他:根据指定ID从pending-list中获取一个消费但未确认的消息,例如0,是从pending-list中第一个消息开始
XACK key group id [id ...]
key:消息队列名称
group:组名
id:确认的消息ID
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
如果业务要求较高,可以考虑使用更加专业的 Kafka、RocketMQ、RabbitMQ。