• 详解Redis之Lettuce实战


    摘要

        Redis 的一款高级 Java 客户端,已成为 SpringBoot 2.0 版本默认的 redis 客户端。Lettuce 后起之秀,不仅功能丰富,提供了很多新的功能特性,比如异步操作、响应式编程等,还解决了 Jedis 中线程不安全的问题。

    Lettuce

    1. <dependency>
    2. <groupId>io.lettuce</groupId>
    3. <artifactId>lettuce-core</artifactId>
    4. <version>6.2.5.RELEASE</version>
    5. </dependency>

    同步操作

    基本上 Jedis 支持的同步命令操作,Lettuce 都支持。

    Lettuce 的 api 操作如下!

    1. public class LettuceUtil {
    2. public static void main(String[] args) {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("111111")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection<String, String> connection = redisClient.connect();
    9. //获取同步操作命令工具
    10. RedisCommands<String, String> commands = connection.sync();
    11. System.out.println("清空数据:"+commands.flushdb());
    12. System.out.println("判断某个键是否存在:"+commands.exists("username"));
    13. System.out.println("新增<'username','xmr'>的键值对:"+commands.set("username", "xmr"));
    14. System.out.println("新增<'password','password'>的键值对:"+commands.set("password", "123"));
    15. System.out.println("获取<'password'>键的值:"+commands.get("password"));
    16. System.out.println("系统中所有的键如下:" + commands.keys("*"));
    17. System.out.println("删除键password:"+commands.del("password"));
    18. System.out.println("判断键password是否存在:"+commands.exists("password"));
    19. System.out.println("设置键username的过期时间为5s:"+commands.expire("username", 5L));
    20. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username"));
    21. System.out.println("移除键username的生存时间:"+commands.persist("username"));
    22. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username"));
    23. System.out.println("查看键username所存储的值的类型:"+commands.type("username"));
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }

    异步操作

    Lettuce 还支持异步操作,将上面的操作改成异步处理,结果如下!

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("111111")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection<String, String> connection = redisClient.connect();
    9. //获取异步操作命令工具
    10. RedisAsyncCommands<String, String> commands = connection.async();
    11. System.out.println("清空数据:"+commands.flushdb().get());
    12. System.out.println("判断某个键是否存在:"+commands.exists("username").get());
    13. System.out.println("新增<'username','xmr'>的键值对:"+commands.set("username", "xmr").get());
    14. System.out.println("新增<'password','password'>的键值对:"+commands.set("password", "123").get());
    15. System.out.println("获取<'password'>键的值:"+commands.get("password").get());
    16. System.out.println("系统中所有的键如下:" + commands.keys("*").get());
    17. System.out.println("删除键password:"+commands.del("password").get());
    18. System.out.println("判断键password是否存在:"+commands.exists("password").get());
    19. System.out.println("设置键username的过期时间为5s:"+commands.expire("username", 5L).get());
    20. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username").get());
    21. System.out.println("移除键username的生存时间:"+commands.persist("username").get());
    22. System.out.println("查看键username的剩余生存时间:"+commands.ttl("username").get());
    23. System.out.println("查看键username所存储的值的类型:"+commands.type("username").get());
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }

    响应式编程

            Lettuce 除了支持异步编程以外,还支持响应式编程,Lettuce 引入的响应式编程框架是Project Reactor,如果没有响应式编程经验可以先自行了解一下,访问地址https://projectreactor.io/

    响应式编程使用案例如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("111111")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. StatefulRedisConnection<String, String> connection = redisClient.connect();
    9. //获取响应式API操作命令工具
    10. RedisReactiveCommands<String, String> commands = connection.reactive();
    11. Mono<String> setc = commands.set("name", "mayun");
    12. System.out.println(setc.block());
    13. Mono<String> getc = commands.get("name");
    14. getc.subscribe(System.out::println);
    15. Flux<String> keys = commands.keys("*");
    16. keys.subscribe(System.out::println);
    17. //开启一个事务,先把count设置为1,再将count自增1
    18. commands.multi().doOnSuccess(r -> {
    19. commands.set("count", "1").doOnNext(value -> System.out.println("count1:" + value)).subscribe();
    20. commands.incr("count").doOnNext(value -> System.out.println("count2:" + value)).subscribe();
    21. }).flatMap(s -> commands.exec())
    22. .doOnNext(transactionResult -> System.out.println("transactionResult:" + transactionResult.wasDiscarded())).subscribe();
    23. Thread.sleep(1000 * 5);
    24. connection.close();
    25. redisClient.shutdown();
    26. }
    27. }

    发布和订阅

            Lettuce 还支持 redis 的消息发布和订阅,具体实现案例如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("127.0.0.1").withPort(6379).withPassword("111111")
    5. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    6. .build();
    7. RedisClient redisClient = RedisClient.create(redisUri);
    8. //获取发布订阅操作命令工具
    9. StatefulRedisPubSubConnection<String, String> pubsubConn = redisClient.connectPubSub();
    10. pubsubConn.addListener(new RedisPubSubListener<String, String>() {
    11. @Override
    12. public void unsubscribed(String channel, long count) {
    13. System.out.println("[unsubscribed]" + channel);
    14. }
    15. @Override
    16. public void subscribed(String channel, long count) {
    17. System.out.println("[subscribed]" + channel);
    18. }
    19. @Override
    20. public void punsubscribed(String pattern, long count) {
    21. System.out.println("[punsubscribed]" + pattern);
    22. }
    23. @Override
    24. public void psubscribed(String pattern, long count) {
    25. System.out.println("[psubscribed]" + pattern);
    26. }
    27. @Override
    28. public void message(String pattern, String channel, String message) {
    29. System.out.println("[message]" + pattern + " -> " + channel + " -> " + message);
    30. }
    31. @Override
    32. public void message(String channel, String message) {
    33. System.out.println("[message]" + channel + " -> " + message);
    34. }
    35. });
    36. RedisPubSubAsyncCommands<String, String> pubsubCmd = pubsubConn.async();
    37. pubsubCmd.psubscribe("CH");
    38. pubsubCmd.psubscribe("CH2");
    39. pubsubCmd.unsubscribe("CH");
    40. Thread.sleep(100 * 5);
    41. pubsubConn.close();
    42. redisClient.shutdown();
    43. }
    44. }

    客户端资源与参数配置

            Lettuce 客户端的通信框架集成了 Netty 的非阻塞 IO 操作,客户端资源的设置与 Lettuce 的性能、并发和事件处理紧密相关,如果不是特别熟悉客户端参数配置,不建议在没有经验的前提下凭直觉修改默认值,保持默认配置就行。

            非集群环境下,具体的配置案例如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. ClientResources resources = DefaultClientResources.builder()
    4. .ioThreadPoolSize(4) //I/O线程数
    5. .computationThreadPoolSize(4) //任务线程数
    6. .build();
    7. RedisURI redisUri = RedisURI.builder()
    8. .withHost("127.0.0.1").withPort(6379).withPassword("111111")
    9. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    10. .build();
    11. ClientOptions options = ClientOptions.builder()
    12. .autoReconnect(true)//是否自动重连
    13. .pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
    14. .build();
    15. RedisClient client = RedisClient.create(resources, redisUri);
    16. client.setOptions(options);
    17. StatefulRedisConnection<String, String> connection = client.connect();
    18. RedisCommands<String, String> commands = connection.sync();
    19. commands.set("name", "xxxx");
    20. System.out.println(commands.get("name"));
    21. connection.close();
    22. client.shutdown();
    23. resources.shutdown();
    24. }
    25. }

            集群环境下,具体的配置案例如下:

    1. public class LettuceMain {
    2. public static void main(String[] args) throws Exception {
    3. ClientResources resources = DefaultClientResources.builder()
    4. .ioThreadPoolSize(4) //I/O线程数
    5. .computationThreadPoolSize(4) //任务线程数
    6. .build();
    7. RedisURI redisUri = RedisURI.builder()
    8. .withHost("192.168.221.11").withPort(6379).withPassword("111111")
    9. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    10. .build();
    11. ClusterClientOptions options = ClusterClientOptions.builder()
    12. .autoReconnect(true)//是否自动重连
    13. .pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
    14. .validateClusterNodeMembership(true)//是否校验集群节点的成员关系
    15. .build();
    16. RedisClusterClient client = RedisClusterClient.create(resources, redisUri);
    17. client.setOptions(options);
    18. StatefulRedisClusterConnection<String, String> connection = client.connect();
    19. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    20. commands.set("name", "goodss");
    21. System.out.println(commands.get("name"));
    22. connection.close();
    23. client.shutdown();
    24. resources.shutdown();
    25. }
    26. }

    线程池配置

            Lettuce 连接设计的时候,就是线程安全的,所以一个连接可以被多个线程共享,同时 lettuce 连接默认是自动重连的,使用单连接基本可以满足业务需求,大多数情况下不需要配置连接池,多连接并不会给操作带来性能上的提升。

    但在某些特殊场景下,比如事物操作,使用连接池会是一个比较好的方案,那么如何配置线程池呢?

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. RedisURI redisUri = RedisURI.builder()
    4. .withHost("192.168.221.11")
    5. .withPort(6379)
    6. .withPassword("111111")
    7. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
    8. .build();
    9. RedisClient client = RedisClient.create(redisUri);
    10. //连接池配置
    11. GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    12. poolConfig.setMaxIdle(2);
    13. GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
    14. StatefulRedisConnection<String, String> connection = pool.borrowObject();
    15. RedisCommands<String, String> commands = connection.sync();
    16. commands.set("name", "llkd");
    17. System.out.println(commands.get("name"));
    18. connection.close();
    19. pool.close();
    20. client.shutdown();
    21. }
    22. }

    主从模式配置

            redis 一般采用主从复制模式,搭建高可用的架构,简单的说就一个主节点,多个从节点,自动从主节点同步最新数据。

            Lettuce 支持自动发现主从模式下的节点信息,然后保存到本地,具体配置如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. //这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以;可以自动发现主从节点
    4. RedisURI uri = RedisURI.builder().withHost("192.168.221.11").withPort(6379).withPassword("123456").build();
    5. RedisClient client = RedisClient.create(uri);
    6. StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uri);
    7. //从节点读取数据
    8. connection.setReadFrom(ReadFrom.REPLICA);
    9. RedisCommands<String, String> commands = connection.sync();
    10. commands.set("name", "张飞");
    11. System.out.println(commands.get("name"));
    12. connection.close();
    13. client.shutdown();
    14. }
    15. }

            当然我们也可以手动指定集群节点来加载,具体配置如下:

    1. public class LettuceMain {
    2. public static void main(String[] args) throws Exception {
    3. //集群节点
    4. List<RedisURI> uris = new ArrayList();
    5. uris.add(RedisURI.builder().withHost("192.168.221.14").withPort(7000).withPassword("123456").build());
    6. uris.add(RedisURI.builder().withHost("192.168.221.15").withPort(7000).withPassword("123456").build());
    7. uris.add(RedisURI.builder().withHost("192.168.221.16").withPort(7001).withPassword("123456").build());
    8. RedisClient client = RedisClient.create();
    9. StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
    10. //从节点读取数据
    11. connection.setReadFrom(ReadFrom.REPLICA);
    12. RedisCommands<String, String> commands = connection.sync();
    13. commands.set("name", "张飞");
    14. System.out.println(commands.get("name"));
    15. connection.close();
    16. client.shutdown();
    17. }
    18. }

    哨兵模式配置

            哨兵模式,也是 redis 实现服务高可用的一大亮点,具体配置实现如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. //集群节点
    4. List<RedisURI> uris = new ArrayList();
    5. uris.add(RedisURI.builder().withHost("192.168.221.11").withPort(7000).withPassword("123456").build());
    6. uris.add(RedisURI.builder().withHost("192.168.221.12").withPort(7000).withPassword("123456").build());
    7. uris.add(RedisURI.builder().withHost("192.168.221.13").withPort(7000).withPassword("123456").build());
    8. RedisClient client = RedisClient.create();
    9. StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
    10. //从节点读取数据
    11. connection.setReadFrom(ReadFrom.REPLICA);
    12. RedisCommands<String, String> commands = connection.sync();
    13. commands.set("name", "dddown");
    14. System.out.println(commands.get("name"));
    15. connection.close();
    16. client.shutdown();
    17. }
    18. }

    Cluster 集群模式配置

            Cluster 集群模式,是之后推出的一种高可用的架构模型,主要是采用分片方式来存储数据,具体配置如下:

    1. public class LettuceUtil {
    2. public static void main(String[] args) throws Exception {
    3. Set<RedisURI> uris = new HashSet<>();
    4. uris.add(RedisURI.builder().withHost("192.168.221.11").withPort(7000).withPassword("123456").build());
    5. uris.add(RedisURI.builder().withHost("192.168.221.12").withPort(7000).withPassword("123456").build());
    6. uris.add(RedisURI.builder().withHost("192.168.221.13").withPort(7000).withPassword("123456").build());
    7. uris.add(RedisURI.builder().withHost("192.168.221.14").withPort(7000).withPassword("123456").build());
    8. uris.add(RedisURI.builder().withHost("192.168.221.15").withPort(7000).withPassword("123456").build());
    9. uris.add(RedisURI.builder().withHost("192.168.221.16").withPort(7001).withPassword("123456").build());
    10. RedisClusterClient client = RedisClusterClient.create(uris);
    11. StatefulRedisClusterConnection<String, String> connection = client.connect();
    12. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    13. commands.set("name", "uuup");
    14. System.out.println(commands.get("name"));
    15. //选择从节点,只读
    16. NodeSelection<String, String> replicas = commands.replicas();
    17. NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
    18. Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    19. keys.forEach(key -> System.out.println(key));
    20. connection.close();
    21. client.shutdown();
    22. }
    23. }

    小结

            Lettuce 作为 Jedis 客户端,功能更加强大,不仅解决了线程安全的问题,还支持异步和响应式编程,支持集群,Sentinel,管道和编码器等等功能。


    1. 🌹 以上分享 Redis Lettuce客户端使用经验,请指教🤝。
    2. 🌹🌹 如你对技术也感兴趣,欢迎交流。
    3. 🌹🌹🌹 如有需要,请👍点赞💖收藏🐱‍🏍分享

  • 相关阅读:
    (0)调优
    【ESP 保姆级教程】疯狂Node.js服务器篇 ——配置自动重启插件 nodemon,保存触发重启
    雷达传感器感应模块,人体存在感控方案,助力产品智能化触发联动
    OceanBase:中国场景推动树立分布式数据库四项新标准
    记录一次腾讯测试开发工程师自动化接口测试实践经验
    【数据结构与算法分析】0基础带你学数据结构与算法分析09--线索二叉树 (TBT)
    GBase 8c V3.0.0数据类型——系统表信息函数
    因果推断概述
    【毕业季·进击的技术er】青春不散场
    操作系统学习笔记7 | 进程同步与合作
  • 原文地址:https://blog.csdn.net/qq_32662595/article/details/132739306