• 93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布


    ★ 使用连接池管理Redis连接

    从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。

    上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库

    Lettuce连接池 支持需要 Apache Commons Pool2 的支持,需要添加该依赖

    接下来即可在程序中通过类似如下代码片段来创建连接池了。
    var conf = new GenericObjectPoolConfig>();

    conf.setMaxTotal(20); // 设置连接池中允许的最大连接数

    // 创建连接池对象(其中连接由redisClient的connectPubSub方法创建)
    pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, conf);

    代码演示

    创建连接池对象,创建两个消息订阅者和一个消息发布者,然后操作redis数据库

    1、添加依赖
    在这里插入图片描述

    Subscriper 第一个消息订阅者

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    启动这个消息订阅者的程序
    在这里插入图片描述

    Subscriper 第二个消息订阅者

    直接拷贝第一个消息订阅者,然后修改这个消息订阅者只订阅 c2 这个channel 主题
    在这里插入图片描述

    Publisher 消息发布者

    也是拷贝消息订阅者的代码,因为创建连接池对象的代码都是一样的。
    这里只需要把消息订阅的方法改成消息发布的方法就可以了,其他代码一样。

    在这里插入图片描述

    测试:

    测试成功
    消息发布者成功发布消息
    消息订阅者也能接收到各自订阅的channel的消息
    用小黑窗测试也没有问题
    在这里插入图片描述

    完整代码

    Subscriper

    package cn.ljh.app;
    
    
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisURI;
    import io.lettuce.core.ScoredValue;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.sync.RedisCommands;
    import io.lettuce.core.pubsub.RedisPubSubAdapter;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
    import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
    import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
    import io.lettuce.core.support.ConnectionPoolSupport;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    import java.time.Duration;
    
    //使用 Lettuce ,这个类是消息订阅者
    //通过连接池操作redis数据库
    public class Subscriper
    {
        private RedisClient redisClient;
        //连接池pool对象
        private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;
    
        public void init()
        {
            //1、定义RedisURI
            RedisURI uri = RedisURI.builder()
                    .withHost("127.0.0.1")
                    .withPort(6379)
                    //选择redis 16个数据库中的哪个数据库
                    .withDatabase(0)
                    .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                    .withTimeout(Duration.ofMinutes(5))
                    .build();
            //2、创建 RedisClient 客户端
            this.redisClient = RedisClient.create(uri);
    
            //创建连接池的配置对象
            //GenericObjectPoolConfig> conf = new GenericObjectPoolConfig>();
            var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
            //设置连接池允许的最大连接数
            conf.setMaxTotal(20);
            //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
            pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
        }
    
        //关闭资源
        public void closeResource()
        {
            //关闭连接池--先开后关
            this.pool.close();
            //关闭RedisClient 客户端------最先开的最后关
            this.redisClient.shutdown();
        }
    
    
        //订阅消息的方法
        public void subscribe() throws Exception
        {
    
            //从连接池中取出连接
            StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();
    
            //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
            RedisPubSubCommands cmd = conn.sync();
    
            //监听消息:消息到来时,是通过监听器来实现的
            conn.addListener(new RedisPubSubAdapter<>()
            {
                //匿名内部类重写这3个方法:收到消息、订阅主题、取消订阅主题
    
                //接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看)
                //接收消息的方法
                @Override
                public void message(String channel, String message)
                {
                    System.err.printf("从 %s 收到消息 : %s\n " , channel , message);
                }
    
                //订阅普通channel激发的方法,
                //订阅主题的方法--下面有这个订阅的方法cmd.subscribe("c1", "c2");
                //不太清楚这个 subscribed方法 和 下面的 cmd.subscribe 方法的关联 todo
                @Override
                public void subscribed(String channel, long count)
                {
                    System.err.println("完成订阅 :" + count);
                }
    
                //不订阅普通的channel所使用方法--取消订阅
                //取消订阅的方法
                @Override
                public void unsubscribed(String channel, long count)
                {
                    System.err.println("取消订阅");
                }
            });
    
            //订阅消息------订阅了 c1 和 c2 这两个主题 channel
            cmd.subscribe("c1", "c2");
    
        }
    
        public static void main(String[] args) throws Exception
        {
            Subscriper subscriper = new Subscriper();
            subscriper.init();
            subscriper.subscribe();
            //改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了
            Thread.sleep(600000);
            //关闭资源
            subscriper.closeResource();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118

    Subscriper2

    package cn.ljh.app;
    
    
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisURI;
    import io.lettuce.core.pubsub.RedisPubSubAdapter;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
    import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
    import io.lettuce.core.support.ConnectionPoolSupport;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    import java.time.Duration;
    
    //使用 Lettuce ,这个类是消息订阅者2
    //通过连接池操作redis数据库
    public class Subscriper2
    {
        private RedisClient redisClient;
        //连接池pool对象
        private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;
    
        public void init()
        {
            //1、定义RedisURI
            RedisURI uri = RedisURI.builder()
                    .withHost("127.0.0.1")
                    .withPort(6379)
                    //选择redis 16个数据库中的哪个数据库
                    .withDatabase(0)
                    .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                    .withTimeout(Duration.ofMinutes(5))
                    .build();
            //2、创建 RedisClient 客户端
            this.redisClient = RedisClient.create(uri);
    
            //创建连接池的配置对象
            //GenericObjectPoolConfig> conf = new GenericObjectPoolConfig>();
            var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
            //设置连接池允许的最大连接数
            conf.setMaxTotal(20);
            //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
            pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
        }
    
        //关闭资源
        public void closeResource()
        {
            //关闭连接池--先开后关
            this.pool.close();
            //关闭RedisClient 客户端------最先开的最后关
            this.redisClient.shutdown();
        }
    
    
        //订阅消息的方法
        public void subscribe() throws Exception
        {
    
            //从连接池中取出连接
            StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();
    
            //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
            RedisPubSubCommands cmd = conn.sync();
    
            //监听消息:消息到来时,是通过监听器来实现的
            conn.addListener(new RedisPubSubAdapter<>()
            {
                //接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看),
                @Override
                public void message(String channel, String message)
                {
                    System.err.printf("从 %s 收到消息 : %s\n " , channel , message);
                }
    
                //订阅普通channel激发的方法,
                @Override
                public void subscribed(String channel, long count)
                {
                    System.err.println("完成订阅 :" + count);
                }
    
                //不订阅普通的channel所使用方法
                @Override
                public void unsubscribed(String channel, long count)
                {
                    System.err.println("取消订阅");
                }
            });
    
            //订阅消息------订阅了 c2 这个主题 channel
            cmd.subscribe( "c2");
        }
    
        public static void main(String[] args) throws Exception
        {
            Subscriper2 subscriper2 = new Subscriper2();
            subscriper2.init();
            subscriper2.subscribe();
            //改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了
            Thread.sleep(600000);
            //关闭资源
            subscriper2.closeResource();
        }
    
    
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    Publisher

    package cn.ljh.app;
    
    
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisURI;
    import io.lettuce.core.pubsub.RedisPubSubAdapter;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
    import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
    import io.lettuce.core.support.ConnectionPoolSupport;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    import java.time.Duration;
    
    //消息发布者
    
    //通过连接池操作redis数据库
    public class Publisher
    {
        private RedisClient redisClient;
        //连接池pool对象
        private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;
    
        public void init()
        {
            //1、定义RedisURI
            RedisURI uri = RedisURI.builder()
                    .withHost("127.0.0.1")
                    .withPort(6379)
                    //选择redis 16个数据库中的哪个数据库
                    .withDatabase(0)
                    .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                    .withTimeout(Duration.ofMinutes(5))
                    .build();
            //2、创建 RedisClient 客户端
            this.redisClient = RedisClient.create(uri);
    
            //创建连接池的配置对象
            //GenericObjectPoolConfig> conf = new GenericObjectPoolConfig>();
            var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
            //设置连接池允许的最大连接数
            conf.setMaxTotal(20);
            //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
            pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
        }
    
        //关闭资源
        public void closeResource()
        {
            //关闭连接池--先开后关
            this.pool.close();
            //关闭RedisClient 客户端------最先开的最后关
            this.redisClient.shutdown();
        }
    
    
        //订阅消息的方法
        public void publish() throws Exception
        {
    
            //从连接池中取出连接
            StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();
    
            //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
            RedisPubSubCommands cmd = conn.sync();
    
            //向这两个channel主题各自发布了一条消息
            cmd.publish("c2","c2 c2 c2 这是一条来自 c2 这个channel 里面的消息");
            cmd.publish("c1","c1 c1 c1 这是一条来自 c1 这个channel 里面的消息");
    
    
            //关闭资源
            redisClient.shutdown();
    
        }
    
        //发送消息,消息发出去,程序就退出了
        public static void main(String[] args) throws Exception
        {
            Publisher subscriper2 = new Publisher();
            subscriper2.init();
            subscriper2.publish();
            subscriper2.closeResource();
        }
    
    
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.ljh</groupId>
        <artifactId>Lettucepool</artifactId>
        <version>1.0.0</version>
    
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <!-- 引入 Lettuce 这个操作redis的框架的依赖 -->
            <dependency>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
                <version>6.1.4.RELEASE</version>
            </dependency>
            <!-- 创建连接池对象的依赖 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.9.0</version>
            </dependency>
        </dependencies>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
  • 相关阅读:
    小红书达人投放比例是多少合适?品牌方必看
    Lnmp架构之Redis服务
    【GCC编译优化系列】前后编译的两个版本固件bin大小不一样,怎么办?
    python+vue驾校驾驶理论考试模拟系统
    百度ueditor富文本插件插入视频问题汇总【必须收藏】
    leetcode448找到所有数组中消失的数字
    一文理解Flink 水位线(Flink Watermark)
    【好书分享第十一期】深入Rust标准库(文末送书)
    Matplotlib(五)matplotlib基础用法
    个人开源项目如何上传maven中央仓库
  • 原文地址:https://blog.csdn.net/weixin_44411039/article/details/133502697