• Redis系列22:Redis 的Pub/Sub能力


    Redis系列1:深刻理解高性能Redis的本质
    Redis系列2:数据持久化提高可用性
    Redis系列3:高可用之主从架构
    Redis系列4:高可用之Sentinel(哨兵模式)
    Redis系列5:深入分析Cluster 集群模式
    追求性能极致:Redis6.0的多线程模型
    追求性能极致:客户端缓存带来的革命
    Redis系列8:Bitmap实现亿万级数据计算
    Redis系列9:Geo 类型赋能亿级地图位置计算
    Redis系列10:HyperLogLog实现海量数据基数统计
    Redis系列11:内存淘汰策略
    Redis系列12:Redis 的事务机制
    Redis系列13:分布式锁实现
    Redis系列14:使用List实现消息队列
    Redis系列15:使用Stream实现消息队列
    Redis系列16:聊聊布隆过滤器(原理篇)
    Redis系列17:聊聊布隆过滤器(实践篇)
    Redis系列18:过期数据的删除策略
    Redis系列19:LRU内存淘汰算法分析
    Redis系列20:LFU内存淘汰算法分析
    Redis系列21:缓存与数据库的数据一致性讨论

    1 关于Redis 的 Pub/Sub

    Redis的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。
    以下是Redis发布订阅模式的主要组件:
    发布者(Publisher):发布者是产生并发布消息的实体。它可以将消息发送到指定的频道或模式。
    订阅者(Subscriber):订阅者是接收并处理消息的实体。它可以订阅一个或多个频道或模式,以便接收相关的消息。
    频道(Channel):频道是发布者和订阅者之间的通信渠道。发布者将消息发送到频道,而订阅者从频道接收消息。

    可以看下图,Publisher 和 Subscriber、Channel的关系很清晰:
    image
    发布者往 "Channel A" 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息。

    2 使用Pub/Sub实现发布订阅的过程

    Redis实现 发布/订阅 一共有两种模式:

    • 使用频道(Channel)进行发布订阅
    • 使用模式(Pattern)进行发布订阅

    我们知道,Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织。
    但是值得注意的是,这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息。

    2.1 通过频道(Channel)进行发布订阅

    • 首先,Subscriber订阅某个Channel,实现对Channel的监听
    • Publisher 对 Channel 这个服务中心媒介发布消息
    • 所有订阅 Channel 的Subscriber接收到消息

    接下来我们通过这几个步骤看看具体是怎么实现发布与订阅的过程的。

    2.1.1 订阅者订阅频道

    SUBSCRIBE命令:订阅者使用SUBSCRIBE命令订阅一个或多个频道。语法为SUBSCRIBE channel [channel ...]。
    时间复杂度为O(n) ,n 为订阅的 Channel 数量。

    SUBSCRIBE mychannel
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe" // Message Type
    2) "mychannel" // channel
    3) (integer) 1 
    

    执行完该指令后,订阅者可以使用以下命令操作 Pub/Sub 工作:

    • subscribe:订阅
    • unsubscribe:取消订阅
    • psubscribe:订阅者使用PSUBSCRIBE命令订阅一个或多个模式
    • punsubscribe:订阅者使用PUNSUBSCRIBE命令取消订阅一个或多个模式

    我们使用客户端 [subscriber A] 订阅Channel [mychannel] 来接收消息。从上面可以看出响应的信息:

    • "subscribe" :消息类型,枚举是 subscribe、message、unsubscribe
    • "mychannel" :频道的名称
    • 最后的消息内容:不同的消息类型代表不同含义。

    进入订阅后的客户端可以收到 3 种枚举类型的消息:

    • subscribe:订阅成功的消息类型,第2个值是订阅成功的频道名称,第3个值是当前客户端订阅的频道数量。(参考我们上面的mychannel消息)
    • message:客户端接收消息的消息类型,第2个值表示产生消息的频道名称,第3个值是消息的内容。
    • unsubscribe:取消订阅的消息类型,第2个值是对应的频道名称,第3个值是当前客户端订阅的频道数量。值为 0 时说明客户端一个订阅的都没有了,退出订阅状态。

    2.1.2 发布者发布消息

    PUBLISH命令:发布者使用PUBLISH命令将消息发送到指定的频道。语法为 PUBLISH channel message

    PUBLISH mychannel "Hello, World!"
    (integer) 1
    

    需要注意咱们发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了。

    2.1.3 订阅者接收消息

    如果想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 "Hello, World!" 这条消息。

    // 第一步,订阅Channel频道
    SUBSCRIBE mychannel
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe" // 订阅成功
    2) "mychannel" // 频道名称
    3) (integer) 1  // 订阅频道数量
    
    // 第二步,当Publisher发布消息,Subcriber订阅到的消息如下
    1) "message" // 接受到消息
    2) "mychannel" // 频道名称
    3) "Hello, World!" // 消息内容
    

    2.1.4 退订频道

    如果你不想收到某个频道的消息了,你可以取消预订。类似取消朋友圈关注,之后就不会收到推送了。
    UNSUBSCRIBE命令:订阅者使用UNSUBSCRIBE命令取消订阅一个或多个频道。语法为 UNSUBSCRIBE channel [channel ...]

    UNSUBSCRIBE mychannel
    

    2.2 使用模式(Pattern)匹配实现发布订阅

    我们来看看另一种实现发布订阅的方案 ,就是模式匹配的方式,除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的Channel,如果有,
    消息也会发布到对应匹配的频道上,订阅这个Channel的客户端也会收到消息。
    我们来试试看效果,订阅匹配模式如下图:
    image

    当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。

    image

    如上面图中的那样,因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。
    所以中红色线条部分,包括Actor C、Actor D、Actor E 都接受到了消息。

    2.2.1 订阅模式的相关语法

    • 订阅模式指令:PSUBSCRIBE
    PSUBSCRIBE Message.Queue.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe" // 消息类型:使用模式(Pattern)进行发布订阅
    2) "Message.Queue.*"// 匹配的模式
    3) (integer) 1 //订阅数
    
    • 取消模式订阅的指令:PUNSUBSCRIBE
    PUNSUBSCRIBE Message.Queue.*
    
    • 订阅 Message.Queue.Area1 和 Message.Queue.Area2 频道的
    # Message.Queue.Area1
    SUBSCRIBE Message.Queue.Area1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "Message.Queue.Area1"
    3) (integer) 1
    
    # Message.Queue.Area2
    SUBSCRIBE Message.Queue.Area2
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "Message.Queue.Area2"
    3) (integer) 1
    
    • Publisher向Message.Queue.Area2发布消息
    # 匹配模式的订阅者收到消息
    PSUBSCRIBE Message.Queue.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "Message.Queue.*"
    3) (integer) 1
    # 进入订阅状态,接收到消息
    1) "pmessage"  # 消息类型
    2) "Message.Queue.*" # 模式匹配
    3) "Message.Queue.Area2" # 匹配的Channel
    4) "Hello World!" # 具体的消息消息内容
    
    
    # 对应频道的订阅者收到消息
    SUBSCRIBE Message.Queue.Area2
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "Message.Queue.Area2"
    3) (integer) 1
    # 准备接收消息
    1) "message"
    2) "Message.Queue.Area2"
    3) "Hello World!"
    

    因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是pmessage。

    2.3 程序实现

    参考官方代码:

    # 消息消费代码,监听频道
    RShardedTopic topic = redisson.getShardedTopic("myTopic");
    int listenerId = topic.addListener(SomeObject.class, new MessageListener() {
        @Override
        public void onMessage(String channel, SomeObject message) {
            //...
        }
    });
    
    # 消息生产代码
    // in other thread or JVM(发布消息与监听消息要运行在不同的 JVM,因为同一个redisonClinet,无法监听到自己的消息)
    RShardedTopic topic = redisson.getShardedTopic("myTopic");
    long clientsReceivedMessage = topic.publish(new SomeObject());
    

    3 总结

    Redis 实现发布订阅的功能,包括如下指令:

    • subscribe:订阅
    • unsubscribe:取消订阅
    • psubscribe:订阅者使用PSUBSCRIBE命令订阅一个或多个模式
    • punsubscribe:订阅者使用PUNSUBSCRIBE命令取消订阅一个或多个模式
    • publish channel message:向指定的频道channel发送消息message

    Redis实现 发布/订阅 一共有两种模式:

    • 使用频道(Channel)进行发布订阅
    • 使用模式(Pattern)进行发布订阅

    需要注意的是,当使用Pattern进行发布订阅的时候。如果有消息发布出来,除了订阅该Channel的Client之外,所有订阅了与Channel匹配的模式的Client同样会收到消息。
    另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,与之前介绍过的 List 与 Stream 消息队列能力是不同的,大家注意区分,在不同的场景下合理使用。

  • 相关阅读:
    在 html 文件中写 react ( es module + babel )
    Rust和Pytho写一段采集公众号代码
    iOS 单元测试之常用框架 OCMock 详解
    基础I/O【Linux】
    java学习笔记001
    什么是RPA自动化办公?
    [笔记]JavaScript 实现按钮拖拽效果
    Spring声明式事务基础_JdbcTemplate
    redis-集群理论篇
    线段树上树剖再拿线段树维护:0914T4
  • 原文地址:https://www.cnblogs.com/wzh2010/p/17205455.html