• 05【Redis的发布订阅】


    五、Redis的发布订阅(pub/sub)

    1.1 简介

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    发送者(发布者)并不是直接发送它们的消息给指定的接收者(订阅者),而是将消息发布到特定的消息通道,并且不需要知道订阅者的任何信息。订阅者可以订阅一个或多个感兴趣的消息通道,同时也只会收到他们感兴趣通道的信息,而不用去关心是谁发布的。这种发布者与订阅者的解耦,使其具备更强的扩展性并得到一个更加动态的网络拓扑。

    订阅:客户端订阅喜欢的频道。

    在这里插入图片描述

    发布:消息发送给指定的频道,由频道发送给订阅它的客户端。

    在这里插入图片描述

    1.2 订阅发布应用

    • 订阅频道
    subscribe channel [channel ...] 
    
    • 1

    示例:订阅a、b、c频道

    subscribe a b c
    
    • 1
    • 发布消息
    publish channel message
    
    • 1

    示例:给a频道发送hello

    publish a hello
    
    • 1

    在这里插入图片描述

    • 查看现有多少频道
    127.0.0.1:6379> pubsub channels
    1) "a"
    2) "e"
    3) "c"
    4) "b"
    127.0.0.1:6379>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 退订给定的频道
    unsubscribe channel [channel ...]
    
    • 1

    示例:退订a频道

    unsubscribe a 
    
    • 1

    tips:注意:由于redis客户端订阅操作会占用当前客户端窗口,因此执行不了任何redis命令,退订频道命令一般用于程序客户端操作使用(如Java客户端、C客户端、PHP客户端等)

    • 订阅多个符合条件的频道
    psubscribe parent [parent ...]
    
    • 1

    示例:订阅所有以a结尾的频道

    psubscribe *a
    
    • 1
    • 退订多个符合条件的频道
    punsubscribe parent [parent ...]
    
    • 1

    示例:退订以a结尾的所有频道

    punsubscribe *a *b *c
    
    • 1

    案例:

    client1:

    127.0.0.1:6379> psubscribe goods.*
    
    • 1

    client2:

    127.0.0.1:6379> psubscribe order.*
    
    • 1

    client3:

    127.0.0.1:6379> publish goods.save hello~
    (integer) 1
    127.0.0.1:6379> publish ace hi~
    (integer) 1
    127.0.0.1:6379> 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    观测变化:

    在这里插入图片描述

    1.3 Java操作发布订阅API

    1.3.1 Jedis操作发布订阅

    在Jedis中提供有JedisPubSub类,该类主要用于监听触发发布订阅指定命令的执行;当有发布订阅相关命令执行时,就会触发JedisPubSub中指定的方法;

    • JedisPubSub提供的方法如下:
    方法触发时机
    onSubscribe当有频道订阅时触发(subscribe)
    onMessage当有频道收到消息时触发(publish)
    onUnsubscribe当有频道退订时触发(unsubscribe)
    onPSubscribe当使用psubscribe命令订阅一批频道时触发
    onPUnsubscribe当使用punsubscribe命令退订一批频道时触发
    onPMessage当使用pubsub命令时调用
    • 编写监听类:
    package com.dfbz.listener;
    
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro:
     */
    public class PubSubListener extends JedisPubSub {
    
        /**
         * 当有频道进行订阅时调用(subscribe)
         *
         * @param channel:              订阅的频道
         * @param subscribedChannels:   当前jedis连接订阅了几个频道
         */
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("有频道【" + channel + "】订阅了: " + subscribedChannels);
        }
    
        /**
         * 当频道收到消息时调用(publish)
         *
         * @param channel
         * @param message
         */
        @Override
        public void onMessage(String channel, String message) {
            System.out.println("接收到来自【" + channel + "】的信息【" + message + "】");
    
            // 如果发送的消息是close时退订这个频道
            if("close".equals(message)){
                this.unsubscribe(channel);
            }
        }
    
        /**
         * 取消订阅频道时调用(unsubscribe)
         *
         * @param channel
         * @param subscribedChannels
         */
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("有频道【" + channel + "】取消订阅了: " + subscribedChannels);
        }
    
        /**
         * 使用psubscribe命令订阅一批频道时触发
         *
         * @param pattern
         * @param subscribedChannels
         */
        public void onPSubscribe(String pattern, int subscribedChannels) {
            System.out.println("使用【" + pattern + "】表达式订阅了频道: " + subscribedChannels);
    
        }
    
        /**
         * 使用punsubscribe命令退订一批频道时触发
         *
         * @param pattern
         * @param subscribedChannels
         */
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            System.out.println("使用【" + pattern + "】表达式退订了频道: " + subscribedChannels);
        }
    
    
        /**
         * 使用pubsub命令时触发
         *
         * @param pattern
         * @param channel
         * @param message
         */
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println("订阅表达式为【" + pattern + "】,订阅的频道是【" + channel + "】,消息【" + message + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 测试类:
    package com.dfbz.demo01;
    
    import com.dfbz.listener.PubSubListener;
    import org.junit.Before;
    import org.junit.Test;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro:
     */
    public class Demo01 {
    
        private JedisPool jedisPool;
    
        @Before
        public void before() {
            jedisPool = new JedisPool("localhost", 6379);
        }
    
        @Test
        public void test1() throws Exception {
    
            PubSubListener pubSubListener = new PubSubListener();
            // 从连接池获取一个连接
            Jedis jedis = jedisPool.getResource();
    
            jedis.subscribe(pubSubListener, "a", "b", "c");
    
        }
    
        @Test
        public void test2() throws Exception {
            Jedis jedis = jedisPool.getResource();
    
            jedis.publish("a", "a say hello~");
            jedis.publish("b", "b say hello~");
            jedis.publish("c", "c say hello~");
    
            // 退订a频道
            jedis.publish("a", "close");
    
            // 退订是比较耗时的操作
            Thread.sleep(10);
            jedis.publish("a", "a say hello~");
            jedis.publish("b", "b say hello~");
            jedis.publish("c", "c say hello~");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    测试pattern模式:

    package com.dfbz.demo01;
    
    import com.dfbz.listener.PubSubListener;
    import org.junit.Before;
    import org.junit.Test;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro:
     */
    public class Demo02 {
    
        private JedisPool jedisPool;
    
        @Before
        public void before() {
            jedisPool = new JedisPool("localhost", 6379);
        }
    
        @Test
        public void test1() throws Exception {
    
            PubSubListener pubSubListener = new PubSubListener();
            // 从连接池获取一个连接
            Jedis jedis = jedisPool.getResource();
    
            jedis.psubscribe(pubSubListener, "goods.*", "order.*");
    
        }
    
        @Test
        public void test2() throws Exception {
            Jedis jedis = jedisPool.getResource();
    
            jedis.publish("goods.save","goods.save hello~");
            jedis.publish("goods.delete","goods.delete hello~");
            jedis.publish("order.query","order.query hello~");
            jedis.publish("order.update","order.update hello~");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    1.3.2 SpringBoot操作发布订阅

    • 1)引入依赖:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.dfbz</groupId>
        <artifactId>02_Redis_SpringBoot</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    
        <parent>
            <artifactId>spring-boot-parent</artifactId>
            <groupId>org.springframework.boot</groupId>
            <version>2.0.1.RELEASE</version>
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
        </dependencies>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 2)编写监听器:

    CRMListener:

    package com.dfbz.listener;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.stereotype.Component;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro: 监听CRM业务相关的消息
     */
    @Component
    public class CRMListener implements MessageListener {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            // redis序列化工具
            RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
    
            // 频道
            byte[] channel = message.getChannel();
    
            // 发送的数据
            byte[] body = message.getBody();
            System.out.println("我是CRMListener--来自【" + new String(channel) + "】频道的消息:【" +serializer.deserialize(body) + "】");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    OAListener:

    package com.dfbz.listener;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.stereotype.Component;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro: 监听OA业务相关的消息
     */
    @Component
    public class OAListener implements MessageListener {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // redis序列化工具
            RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
    
            // 频道
            byte[] channel = message.getChannel();
    
            // 发送的数据
            byte[] body = message.getBody();
    
            System.out.println("我是OAListener--来自【" + new String(channel) + "】频道的消息:【" + serializer.deserialize(body).toString() + "】");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 启动类(注册监听):
    package com.dfbz;
    
    import com.dfbz.listener.CRMListener;
    import com.dfbz.listener.OAListener;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.Topic;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro:
     */
    @SpringBootApplication
    public class RedisApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RedisApplication.class);
        }
    
        // 封装成监听适配器
        @Bean
        public MessageListenerAdapter CRMListenerAdapter(CRMListener crmListener) {
            return new MessageListenerAdapter(crmListener);
        }
    
        // 封装成监听适配器
        @Bean
        public MessageListenerAdapter OAListenerAdapter(OAListener oaListener) {
            return new MessageListenerAdapter(oaListener);
        }
    
    
        // 注册监听适配器
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(
                RedisConnectionFactory redisConnectionFactory,
                MessageListenerAdapter CRMListenerAdapter,
                MessageListenerAdapter OAListenerAdapter
        ) {
    
            // 监听规则集合
            List<Topic> oaList = new ArrayList<Topic>();
    
            // 普通订阅,订阅具体的频道
            ChannelTopic deptTopic = new ChannelTopic("dept");
            oaList.add(deptTopic);
    
            // 模式订阅,支持模式匹配订阅,*为模糊匹配符
            PatternTopic userTopic = new PatternTopic("user.*");
            oaList.add(userTopic);
    
            // 监听规则集合
            List<Topic> crmList = new ArrayList<Topic>();
    
            // 普通订阅,订阅具体的频道
            ChannelTopic examineTopic = new ChannelTopic("examine");
            crmList.add(examineTopic);
    
            // 模式订阅,支持模式匹配订阅,*为模糊匹配符
            PatternTopic reportTopic = new PatternTopic("report.*");
            crmList.add(reportTopic);
    
            // 监听容器
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(CRMListenerAdapter, crmList);
            redisMessageListenerContainer.addMessageListener(OAListenerAdapter, oaList);
            return redisMessageListenerContainer;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 测试类:
    package com.dfbz.demo01;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * @author lscl
     * @version 1.0
     * @intro:
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Demo01 {
        @Autowired
        private RedisTemplate redisTemplate;
    
    
        @Test
        public void test1(){
    
            redisTemplate.convertAndSend("dept","dept hello~");
            redisTemplate.convertAndSend("user.save","user.save hello~");
            redisTemplate.convertAndSend("user.delete","user.delete hello~");
            redisTemplate.convertAndSend("examine","examine hello~");
            redisTemplate.convertAndSend("report.query","report.query hello~");
            redisTemplate.convertAndSend("report.update","report.update hello~");
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    【计算机硬件体系架构】计算机电脑基本架构
    sklearn模型整理
    使用ansible统一管理修改Linux和Windows管理员密码
    c++可变参数模板
    Visio从安装到使用完整版
    “以太坊杀手” Polkadot 何以在一众公链中脱颖而出
    Java读取Excel并生成Word&PDF
    Linux笔记——Ubuntu子系统从系统盘迁移到非系统盘
    【Unittest】Requests实现小程序项目接口测试
    ACPI规范概览-1
  • 原文地址:https://blog.csdn.net/Bb15070047748/article/details/125433860