• Redis——Lettuce连接redis集群


    Lettuce连接redis集群使用的都是集群专用类,像RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection等等;

    Lettuce对redis cluster的支持:

    • 支持所有Cluster命令;
    • 基于键哈希槽的路由节点;
    • 对集群命令高级抽象;
    • 在多个集群节点上执行命令;
    • 处理MOVED和ASK重定向;
    • 通过槽位和ip端口直接连接集群节点;
    • SSL和身份验证;
    • 定期和自适应集群拓扑更新;
    • 发布订阅;

    启动时只需至少一个可以连接的集群节点就可以,能够自动拓扑出集群全部节点;也可以使用ReadFrom设置读取数据来源,跟主从模式一样;

    虽然redis本身的多键命令要求key必须都在同一个槽位,但Lettuce对一部分命令多了优化,可以对多键命令进行跨槽位执行,通过将对不同槽位键的操作命令分解为多条命令,单个命令以fork/join方式并发运行,最后将结果合并返回;

    可以跨槽位的命令有:

    • DEL:删除键,返回删除数量;
    • EXISTS:统计跨槽位的存在的键的数量;
    • MGET:获取所有给定键的值,顺序按照键的顺序返回;
    • MSET:批量保存键值对,总是返回OK;
    • TOUCH:改变给定键的最后访问时间,返回改变的键的数量;
    • UNLINK:删除键并在另一个不同的线程中回收内存,返回删除数量;

    提供跨槽位命令的api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;

    可以在多个集群节点上执行的命令有:

    • CLIENT SETNAME:在所有已知的集群节点上设置客户端的名称,总是返回OK;
    • KEYS:返回所有master上存储的key;
    • DBSIZE:返回所有master上存储的key的数量;
    • FLUSHALL:清空master上的所有数据,总是返回OK;
    • FLUSHDB:清空master上的所有数据,总是返回OK;
    • RANDOMKEY:从随机master上返回随机的key;
    • SCAN:根据ReadFrom设置扫描整个集群的键空间;
    • SCRIPT FLUSH:从所有的集群节点脚本缓存中删除所有脚本;
    • SCRIPT LOAD:在所有的集群节点上加载lua脚本;
    • SCRIPT KILL:在所有集群节点上杀死脚本;(即使脚本没有运行调用也不会失败)
    • SHUTDOWN:将数据集同步保存到磁盘,然后关闭集群所有节点;

    关于发布订阅:

    普通用户空间的发布订阅,redis集群会发送到每个节点,发布者和订阅者不需要在同一个节点,普通订阅发布消息可以在集群拓扑改变时重新连接。对于键空间事件,只会发到自己的节点,不会扩散到其他节点,要订阅键空间事件可以去适当的多个节点上订阅,或者使用RedisClusterClient消息传播和NodeSelection API获得一个托管连接集合;

    注意:由于主从同步,键会被复制到多个从节点上,特别是键过期事件,会在主从节点上都产生过期事件,如果订阅从节点,可能会收到多条相同的过期事件;订阅是通过NodeSelection API或者单个节点调用subscribe(…)发出的,订阅对于新增的节点无效;

    测试Demo:(redis版本7.0.2,Lettuce版本6.1.8)

    集群节点:虚拟机 192.168.1.31,端口 9001-9006,集群节点已设置notify-keyspace-events AK;

    1. /**
    2. * 2022年6月23日上午9:41:47
    3. */
    4. package testlettuce;
    5. import java.time.Duration;
    6. import java.util.ArrayList;
    7. import java.util.HashMap;
    8. import java.util.List;
    9. import java.util.Map;
    10. import io.lettuce.core.ClientOptions.DisconnectedBehavior;
    11. import io.lettuce.core.KeyScanCursor;
    12. import io.lettuce.core.KeyValue;
    13. import io.lettuce.core.ReadFrom;
    14. import io.lettuce.core.RedisURI;
    15. import io.lettuce.core.ScanCursor;
    16. import io.lettuce.core.SocketOptions;
    17. import io.lettuce.core.SslOptions;
    18. import io.lettuce.core.TimeoutOptions;
    19. import io.lettuce.core.cluster.ClusterClientOptions;
    20. import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
    21. import io.lettuce.core.cluster.RedisClusterClient;
    22. import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
    23. import io.lettuce.core.cluster.api.sync.Executions;
    24. import io.lettuce.core.cluster.api.sync.NodeSelection;
    25. import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
    26. import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
    27. import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
    28. import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands;
    29. import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection;
    30. import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
    31. import io.lettuce.core.protocol.DecodeBufferPolicies;
    32. import io.lettuce.core.protocol.ProtocolVersion;
    33. import io.lettuce.core.pubsub.RedisPubSubListener;
    34. import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
    35. /**
    36. * @author XWF
    37. *
    38. */
    39. public class TestLettuceCluster {
    40. /**
    41. * @param args
    42. */
    43. public static void main(String[] args) {
    44. List<RedisURI> nodeList = new ArrayList<>();
    45. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build());
    46. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build());
    47. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build());
    48. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build());
    49. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build());
    50. nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build());
    51. RedisClusterClient clusterClient = RedisClusterClient.create(nodeList);
    52. ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
    53. .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))//设置自适应拓扑刷新超时,每次超时刷新一次,默认30s;
    54. .closeStaleConnections(false)//刷新拓扑时是否关闭失效连接,默认true,isPeriodicRefreshEnabled()为true时生效;
    55. .dynamicRefreshSources(true)//从拓扑中发现新节点,并将新节点也作为拓扑的源节点,动态刷新可以发现全部节点并计算每个客户端的数量,设置false则只有初始节点为源和计算客户端数量;
    56. .enableAllAdaptiveRefreshTriggers()//启用全部触发器自适应刷新拓扑,默认关闭;
    57. .enablePeriodicRefresh(Duration.ofSeconds(5L))//开启定时拓扑刷新并设置周期;
    58. .refreshTriggersReconnectAttempts(3)//长连接重新连接尝试n次才拓扑刷新
    59. .build();
    60. ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
    61. .autoReconnect(true)//在连接丢失时开启或关闭自动重连,默认true;
    62. .cancelCommandsOnReconnectFailure(true)//允许在重连失败取消排队命令,默认false;
    63. .decodeBufferPolicy(DecodeBufferPolicies.always())//设置丢弃解码缓冲区的策略,以回收内存;always:解码后丢弃,最大内存效率;alwaysSome:解码后丢弃一部分;ratio(n)基于比率丢弃,n/(1+n),通常用1-10对应50%-90%;
    64. .disconnectedBehavior(DisconnectedBehavior.DEFAULT)//设置连接断开时命令的调用行为,默认启用重连;DEFAULT:启用时重连中接收命令,禁用时重连中拒绝命令;ACCEPT_COMMANDS:重连中接收命令;REJECT_COMMANDS:重连中拒绝命令;
    65. // .maxRedirects(5)//当键从一个节点迁移到另一个节点,集群重定向次数,默认5;
    66. // .nodeFilter(nodeFilter)//设置节点过滤器
    67. // .pingBeforeActivateConnection(true)//激活连接前设置PING,默认true;
    68. // .protocolVersion(ProtocolVersion.RESP3)//设置协议版本,默认RESP3;
    69. // .publishOnScheduler(false)//使用专用的调度器发出响应信号,默认false,启用时数据信号将使用服务的多线程发出;
    70. // .requestQueueSize(requestQueueSize)//设置每个连接请求队列大小;
    71. // .scriptCharset(scriptCharset)//设置Lua脚本编码为byte[]的字符集,默认StandardCharsets.UTF_8;
    72. // .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//设置低级套接字的属性
    73. // .sslOptions(SslOptions.builder().build())//设置ssl属性
    74. // .suspendReconnectOnProtocolFailure(false)//当重新连接遇到协议失败时暂停重新连接(SSL验证,连接失败前PING),默认值为false;
    75. // .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//设置超时来取消和终止命令;
    76. .topologyRefreshOptions(clusterTopologyRefreshOptions)//设置拓扑更新设置
    77. .validateClusterNodeMembership(true)//在允许连接到集群节点之前,验证集群节点成员关系,默认值为true;
    78. .build();
    79. clusterClient.setDefaultTimeout(Duration.ofSeconds(5L));
    80. clusterClient.setOptions(clusterClientOptions);
    81. StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect();
    82. clusterConn.setReadFrom(ReadFrom.ANY);//设置从哪些节点读取数据;
    83. RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync();
    84. clusterCmd.set("a", "A");
    85. clusterCmd.set("b", "B");
    86. clusterCmd.set("c", "C");
    87. clusterCmd.set("d", "D");
    88. System.out.println("get a=" + clusterCmd.get("a"));
    89. System.out.println("get b=" + clusterCmd.get("b"));
    90. System.out.println("get c=" + clusterCmd.get("c"));
    91. System.out.println("get d=" + clusterCmd.get("d"));
    92. //跨槽位命令
    93. Map<String, String> kvmap = new HashMap<>();
    94. kvmap.put("a", "AA");
    95. kvmap.put("b", "BB");
    96. kvmap.put("c", "CC");
    97. kvmap.put("d", "DD");
    98. clusterCmd.mset(kvmap);//Lettuce做了优化,支持一些命令的跨槽位命令;
    99. System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d"));
    100. //选定部分节点操作
    101. NodeSelection<String, String> replicas = clusterCmd.replicas();
    102. NodeSelectionCommands<String, String> replicaseCmd = replicas.commands();
    103. Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL);
    104. executions.forEach(s -> {System.out.println(s.getKeys());});
    105. //订阅发布消息
    106. StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub();
    107. pubSubConn.addListener(new RedisPubSubListener<String, String>() {
    108. @Override
    109. public void message(String channel, String message) {
    110. System.out.println("[message]ch:" + channel + ",msg:" + message);
    111. }
    112. @Override
    113. public void message(String pattern, String channel, String message) {
    114. }
    115. @Override
    116. public void subscribed(String channel, long count) {
    117. System.out.println("[subscribed]ch:" + channel);
    118. }
    119. @Override
    120. public void psubscribed(String pattern, long count) {
    121. }
    122. @Override
    123. public void unsubscribed(String channel, long count) {
    124. }
    125. @Override
    126. public void punsubscribed(String pattern, long count) {
    127. }
    128. });
    129. pubSubConn.sync().subscribe("TEST_Ch");//(回调内部使用阻塞调用或者lettuce同步api调用,需使用异步订阅)
    130. clusterCmd.publish("TEST_Ch", "MSGMSGMSG");
    131. //响应式订阅,可以监听ChannelMessage和PatternMessage,使用链式过滤处理计算等操作
    132. RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive();
    133. pubsubReactive.subscribe("TEST_Ch2").subscribe();
    134. pubsubReactive.observeChannels()
    135. .filter(chmsg -> {return chmsg.getMessage().contains("tom");})
    136. .doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());})
    137. .subscribe();
    138. clusterCmd.publish("TEST_Ch2", "send to jerry");
    139. clusterCmd.publish("TEST_Ch", "tom MSG");
    140. clusterCmd.publish("TEST_Ch2", "this is tom");
    141. //keySpaceEvent事件
    142. StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub();
    143. clusterPubSubConn.setNodeMessagePropagation(true);//启用禁用节点消息传播到该listener,例如只能在本节点通知的键事件通知;
    144. RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {
    145. @Override
    146. public void unsubscribed(String channel, long count) {
    147. System.out.println("unsubscribed_ch:" + channel);
    148. }
    149. @Override
    150. public void subscribed(String channel, long count) {
    151. System.out.println("subscribed_ch:" + channel);
    152. }
    153. @Override
    154. public void punsubscribed(String pattern, long count) {
    155. System.out.println("punsubscribed_pattern:" + pattern);
    156. }
    157. @Override
    158. public void psubscribed(String pattern, long count) {
    159. System.out.println("psubscribed_pattern:" + pattern);
    160. }
    161. @Override
    162. public void message(String pattern, String channel, String message) {
    163. System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message);
    164. }
    165. @Override
    166. public void message(String channel, String message) {
    167. System.out.println("message_ch:" + channel + " msg:" + message);
    168. }
    169. };
    170. clusterPubSubConn.addListener(listener);
    171. PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all();
    172. NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands();
    173. clusterCmd.setex("a", 1, "A");
    174. pubsubAsyncCmd.psubscribe("__keyspace@0__:*");
    175. try {
    176. Thread.sleep(3000);
    177. } catch (InterruptedException e) {
    178. e.printStackTrace();
    179. }
    180. System.out.println("end");
    181. }
    182. }

    运行结果:

  • 相关阅读:
    算法题练习——JS Node+python题解合并k个已排序的链表及链表的奇偶重排
    jQWidgets 15.0 for JavaScript Crack
    酷开科技生态内容价值+酷开系统的开放性和可塑性,实现品效合一
    asp.net售后维修管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio
    ChatGPT 网站合集/NovelAI tag生成器/Novel资源大全
    美图SaaS、有赞美业,求同存异的差异化角逐
    Java开发过程中常用Linux命令总结
    物联网低代码平台选型六大标准,赶快学起来
    UWB室内定位系统全套源码 高精度人员定位系统源码
    【go】异步任务解决方案Asynq实战
  • 原文地址:https://blog.csdn.net/FlyLikeButterfly/article/details/125431065