• 【Redis】Redis 的学习教程(九)之 发布 Pub、订阅 Sub


    1. Pub/Sub 介绍

    Redis 的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。

    以下是Redis发布订阅模式的主要组件:

    • 发布者(Publisher):发布者是产生并发布消息的实体。它可以将消息发送到指定的频道或模式。
    • 订阅者(Subscriber):订阅者是接收并处理消息的实体。它可以订阅一个或多个频道或模式,以便接收相关的消息。
    • 频道(Channel):频道是发布者和订阅者之间的通信渠道。发布者将消息发送到频道,而订阅者从频道接收消息。

    可以看下图,Publisher 和 Subscriber、Channel 的关系很清晰:

    在这里插入图片描述
    发布者往 “Channel A” 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息

    2. 使用 Pub/Sub 实现发布订阅

    Redis实现 发布/订阅 一共有两种模式:

    1. 使用频道(Channel)进行发布订阅
    2. 使用模式(Pattern)进行发布订阅

    Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织

    但是值得注意的是:这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息

    Redis 使用以下命令操作 Pub/Sub 工作:

    • SUBSCRIBE:订阅一个或多个频道
      • 语法:SUBSCRIBE channel [channel ...]
    • UNSUBSCRIBE :取消订阅一个或多个频道
      • 语法:UNSUBSCRIBE [channel [channel ...]]
    • PSUBSCRIBE:订阅一个或多个模式
      • 语法:PSUBSCRIBE pattern [pattern ...]
    • PUNSUBSCRIBE取消订阅一个或多个模式
      • 语法:PUNSUBSCRIBE [pattern [pattern ...]]
    • PUBSUB CHANNELS [pattern]:列出活跃的 channel
    • PUBSUB NUMSUB [channel-1 ... channel-N]:列出 channel 的订阅者个数

    2.1 通过频道(Channel)进行发布订阅

    通过频道(Channel)进行发布订阅过程如下:

    1. Subscriber 订阅某个 Channel,实现对 Channel 的监听
    2. Publisher 对 Channel 这个服务中心媒介发布消息
    3. 所有订阅 Channel 的 Subscriber 接收到消息

    2.1.1 订阅者订阅频道

    订阅后:

    在这里插入图片描述
    使用客户端 [subscriber A] 订阅 Channel [mychannel] 来接收消息。从上面可以看出响应的信息:

    • “subscribe” :消息类型,枚举是 subscribe、message、unsubscribe
    • “mychannel” :频道的名称
    • 最后的消息内容:不同的消息类型代表不同含义。

    进入订阅后的客户端可以收到 3 种枚举类型的消息:

    • subscribe:订阅成功的消息类型,第 2 个值是订阅成功的频道名称,第 3 个值是当前客户端订阅的频道数量。
    • message:客户端接收消息的消息类型,第 2 个值表示产生消息的频道名称,第 3 个值是消息的内容。
    • unsubscribe:取消订阅的消息类型,第 2 个值是对应的频道名称,第 3 个值是当前客户端订阅的频道数量。值为 0 时说明客户端一个订阅的都没有了,退出订阅状态。

    2.1.2 发布者发布消息

    发布消息:

    在这里插入图片描述
    发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了

    2.1.3 订阅者接收消息

    想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 “Hello, World!” 这条消息

    在这里插入图片描述

    2.1.4 退订频道

    如果你不想收到某个频道的消息了,你可以取消预订

    2.2 使用模式(Pattern)匹配实现发布订阅

    来看看另一种实现发布订阅的方案 ,就是模式匹配的方式:除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的 Channel,如果有,消息也会发布到对应匹配的频道上,订阅这个 Channel 的客户端也会收到消息

    如下图:

    在这里插入图片描述
    当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。

    在这里插入图片描述
    因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。

    所以图中红色线条部分,包括 Actor C、Actor D、Actor E 都接受到了消息

    2.2.1 订阅者订阅频道

    Client A 订阅 Message.Queue.Area1:

    在这里插入图片描述

    Client B 订阅 Message.Queue.Area2:

    在这里插入图片描述
    Client C 订阅 Message.Queue.*:

    在这里插入图片描述

    2.2.2 发布者发布消息

    在这里插入图片描述

    2.2.3 订阅者接收消息

    对应频道的订阅者收到消息(Client A ):

    在这里插入图片描述
    匹配模式的订阅者收到消息(Client C):

    在这里插入图片描述

    因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是 pmessage

    3. SpringBoot 整合 Redis 实现发布订阅模式

    3.1 概述

    订阅消息就是接收消息,这个比较复杂。既有对 Redis 连接的管理,也有对消费消息的线程池的管理。不过 Spring 已经把这个“重活”给干了。

    Spring 提供了一个全套的解决方案,这里面包括:

    1. 订阅/取消订阅这些相关的用户操作
    2. 接收所有来自Redis的消息
    3. 把这些消息按照订阅关系分发给具体的消费者
    4. 触发消费消息的回调代码在线程池中运行

    由于 Spring 已经全权代理,用户只需要提供要消费的 topic 以及对应的消费回调代码即可。

    我们需要了解Spring提供的几个接口和类,才可以很好的使用:

    1. Topic 接口,表示一个订阅对象:它有两个实现类,ChannelTopicPatternTopic,前者对应 redis的 channel,后者对应 redis 的 pattern
    2. MessageListener 接口,回调接口,通过它来执行业务代码
    3. Message 接口,表示从 redis 接收到的消息
    4. RedisMessageListenerContainer 类,这个核心类,相当于一个代理,就是它负责接收 redis 的消息,并分发给 MessageListener
    5. RedisConnectionFactoryRedisMessageListenerContainer 需要此类 RedisConnectionFactory:redis连接工厂,用来获取一个redis连接,由于这个连接用于接收消息,所以它是一直阻塞着的
    6. 还可以为这个类指定一个 Executor,即线程池,这不是必须的,如果不指定它会生成一个默认的

    SpringBoot集成Redis Messaging (Pub/Sub)

    3.2 在 Springboot 中使用发布订阅

    先说下在 springboot 中使用 redis 的发布订阅的步骤:

    1. 配置消息监听类(实现 MessageListener 接口,重写 onMessage() 方法)。
    2. 添加监听容器(配置 RedisMessageListenerContainer)。
    3. 订阅频道。
    4. 向频道发布消息。

    3.2.1 配置消息监听类

    添加一个订单消息监听器:

    @Component
    public class OrderSubscriber implements MessageListener {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // 获取消息
            byte[] messageBody = message.getBody();
            // 使用值序列化器转换
            Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
            // 获取监听的频道
            byte[] channelByte = message.getChannel();
            // 使用字符串序列化器转换
            Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
            // 渠道名称转换
            String patternStr = new String(pattern);
            System.out.println(patternStr);
            System.out.println("---频道---: " + channel);
            System.out.println("---消息内容---: " + msg);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3.2.2 容器添加监听器、订阅频道

    @Configuration
    public class RedisConfig {
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
    
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            // json 序列化配置
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            // String 序列化
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            // 所有的 key 采用 string 的序列化
            template.setKeySerializer(stringRedisSerializer);
            // 所有的 value 采用 jackson 的序列化
            template.setValueSerializer(jackson2JsonRedisSerializer);
            // hash 的 key 采用 string 的序列化
            template.setHashKeySerializer(stringRedisSerializer);
            // hash 的 value 采用 jackson 的序列化
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            // 设置连接工厂
            container.setConnectionFactory(redisConnectionFactory);
            // 监听器订阅频道
            container.addMessageListener(orderSubscriber, new ChannelTopic("order"));
            container.addMessageListener(orderSubscriber, new ChannelTopic("sms"));
            // 序列化
            Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            seria.setObjectMapper(objectMapper);
            container.setTopicSerializer(seria);
            return container;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    3.2.3.1 容器添加多个监听器

    添加一个短信监听器:

    @Component
    public class SmsSubscriber implements MessageListener {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // 获取消息
            byte[] messageBody = message.getBody();
            // 使用值序列化器转换
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    修改配置:

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber, SmsSubscriber smsSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置连接工厂
        container.setConnectionFactory(redisConnectionFactory);
        // 监听器订阅频道
        container.addMessageListener(orderSubscriber, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
        container.addMessageListener(smsSubscriber, new ChannelTopic("sms"));
        // 序列化
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);
        return container;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    3.2.3.2 使用 PatternTopic
    container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));
    
    • 1

    3.2.3 向频道发布消息

    @RestController
    @RequestMapping("/pub")
    public class PubController {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @GetMapping("/publish")
        public String publish() {
            redisTemplate.convertAndSend("order", "该订单已过期");
            redisTemplate.convertAndSend("sms", "该短信已发送");
            return "publish";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.3 使用 MessageListenerAdapter 实现发布订阅

    1、定义 一个消息接受类

    @Component
    public class OrderMessageReceiver {
    
        public void receiveMessage(String message, String channel){
            System.out.println("---频道---: " + channel);
            System.out.println("---消息内容---: " + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、配置一个 MessageListenerAdapter

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter adapter, SmsSubscriber smsSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置连接工厂
        container.setConnectionFactory(redisConnectionFactory);
        // 监听器订阅频道
        container.addMessageListener(adapter, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
        container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));
        // 序列化
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);
        return container;
    }
    
    @Bean
    public MessageListenerAdapter smsExpirationListener(OrderMessageReceiver messageListener) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(messageListener, "receiveMessage");
        // 序列化
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    Spring boot整合Redis实现发布订阅(超详细)
    springboot中使用redis发布订阅

    4. 总结

    当使用 Pattern 进行发布订阅的时候。如果有消息发布出来,除了订阅该 Channel 的 Client 之外,所有订阅了与 Channel 匹配的模式的 Client 同样会收到消息。

    另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,

  • 相关阅读:
    每日三题 6.30(2)
    调试-Debug
    【MySQL基础】常用指令详解
    桥接模式(Bridge Pattern)
    C++ Reference: Standard C++ Library reference: C Library: cmath: erfc
    基于PaddlePaddle框架对CIFAR-100数据集在简易CNN(LeNet-5修改)和简易DNN的效果对比
    ETL工具之Talend简介与安装
    君正T40 GPIO接口及操作方法
    来自BAT的一份Java高级开发岗面试指南:金三银四必定面试无忧
    进程间通信之信号量--使用信号实现生产者消费者问题
  • 原文地址:https://blog.csdn.net/sco5282/article/details/132800836