• ZooKeeper、CuratorFramework、Watcher、ConnectionStateListener


    ZooKeeper

    Zookeeper的作用是提供一个分布式的协调服务,它可以让分布式系统中的各个节点之间进行通信和协调,从而保证整个系统的一致性和可靠性。Zookeeper的核心是一个分布式的文件系统,它可以存储和管理分布式系统中的各种信息,如配置信息、元数据、状态信息等。

    常用的Zookeeper API包括:

    1. 创建节点:create()
    2. 获取节点数据:getData()
    3. 设置节点数据:setData()
    4. 删除节点:delete()
    5. 判断节点是否存在:exists()
    6. 获取子节点列表:getChildren()
    7. 监听节点变化:Watcher机制
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    CuratorFramework

    CuratorFramework是一个ZooKeeper客户端框架,它封装了ZooKeeper API,提供了更加简单易用的接口,同时也提供了一些高级特性,如分布式锁、选举等。

    CuratorFramework常用方法包括:

    1. start():启动CuratorFramework实例。
    2. close():关闭CuratorFramework实例。
    3. create():创建一个ZooKeeper节点。
    4. delete():删除一个ZooKeeper节点。
    5. setData():设置一个ZooKeeper节点的数据。
    6. getData():获取一个ZooKeeper节点的数据。
    7. checkExists():检查一个ZooKeeper节点是否存在。
    8. getChildren():获取一个ZooKeeper节点的子节点列表。
    9. sync():同步一个ZooKeeper节点的数据。
    10. inTransaction():开启一个ZooKeeper事务。
    11. createContainers():创建一个ZooKeeper节点的父节点。
    12. clearWatcherReferences():清除CuratorFramework实例中的所有Watcher引用。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Watcher

    Watcher是ZooKeeper中的一个机制,用于监听节点的变化。当节点发生变化时,Watcher会触发相应的事件,通知客户端进行相应的处理


    Watcher与CuratorFramework结合使用的示例代码

    public class CuratorWatcherDemo {
        private static final String ZK_ADDRESS = "localhost:2181";
        private static final String ZK_PATH = "/test";
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(3, 1000));
            client.start();
    
            // 创建节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(ZK_PATH, "init".getBytes());
    
            // 注册Watcher
            client.getData().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("Watcher triggered, type=" + event.getType() + ", path=" + event.getPath());
                }
            }).forPath(ZK_PATH);
    
            // 修改节点数据
            client.setData().forPath(ZK_PATH, "update".getBytes());
    
            // 删除节点
            client.delete().deletingChildrenIfNeeded().forPath(ZK_PATH);
    
            client.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    创建了一个CuratorFramework客户端,并使用它创建了一个节点。然后,注册了一个Watcher,用于监听节点的变化。接着,修改了节点的数据,并最终删除了节点。在这个过程中,Watcher会被触发,并输出相应的日志。


    ConnectionStateListener

    ConnectionStateListener 是 ZooKeeper 客户端连接状态的监听器,用于监听客户端与 ZooKeeper 服务器之间的连接状态变化。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,ConnectionStateListener 会触发相应的事件,我们可以在事件处理方法中编写相应的业务逻辑。

    ConnectionStateListener 常用方法有:

    1. void stateChanged(CuratorFramework client, ConnectionState newState):当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,会触发该方法。其中,client 表示 CuratorFramework 客户端对象,newState 表示新的连接状态。
    
    2. void addListener(ConnectionStateListener listener):添加连接状态监听器。
    
    3. void removeListener(ConnectionStateListener listener):移除连接状态监听器。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ConnectionStateListener 与 CuratorFramework 结合使用示例代码:

    public class MyConnectionStateListener implements ConnectionStateListener {
    
        private CuratorFramework client;
    
        public MyConnectionStateListener(CuratorFramework client) {
            this.client = client;
        }
    
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                System.out.println("连接成功");
            } else if (newState == ConnectionState.RECONNECTED) {
                System.out.println("重新连接成功");
            } else if (newState == ConnectionState.LOST) {
            
    				//    while (true) {
    				//        try {
    				//            if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()) 			{
    				//                curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    				//                    .forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
    				//                break;
    				//            }
    				//        } catch (InterruptedException e) {
    				//            //TODO: log something
    				//            break;
    				//        } catch (Exception e) {
    				//            //TODO: log something
    				//        }
    				//    }
    				//
    				//
    				//    原文链接:https://blog.csdn.net/xsm666/article/details/85260861
    
    
    				//    Curator Framework的`getZookeeperClient()`方法获取ZooKeeper客户端对象,然后调用`blockUntilConnectedOrTimedOut()`方法来阻塞当前线程,直到与ZooKeeper服务器建立连接或超时。
    				//
    				//    在ZooKeeper连接过程中,客户端需要与服务器进行握手和建立会话,这个过程可能需要一些时间。`blockUntilConnectedOrTimedOut()`方法会阻塞当前线程,直到连接成功或超时。如果连接成功,代码会继续执行后续的操作;如果超时,可能会抛出异常或执行相应的错误处理逻辑。
    				//
    				//    这段代码的目的是确保在使用Curator Framework进行ZooKeeper操作之前,先确保与ZooKeeper服务器建立了可用的连接。这样可以避免在没有建立连接的情况下进行操作,从而提高代码的稳定性和可靠性。
    
    
                System.out.println("连接丢失");
            } else if (newState == ConnectionState.SUSPENDED) {
                System.out.println("连接挂起");
            } else if (newState == ConnectionState.READ_ONLY) {
                System.out.println("只读连接");
            }
        }
    }
    
    public class CuratorFrameworkTest {
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
            client.start();
            client.getConnectionStateListenable().addListener(new MyConnectionStateListener(client));
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    MyConnectionStateListener 类,实现了 ConnectionStateListener 接口,并在 stateChanged 方法中编写了相应的业务逻辑。然后,在 CuratorFrameworkTest 类中,创建了一个 CuratorFramework 客户端对象,并添加了 MyConnectionStateListener 监听器。最后,让主线程休眠,保证程序不会退出。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,MyConnectionStateListener 监听器会触发相应的事件,并输出相应的日志信息。

    Curator处理zookeeper会话过期session expired

    可参考:https://blog.csdn.net/u012373717/article/details/133062251
    https://blog.csdn.net/xsm666/article/details/85260861

  • 相关阅读:
    AUTOSAR之CanNm全局配置概述
    LoadRunner VUG 脚本
    Jupyter介绍和安装使用
    Taurus.MVC 性能压力测试(ap 压测 和 linux 下wrk 压测):.NET Core 版本
    Rust教程:How to Rust-变量
    Springboot毕业设计毕设作品,微信校园疫情防控小程序设计与实现
    数据湖建设解决方案 2022
    MySQL中如何随机获取一条记录
    1、读写分离、分库分表
    【Linux专题】SFTP 用户配置 ChrootDirectory
  • 原文地址:https://blog.csdn.net/weixin_42594143/article/details/133273490