• Zookeeper


    目录

    zk安装

    (55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客 

    介绍

     Zookeeper的数据模型

    节点四大类:

    服务端常见命令操作 

     客户端命令操作

     1.在bin目录在执行这个工具(连接server服务端口)开始客户端

     2.常见增删改查命令

    3.对节点的CRUD

    4.查看节点信息

      Java API的操作

    1.建立连接

    Watch事件的监听

     我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache

    分布式锁

     Zookeeper分布式锁原理

     Curator实现分布锁API

     分布式锁12306售票

    Zookeeper集群搭建


    zk安装

    (55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客 

    介绍

     简介:Zookeeper是一个分布式开源的协调服务(感觉有点像nacos)

     功能:配置管理,分布式锁,集群管理

    配置管理:比如三个服务节点都用同一个配置文件,一个个去配置就很麻烦,我们直接在配置中心配置,abc节点连接即可(Nacos一站式解决)

    (51条消息) Nacos注册中心_Fairy要carry的博客-CSDN博客_nacos注册中心

    (51条消息) nacos实战项目中的配置_Fairy要carry的博客-CSDN博客_nacos配置有哪些

    (51条消息) 面试-SpringCloud常见组件和注册表结构+nacos_Fairy要carry的博客-CSDN博客

    分布式锁:

    主要是解决数据一致性,比如我们的seata就是解决分布式情况下数据一致性问题

    (51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存(51条消息) Seata的几种事务模式_Fairy要carry的博客-CSDN博客_seata 模式(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存

    当多个节点对数据进行更改,你加synchornized这种锁只是在这个当前节点修改,比较JVM不一样了,这时候就要引入我们的分布式锁去保证数据的一致性,话说seata还跟这个有点不一样,它主要是应用在数据库,分布式下,各个服务数据库不一样,要保证数据一致性(引入了XA,AT,TCC等模式)

     集群管理

    就是类似于注册中心,拉取服务用的

     Zookeeper的数据模型

     节点也可以有子节点,允许少量的数据(1MB)存储在该节点下

    节点四大类:

    1.PERSISENT: 持久化节点——>就是宕机客户端与服务端断开再连时,数据还是保持一致性的

    2.EPHEMERAL:临时节点——>断开之后,数据节点就没了,相当于一次会话

    服务端常见命令操作 

    1. #在bin目录下
    2. ./zkServer.sh restart

     

     客户端命令操作

    客户端工具zkCli.sh

     1.在bin目录在执行这个工具(连接server服务端口)开始客户端

    ./zkCli.sh -server localhost:2181

     2.常见增删改查命令

     

     

    3.对节点的CRUD

    创建临时节点

    create -e /test2
    

     创建顺序节点

    create -s /test2
    

    创建临时顺序节点

    create -es /test2
    

     

     发现临时节点消失了

    4.查看节点信息

    ls -s /Path

      Java API的操作

    1.建立连接

    1. /**
    2. * 1.建立连接
    3. */
    4. @Before
    5. public void testConnect() throws Exception {
    6. //1.1第一种方式
    7. // CuratorFrameworkFactory client = CuratorFrameworkFactory.class.getConstructor().newInstance();
    8. /**
    9. * 1.2创建一个新的客户端
    10. * 参数: connectString - 连接到服务器的列表
    11. * sessionTimeoutMs - 会话超时 ,多久没连就挂
    12. * connectionTimeoutMs - 连接超时 ,连了多久
    13. * retryPolicy -使用重试策略返回:客户端
    14. */
    15. RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
    16. // client = CuratorFrameworkFactory.newClient(
    17. // "192.168.184.129:2181",
    18. // 60 * 1000,
    19. // 15 * 1000, retry);
    20. 开启连接
    21. // client.start();
    22. /**
    23. * 1.3第二种方式
    24. * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
    25. */
    26. client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
    27. .sessionTimeoutMs(60 * 1000)
    28. .connectionTimeoutMs(15 * 1000)
    29. .retryPolicy(retry).namespace("itheima").build();
    30. client.start();
    31. }

     2.创建节点并且赋值

    1. /**
    2. * 2.创建节点:持久 临时 顺序 数据
    3. * 2.1基本创建
    4. * 2.2创建带有数据的
    5. * 2.3多级节点+设置节点类型
    6. */
    7. @Test
    8. public void testCreate() throws Exception {
    9. //1.基本创建持久化
    10. String path = client.create()
    11. .withMode(CreateMode.EPHEMERAL)
    12. .forPath("/app3", "hehe".getBytes(StandardCharsets.UTF_8));
    13. System.out.println(path);
    14. //2.让他不结束
    15. while (true) {
    16. }
    17. }

    3.创建子节点

    1. /**
    2. * 创建子节点
    3. *
    4. * @throws Exception
    5. */
    6. @Test
    7. public void testMulti() throws Exception {
    8. //creatingParentsIfNeeded():如果父节点不存在就进行创建
    9. String path = client.create()
    10. .creatingParentsIfNeeded()
    11. .forPath("/app4/p1");
    12. System.out.println(path);
    13. }

    4.查询节点数据

    1. /**
    2. * 查询节点数据
    3. */
    4. @Test
    5. public void testGet() throws Exception {
    6. byte[] bytes = client.getData().forPath("/app3");
    7. System.out.println(new String(bytes));
    8. }

    5.查询当前路径下所有子节点

    1. /**
    2. * 查询子节点
    3. * getChildren().forPath():得到目录下所有子节点
    4. * 当前命名空间下的子目录节点
    5. */
    6. @Test
    7. public void testChildren() throws Exception {
    8. List path = client.getChildren().forPath("/");
    9. System.out.println(path);
    10. }

     6.查询节点状态

    1. /**
    2. * 查询节点状态信息
    3. * new Stat()
    4. * client.getData():查询数据
    5. * .storingStateIn(stat).forPath():查询节点状态值,将状态值给到Stat
    6. */
    7. @Test
    8. public void testState() throws Exception {
    9. Stat stat = new Stat();
    10. System.out.println(stat);
    11. //查询节点状态信息ls -s
    12. client.getData().storingStatIn(stat).forPath("/app3");
    13. System.out.println(stat);
    14. }

    7.利用版本version修改节点数据

    1. /**
    2. * 根据版本判断修改节点状态值
    3. * setData().withVersion().forPath()
    4. * 如果版本不一致就不允许更改,保证了数据一致性,操作的原子性
    5. */
    6. @Test
    7. public void testSetForVersion() throws Exception {
    8. Stat stat = new Stat();
    9. //1.查询节点状态
    10. client.getData().storingStatIn(stat).forPath("/app3");
    11. //2.得到查询出来节点状态中的版本version
    12. int version = stat.getVersion();
    13. System.out.println("该节点数据的版本:"+version);
    14. client.setData().withVersion(version).forPath("/app3","newLife".getBytes(StandardCharsets.UTF_8));
    15. }

    8.删除节点

    1. /**
    2. * 4.删除节点
    3. * 1.删除单个节点 delete().forPath()
    4. * 2.删除带有子节点的节点 delete().deleteChildrenIfNeeded().forPath
    5. * 3.必须成功的删除 delete().guaranteed().forPath()
    6. * 4.回调 inBackground(new BackgroundCallBack()).forPath()
    7. *
    8. */
    9. @Test
    10. public void testDelete() throws Exception {
    11. //1.删除单个节点
    12. client.delete().forPath("/app3");
    13. }
    14. @Test
    15. public void testDeleteNeeded() throws Exception {
    16. //1.删除带有子节点的节点
    17. client.delete().deletingChildrenIfNeeded().forPath("/app4");
    18. }
    19. /**
    20. * 可能删除的时候(客户端和Server需要建立连接),可能出现耗时较长导致删除失败的情况
    21. * delete().guaranteed().forPath()
    22. */
    23. @Test
    24. public void testMustDelete() throws Exception {
    25. //1.必须成功删除
    26. client.delete().guaranteed().forPath("app2");
    27. }

    9.回调机制

    1. /**
    2. * 回调机制——>这里是删除后的回调
    3. * delete().guaranteed().inBackground(new BackgroundCallBack())
    4. * @throws Exception
    5. */
    6. @Test
    7. public void testBack() throws Exception {
    8. //1.调用回调方法inBackground->并且实现参数接口的回调机制processResult
    9. client.delete().guaranteed().inBackground((client, event) -> {
    10. System.out.println("当前节点被删除~");
    11. System.out.println(event);//2.打印节点信息——>被CuratorEvent捕捉
    12. }).forPath("/app2");
    13. }

    Watch事件的监听

    主要用于发布订阅,比如三个子节点订阅了APP1,当其中一个节点修改了APP1的数据时,会发布信息给到其他两个节点告诉数据的变化 

    我们对比一下Redis的发布订阅

    (53条消息) Redis配置文件+发布订阅+新数据类型_Fairy要carry的博客-CSDN博客

     我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache

    NodeCache:监听特定节点

    PathChildrenCache:监听某节点的子节点们

    TreeCache:监听整个树节点=NodeCache+PathChildrenCache

    1. package com.wyh.curator;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.recipes.cache.*;
    6. import org.apache.curator.retry.ExponentialBackoffRetry;
    7. import org.apache.zookeeper.CreateMode;
    8. import org.apache.zookeeper.data.Stat;
    9. import org.junit.After;
    10. import org.junit.Before;
    11. import org.junit.Test;
    12. import java.nio.charset.StandardCharsets;
    13. import java.util.List;
    14. /**
    15. * @author diao 2022/9/4
    16. */
    17. public class CuratorWatchTest {
    18. private CuratorFramework client;
    19. /**
    20. * 1.建立连接
    21. */
    22. @Before
    23. public void testConnect() throws Exception {
    24. RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
    25. /**
    26. * 1.3第二种方式
    27. * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
    28. */
    29. client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
    30. .sessionTimeoutMs(60 * 1000)
    31. .connectionTimeoutMs(15 * 1000)
    32. .retryPolicy(retry).namespace("itheima").build();
    33. client.start();
    34. }
    35. /**
    36. * 1.NodeCache监听当前节点
    37. * @throws Exception
    38. */
    39. @Test
    40. public void testNodeCache() throws Exception {
    41. NodeCache nodeCache = new NodeCache(client, "/app2");//1、监听的节点
    42. //2、注册监听
    43. nodeCache.getListenable().addListener(new NodeCacheListener() {
    44. @Override
    45. public void nodeChanged() throws Exception {
    46. System.out.println("节点发生变化");
    47. //2.1变化的结果
    48. byte[] data = nodeCache.getCurrentData().getData();
    49. System.out.println(new String(data));
    50. }
    51. });
    52. //3.开启监听
    53. nodeCache.start(true);
    54. //4.需要持续监听——>线程则需要一直存活
    55. while (true) {
    56. }
    57. }
    58. /**
    59. * PathNodeCache监听子节点
    60. * @throws Exception
    61. */
    62. @Test
    63. public void testPathChildrenCache() throws Exception {
    64. //1.创建监听对象
    65. PathChildrenCache childrenCache = new PathChildrenCache(client, "/app2", true);
    66. //2.绑定监听器
    67. childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    68. @Override
    69. public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
    70. System.out.println("子节点发生变化...");
    71. //2.1打印监听事件
    72. System.out.println(pathChildrenCacheEvent);
    73. //2.2判断监听事件类型判断哪些是不能监听的
    74. PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
    75. //2.3当监听到的事件是update时
    76. if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
    77. System.out.println("数据变了!!!");
    78. byte[] data = pathChildrenCacheEvent.getData().getData();
    79. System.out.println(new String(data));
    80. }
    81. }
    82. });
    83. //3.开启
    84. childrenCache.start();
    85. //4.保证持续监听
    86. while(true){
    87. }
    88. }
    89. /**
    90. * TreeNodeCache完成监听
    91. * 1.new TreeCache封装连接以及节点
    92. * 2.getListenable().addListener(new TreeCacheListener{
    93. * childEvent();
    94. * })
    95. * @throws Exception
    96. */
    97. @Test
    98. public void testTreeCache() throws Exception {
    99. //1.创建监听器
    100. TreeCache treeCache = new TreeCache(client, "/app2");
    101. //2.注册监听
    102. treeCache.getListenable().addListener(new TreeCacheListener() {
    103. @Override
    104. public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
    105. System.out.println("节点发生变化");
    106. System.out.println(treeCacheEvent);
    107. byte[] data = treeCacheEvent.getData().getData();
    108. System.out.println("改变后数据为:"+data);
    109. }
    110. });
    111. //3.启动
    112. treeCache.start();
    113. //4.持续监听
    114. while(true){
    115. }
    116. }
    117. /**
    118. * 3.释放资源
    119. */
    120. @After
    121. public void testClose() {
    122. if (client != null) client.close();
    123. }
    124. }

    分布式锁

     Zookeeper分布式锁原理

    1.首先先创建节点,比如三个client都要获取,那就在lock节点下创建三个节点(临时节点,保证宕机的时候删除)——>2.获取lock下的所有子节点,客户端getChildren返回所有子节点后比较大小,最小的先获取到锁,如果说用完之后,会将节点删除(类似于之前在finally中将锁释放一样),这里是client.close() ——>3.其他节点不是最小的那么就会创建一个事件监听器,监听比自己小的节点,监听删除事件——>4.当发现监听的节点被删除后,也就是锁释放了,那么此时再次判断是不是最小的,如果是,则获取锁,不是则重复以上操作

     Curator实现分布锁API

     分布式锁12306售票

    1.将锁加到12306上面 

     1.任务类

    1. package com.wyh.curator;
    2. import org.apache.curator.framework.CuratorFramework;
    3. import org.apache.curator.framework.CuratorFrameworkFactory;
    4. import org.apache.curator.framework.recipes.locks.InterProcessLock;
    5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    6. import org.apache.curator.retry.ExponentialBackoffRetry;
    7. import java.util.concurrent.TimeUnit;
    8. /**
    9. * @author diao 2022/9/4
    10. */
    11. public class Ticket12306 implements Runnable {
    12. private volatile int tickets = 10;//数据库票数
    13. private InterProcessMutex lock;
    14. /**
    15. * 1.通过构造方法初始化lock
    16. */
    17. public Ticket12306() {
    18. ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
    19. CuratorFramework client= CuratorFrameworkFactory.builder()
    20. .connectString("192.168.184.129:2181")
    21. .sessionTimeoutMs(60*1000)
    22. .connectionTimeoutMs(15*1000)
    23. .retryPolicy(retry).build();
    24. client.start();
    25. this.lock=new InterProcessMutex(client,"/lock");
    26. }
    27. /**
    28. * 2.线程执行方法
    29. */
    30. @Override
    31. public void run() {
    32. while (true) {
    33. //1.加锁
    34. try {
    35. lock.acquire(3, TimeUnit.SECONDS);//本质其实就是cas
    36. if (tickets > 0) {
    37. System.out.println(Thread.currentThread() + ":" + tickets);
    38. tickets--;
    39. }
    40. } catch (Exception e) {
    41. e.printStackTrace();
    42. }finally {
    43. try {
    44. //2.释放锁
    45. lock.release();
    46. } catch (Exception e) {
    47. e.printStackTrace();
    48. }
    49. }
    50. }
    51. }
    52. }

     2.主方法

    1. package com.wyh.curator;
    2. import org.apache.curator.framework.CuratorFramework;
    3. import org.apache.curator.retry.ExponentialBackoffRetry;
    4. import org.junit.Before;
    5. /**
    6. * @author diao 2022/9/4
    7. */
    8. public class LockTest {
    9. public static void main(String[] args) {
    10. Ticket12306 ticket12306 = new Ticket12306();
    11. //1.创建客户端
    12. Thread t1 = new Thread(ticket12306, "携程");
    13. Thread t2 = new Thread(ticket12306, "飞猪");
    14. t1.start();
    15. t2.start();
    16. }
    17. }

     

    Zookeeper集群搭建

    类似redis的分片集群,master互票 

    Redis分片集群_Fairy要carry的博客-CSDN博客_redis分片和集群

     这里弄个假的,多个端口,而不是多个ip

    1.创建三个单节点的zk

    (55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客

     2.然后修改配置文件的端口,data,log路径

    3.分别记录id值(对于每个节点的id文件)

    4.让他们互相知道彼此(在配置文件中配置)

    server.服务器id=服务器ip:服务器通信端口:服务器之间投票选举端口

     此时已经构建集群环境

    5.启动即可

    三个都启动

    usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

     然后看彼此节点状态发现2节点是leader

    1. [root@localhost ~]# /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
    2. /usr/bin/java
    3. ZooKeeper JMX enabled by default
    4. Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
    5. Client port found: 2182. Client address: localhost. Client SSL: false.
    6. Mode: leader

    异常测试

     当你把主节点stop之后,再次start之后,主节点还是主节点

     

  • 相关阅读:
    【启扬方案】基于启扬安卓屏一体机的医疗手推车解决方案
    asp.net core 在linux上部署
    HTTP协议状态及报文组成 - 一文通读
    ts的装饰器
    实证与虚无,抽象和具象,Go lang1.18入门精炼教程,由白丁入鸿儒,Go lang接口(interface)的使用EP08
    《安富莱嵌入式周报》第291期:分分钟设计数字芯片,单片机版JS,神经网络DSP,microPLC,FatFS升级至V0.15,微软Arm64 VS正式版发布
    PyQt5 & PyQt6 Designer 的安装
    千万注意!使用GetAsyncKeyState检测窗口按键时要检查是否为前端窗口!
    Achronix与您相约“2023全球AI芯片峰会”
    ClickHouse学习笔记之数据类型
  • 原文地址:https://blog.csdn.net/weixin_57128596/article/details/126676348