• Java通过Lettuce访问Redis主从,哨兵,集群


    操作

    • 首先需要maven导入依赖

    1. <dependency>
    2. <groupId>io.lettucegroupId>
    3. <artifactId>lettuce-coreartifactId>
    4. <version>6.3.0.RELEASEversion>
    5. dependency>
    • 测试连接

    1. public class LettuceDemo {
    2. public static void main(String[] args) {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1")
    5. .withPort(6379)
    6. .withPassword("123456")
    7. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    8. .build();
    9. RedisClient redisClient = RedisClient.create(redisUri);
    10. StatefulRedisConnection connection = redisClient.connect();
    11. RedisCommands commands = connection.sync();
    12. System.out.println(commands.ping());
    13. connection.close();
    14. redisClient.shutdown();
    15. }
    16. }
    • 同步操作

    1. public class LettuceSyncDemo {
    2. public static void main(String[] args) {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection connection = redisClient.connect();
    9. //获取同步操作命令工具
    10. RedisCommands commands = connection.sync();
    11. System.out.println("清空数据:"+commands.flushdb());
    12. System.out.println("判断某个键是否存在:"+commands.exists("username"));
    13. System.out.println("新增<'username','xmr'>的键值对:"+commands.set("username", "xmr"));
    14. System.out.println("新增<'password','password'>的键值对:"+commands.set("password", "123"));
    15. System.out.println("获取<'password'>键的值:"+commands.get("password"));
    16. System.out.println("系统中所有的键如下:" + commands.keys("*"));
    17. System.out.println("删除键password:"+commands.del("password"));
    18. System.out.println("判断键password是否存在:"+commands.exists("password"));
    19. System.out.println("设置键username的过期时间为5s:"+commands.expire("username", 5L));
    20. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username"));
    21. System.out.println("移除键username的生存时间:"+commands.persist("username"));
    22. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username"));
    23. System.out.println("查看键username所存储的值的类型:"+commands.type("username"));
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }
    • 异步操作

    1. public class LettuceASyncDemo {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection connection = redisClient.connect();
    9. //获取异步操作命令工具
    10. RedisAsyncCommands commands = connection.async();
    11. System.out.println("清空数据:"+commands.flushdb().get());
    12. System.out.println("判断某个键是否存在:"+commands.exists("username").get());
    13. System.out.println("新增<'username','xmr'>的键值对:"+commands.set("username", "xmr").get());
    14. System.out.println("新增<'password','password'>的键值对:"+commands.set("password", "123").get());
    15. System.out.println("获取<'password'>键的值:"+commands.get("password").get());
    16. System.out.println("系统中所有的键如下:" + commands.keys("*").get());
    17. System.out.println("删除键password:"+commands.del("password").get());
    18. System.out.println("判断键password是否存在:"+commands.exists("password").get());
    19. System.out.println("设置键username的过期时间为5s:"+commands.expire("username", 5L).get());
    20. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username").get());
    21. System.out.println("移除键username的生存时间:"+commands.persist("username").get());
    22. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username").get());
    23. System.out.println("查看键username所存储的值的类型:"+commands.type("username").get());
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }

            Lettuce 除了支持异步编程以外,还支持响应式编程,Lettuce 引入的响应式编程框架是Project Reactor,框架官网:https://projectreactor.io/

    1. public class LettuceReactorDemo {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection connection = redisClient.connect();
    9. //获取响应式API操作命令工具
    10. RedisReactiveCommands commands = connection.reactive();
    11. Mono setc = commands.set("name", "mayun");
    12. System.out.println(setc.block());
    13. Mono getc = commands.get("name");
    14. getc.subscribe(System.out::println);
    15. Flux keys = commands.keys("*");
    16. keys.subscribe(System.out::println);
    17. //开启一个事务,先把count设置为1,再将count自增1
    18. commands.multi().doOnSuccess(r -> {
    19. commands.set("count", "1").doOnNext(value -> System.out.println("count1:" + value)).subscribe();
    20. commands.incr("count").doOnNext(value -> System.out.println("count2:" + value)).subscribe();
    21. }).flatMap(s -> commands.exec())
    22. .doOnNext(transactionResult -> System.out.println("transactionResult:" + transactionResult.wasDiscarded())).subscribe();
    23. Thread.sleep(1000 * 5);
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }
    • 发布和订阅

    1. public class LettuceReactiveDemo {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. //获取发布订阅操作命令工具
    9. StatefulRedisPubSubConnection pubsubConn = redisClient.connectPubSub();
    10. pubsubConn.addListener(new RedisPubSubListener() {
    11. @Override
    12. public void unsubscribed(String channel, long count) {
    13. System.out.println("[unsubscribed]" + channel);
    14. }
    15. @Override
    16. public void subscribed(String channel, long count) {
    17. System.out.println("[subscribed]" + channel);
    18. }
    19. @Override
    20. public void punsubscribed(String pattern, long count) {
    21. System.out.println("[punsubscribed]" + pattern);
    22. }
    23. @Override
    24. public void psubscribed(String pattern, long count) {
    25. System.out.println("[psubscribed]" + pattern);
    26. }
    27. @Override
    28. public void message(String pattern, String channel, String message) {
    29. System.out.println("[message]" + pattern + " -> " + channel + " -> " + message);
    30. }
    31. @Override
    32. public void message(String channel, String message) {
    33. System.out.println("[message]" + channel + " -> " + message);
    34. }
    35. });
    36. RedisPubSubAsyncCommands pubsubCmd = pubsubConn.async();
    37. pubsubCmd.psubscribe("CH");
    38. pubsubCmd.psubscribe("CH2");
    39. pubsubCmd.unsubscribe("CH");
    40. Thread.sleep(100 * 5);
    41. pubsubConn.close();
    42. redisClient.shutdown();
    43. }
    44. }
    • 客户端资源与参数配置

            Lettuce 客户端的通信框架集成了 Netty 的非阻塞 IO 操作,客户端资源的设置与 Lettuce 的性能、并发和事件处理紧密相关,如果不熟悉客户端参数配置,不推荐修改默认值,保持默认配置即可。

            - 非集群模式

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. ClientResources resources = DefaultClientResources.builder()
    4. .ioThreadPoolSize(4) //I/O线程数
    5. .computationThreadPoolSize(4) //任务线程数
    6. .build();
    7. RedisURI redisUri = RedisURI.builder()
    8. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    9. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    10. .build();
    11. ClientOptions options = ClientOptions.builder()
    12. .autoReconnect(true)//是否自动重连
    13. .pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
    14. .build();
    15. RedisClient client = RedisClient.create(resources, redisUri);
    16. client.setOptions(options);
    17. StatefulRedisConnection connection = client.connect();
    18. RedisCommands commands = connection.sync();
    19. commands.set("name", "关羽");
    20. System.out.println(commands.get("name"));
    21. connection.close();
    22. client.shutdown();
    23. resources.shutdown();
    24. }
    25. }

            - 集群模式

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. ClientResources resources = DefaultClientResources.builder()
    4. .ioThreadPoolSize(4) //I/O线程数
    5. .computationThreadPoolSize(4) //任务线程数
    6. .build();
    7. RedisURI redisUri = RedisURI.builder()
    8. .withHost("127.0.0.1").withPort(6379).withPassword("123456")
    9. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    10. .build();
    11. ClusterClientOptions options = ClusterClientOptions.builder()
    12. .autoReconnect(true)//是否自动重连
    13. .pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
    14. .validateClusterNodeMembership(true)//是否校验集群节点的成员关系
    15. .build();
    16. RedisClusterClient client = RedisClusterClient.create(resources, redisUri);
    17. client.setOptions(options);
    18. StatefulRedisClusterConnection connection = client.connect();
    19. RedisAdvancedClusterCommands commands = connection.sync();
    20. commands.set("name", "张飞");
    21. System.out.println(commands.get("name"));
    22. connection.close();
    23. client.shutdown();
    24. resources.shutdown();
    25. }
    26. }
    • 线程池配置

            Lettuce 连接设计的时候,就是线程安全的,所以一个连接可以被多个线程共享,同时 lettuce 连接默认是自动重连的,使用单连接基本可以满足业务需求,大多数情况下不需要配置连接池,多连接并不会给操作带来性能上的提升。

    但在某些特殊场景下,比如事物操作,使用连接池会是一个比较好的方案,配置方法如下

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1")
    5. .withPort(6379)
    6. .withPassword("123456")
    7. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    8. .build();
    9. RedisClient client = RedisClient.create(redisUri);
    10. //连接池配置
    11. GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    12. poolConfig.setMaxIdle(2);
    13. GenericObjectPool> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
    14. StatefulRedisConnection connection = pool.borrowObject();
    15. RedisCommands commands = connection.sync();
    16. commands.set("name", "张飞");
    17. System.out.println(commands.get("name"));
    18. connection.close();
    19. pool.close();
    20. client.shutdown();
    21. }
    22. }
    • 主从模式

           Redis 一般采用主从复制模式,搭建高可用的架构,简单的说就一个主节点,多个从节点,自动从主节点同步最新数据。

             - 自动发现主从模式下所有节点

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. //这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以;可以自动发现主从节点
    4. RedisURI uri = RedisURI.builder().withHost("192.168.31.111").withPort(6379).withPassword("123456").build();
    5. RedisClient client = RedisClient.create(uri);
    6. StatefulRedisMasterReplicaConnection connection = MasterReplica.connect(client, StringCodec.UTF8, uri);
    7. //从节点读取数据
    8. connection.setReadFrom(ReadFrom.REPLICA);
    9. RedisCommands commands = connection.sync();
    10. commands.set("name", "张飞");
    11. System.out.println(commands.get("name"));
    12. connection.close();
    13. client.shutdown();
    14. }
    15. }

             - 手动指定主从模式下集群节点

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. //集群节点
    4. List uris = new ArrayList();
    5. uris.add(RedisURI.builder().withHost("192.168.79.135").withPort(6379).withPassword("123456").build());
    6. uris.add(RedisURI.builder().withHost("192.168.79.136").withPort(6379).withPassword("123456").build());
    7. uris.add(RedisURI.builder().withHost("192.168.79.137").withPort(6379).withPassword("123456").build());
    8. RedisClient client = RedisClient.create();
    9. StatefulRedisMasterReplicaConnection connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
    10. //从节点读取数据
    11. connection.setReadFrom(ReadFrom.REPLICA);
    12. RedisCommands commands = connection.sync();
    13. commands.set("name", "张飞");
    14. System.out.println(commands.get("name"));
    15. connection.close();
    16. client.shutdown();
    17. }
    18. }
    • 哨兵模式

            哨兵模式可以实现当主节点下线时,通过选举的方式将从节点升级为主节点

    1. public class LettuceDemo {
    2. public static void main(String[] args) throws Exception {
    3. //集群节点
    4. List uris = new ArrayList();
    5. uris.add(RedisURI.builder().withSentinel("192.168.79.135", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
    6. uris.add(RedisURI.builder().withSentinel("192.168.79.135", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
    7. uris.add(RedisURI.builder().withSentinel("192.168.79.135", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
    8. RedisClient client = RedisClient.create();
    9. StatefulRedisMasterReplicaConnection connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
    10. //从节点读取数据
    11. connection.setReadFrom(ReadFrom.REPLICA);
    12. RedisCommands commands = connection.sync();
    13. commands.set("name", "赵云");
    14. System.out.println(commands.get("name"));
    15. connection.close();
    16. client.shutdown();
    17. }
    18. }
    • Cluster 集群模式

            基于主从模式和哨兵模式的理念,推出的高可用的架构模型,主要是采用分片方式来存储数据

    1. public class LettuceReactiveDemo {
    2. public static void main(String[] args) throws Exception {
    3. Set uris = new HashSet<>();
    4. uris.add(RedisURI.builder().withHost("192.168.79.135").withPort(5001).withPassword("123456").build());
    5. uris.add(RedisURI.builder().withHost("192.168.79.135").withPort(5002).withPassword("123456").build());
    6. uris.add(RedisURI.builder().withHost("192.168.79.135").withPort(5003).withPassword("123456").build());
    7. uris.add(RedisURI.builder().withHost("192.168.79.136").withPort(5001).withPassword("123456").build());
    8. uris.add(RedisURI.builder().withHost("192.168.79.136").withPort(5002).withPassword("123456").build());
    9. uris.add(RedisURI.builder().withHost("192.168.79.136").withPort(5003).withPassword("123456").build());
    10. RedisClusterClient client = RedisClusterClient.create(uris);
    11. StatefulRedisClusterConnection connection = client.connect();
    12. RedisAdvancedClusterCommands commands = connection.sync();
    13. commands.set("name", "关羽");
    14. System.out.println(commands.get("name"));
    15. //选择从节点,只读
    16. NodeSelection replicas = commands.replicas();
    17. NodeSelectionCommands nodeSelectionCommands = replicas.commands();
    18. Executions> keys = nodeSelectionCommands.keys("*");
    19. keys.forEach(key -> System.out.println(key));
    20. connection.close();
    21. client.shutdown();
    22. }
    23. }

    小结

            Lettuce 相比老牌的 Jedis 客户端,功能更加强大,不仅解决了线程安全的问题,还支持异步和响应式编程,支持集群,Sentinel,管道和编码器等等功能。

    了解更多的信息,可以访问官网地址:https://lettuce.io/

    参考:

    【进阶篇】Redis实战之Lettuce使用技巧详解 - 知乎

  • 相关阅读:
    毕业季,作为程序员(it软件开发工程师),如何培养强大的解决问题的能力
    氨丙基表面修饰二氧化硅亚微米微球/二氧化硅微球表面负载硫化亚铁纳米晶
    Linux基本指令系列第三篇
    【问题思考总结】解方程的时候什么时候可以消去方程?如何保证不丢解?
    EKF例程 matlab
    公开可用的API 合集
    HDFS 伪分布式环境搭建
    ICCV2021|你以为这是一个填色模型?其实我是检索模型!
    MATLAB——一维离散小波的单层分解
    linux+python3.6.8+uwsgi+postgresql+django部署web服务器
  • 原文地址:https://blog.csdn.net/ymq267/article/details/134464769