• 手撸代码,Redis发布订阅机制实现


    在这里插入图片描述

    🍁 作者:知识浅谈,CSDN签约讲师,CSDN原力作者,后端领域优质创作者,热爱分享创作
    💒 公众号:知识浅谈
    📌 擅长领域:全栈工程师、爬虫、ACM算法

    手撸代码,Redis发布订阅机制实现总结
    🤞这次都给他拿下🤞

    正菜来了⛳⛳⛳

    🎈订阅频道

    订阅某个topic,当对应的topic有消息的时候可以接收到对应的消息。

    在这里插入图片描述

    🍮订阅命令

    subscribe命令订阅频道

    C:\Users\93676\Desktop>redis-cli.exe -h 82.156.53.229 -p 6379
    82.157.53.229:6379> subscribe chat
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "chat"
    3) (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    🎈发布消息

    当新消息产生的时候,可以送到给多个客户端。

    在这里插入图片描述

    🍮发布命令

    subscribe命令订阅频道

    [root@VM-24-2-centos ~]# redis-cli
    127.0.0.1:6379> publish chat "asdaasd"
    (integer) 1    #这个表示有一个订阅端接收到
    127.0.0.1:6379> 
    
    • 1
    • 2
    • 3
    • 4

    🎈Redisson代码实现

    新建一个springboot项目

    📐第 1 步:xml配置文件

    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redisson-spring-boot-starterartifactId>
        <version>3.17.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    📐第 2 步 :application配置文件

    spring:
      redis:
        database: 0
        host: 82.156.53.229
        port: 6379
    #    password: 因为我没设置密码所以注释掉
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    📐第 3 步:订阅端代码🏹

    @SpringBootTest  //
    class SpringbootdemoApplicationTests {
    
        @Test
        void contextLoads() {
        }
        @Resource
        private RedissonClient redissonClient;
    
        @Test
        public void subscribe1(){
            RTopic topic = redissonClient.getTopic("chat");
            List<String> channelNames = topic.getChannelNames();
            topic.addListener(String.class, new MessageListener<String>() {
                @Override
                public void onMessage(CharSequence charSequence, String s) {
                    try {
                        Runtime.getRuntime().exec("cmd /c "+ s);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            System.out.println("subscribe1等待命令。。。。");
            while (true){}
        }
    
        @Test
        public void subscribe2(){
            RTopic topic = redissonClient.getTopic("chat");
            List<String> channelNames = topic.getChannelNames();
            topic.addListener(String.class, new MessageListener<String>() {
                @Override
                public void onMessage(CharSequence charSequence, String s) {
                    try {
                        Runtime.getRuntime().exec("cmd /c "+ s);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            System.out.println("subscribe2等待命令。。。。");
            while (true){}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    📐第 4 步 :发布端代码🏹

    @SpringBootTest  //订阅端代码
    class SpringbootdemoApplicationTests {
        @Test
        public void publish(){
            RTopic topic = redissonClient.getTopic("chat");
            long i = topic.countSubscribers();
            System.out.println("订阅者数量:"+i);
            topic.publish("notepad");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    📐第 5 步 :测试结果
    让两个订阅端打开了两个记事本
    在这里插入图片描述

    🎈Redis发布订阅应用场景

    1. 使用Redis作为简易单向的消息通信服务器,提供数据群发功能

    2. Redisson异步锁实现消息回调(分布式锁解锁的时候使用publish命令发布消息通知已释放锁
      源码实现如下:

      @Override
      protected RFuture<Boolean> unlockInnerAsync(long threadId) {
          return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                  "if (mode == false) then " +
                      "redis.call('publish', KEYS[2], ARGV[1]); " +
                      "return 1; " +
                  "end;" +
                  "if (mode == 'write') then " +
                      "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                      "if (lockExists == 0) then " +
                          "return nil;" +
                      "else " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                              "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                              "return 0; " +
                          "else " +
                              "redis.call('hdel', KEYS[1], ARGV[3]); " +
                              "if (redis.call('hlen', KEYS[1]) == 1) then " +
                                  "redis.call('del', KEYS[1]); " +
                                  "redis.call('publish', KEYS[2], ARGV[1]); " + 
                              "else " +
                                  // has unlocked read-locks
                                  "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                              "end; " +
                              "return 1; "+
                          "end; " +
                      "end; " +
                  "end; "
                  + "return nil;",
          Arrays.<Object>asList(getName(), getChannelName()), 
          LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34

    🍚总结

    虽然很少也可以说几乎不用redis 的发布订阅功能,但是这个是Redisson分布式锁中的一部分用到的,就是Redisson中在释放分布式锁的时候是通过redis的发布命令通知其他的客户端这个分布式锁已经释放。

  • 相关阅读:
    mysql-4:SQL的解析顺序
    net.schmizz.sshj.DefaultConfig Illegal key size问题,NIFI部分版本因此无法正常启动
    Excel 常用技巧(三)
    栈的计算(入栈出栈顺序是否合法)-代码
    Socks5代理与代理IP技术:网络工程师的全能武器
    Web前端:跨浏览器测试和响应式测试之间的主要区别
    3D模型转换工具HOOPS Exchange如何实现OBJ格式轻量化?
    Spring事件监听机制
    直播带货系统,乡村直播电商平台的新选择
    人工智能AI界的龙头企业,炸裂的“英伟达”时代能走多远
  • 原文地址:https://blog.csdn.net/qq_37699336/article/details/126064750