Zookeeper的作用是提供一个分布式的协调服务,它可以让分布式系统中的各个节点之间进行通信和协调,从而保证整个系统的一致性和可靠性。Zookeeper的核心是一个分布式的文件系统,它可以存储和管理分布式系统中的各种信息,如配置信息、元数据、状态信息等。
常用的Zookeeper API包括:
1. 创建节点:create()
2. 获取节点数据:getData()
3. 设置节点数据:setData()
4. 删除节点:delete()
5. 判断节点是否存在:exists()
6. 获取子节点列表:getChildren()
7. 监听节点变化:Watcher机制
CuratorFramework是一个ZooKeeper客户端框架,它封装了ZooKeeper API,提供了更加简单易用的接口,同时也提供了一些高级特性,如分布式锁、选举等。
CuratorFramework常用方法包括:
1. start():启动CuratorFramework实例。
2. close():关闭CuratorFramework实例。
3. create():创建一个ZooKeeper节点。
4. delete():删除一个ZooKeeper节点。
5. setData():设置一个ZooKeeper节点的数据。
6. getData():获取一个ZooKeeper节点的数据。
7. checkExists():检查一个ZooKeeper节点是否存在。
8. getChildren():获取一个ZooKeeper节点的子节点列表。
9. sync():同步一个ZooKeeper节点的数据。
10. inTransaction():开启一个ZooKeeper事务。
11. createContainers():创建一个ZooKeeper节点的父节点。
12. clearWatcherReferences():清除CuratorFramework实例中的所有Watcher引用。
Watcher是ZooKeeper中的一个机制,用于监听节点的变化。当节点发生变化时,Watcher会触发相应的事件,通知客户端进行相应的处理
public class CuratorWatcherDemo {
private static final String ZK_ADDRESS = "localhost:2181";
private static final String ZK_PATH = "/test";
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(3, 1000));
client.start();
// 创建节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(ZK_PATH, "init".getBytes());
// 注册Watcher
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Watcher triggered, type=" + event.getType() + ", path=" + event.getPath());
}
}).forPath(ZK_PATH);
// 修改节点数据
client.setData().forPath(ZK_PATH, "update".getBytes());
// 删除节点
client.delete().deletingChildrenIfNeeded().forPath(ZK_PATH);
client.close();
}
}
创建了一个CuratorFramework客户端,并使用它创建了一个节点。然后,注册了一个Watcher,用于监听节点的变化。接着,修改了节点的数据,并最终删除了节点。在这个过程中,Watcher会被触发,并输出相应的日志。
ConnectionStateListener 是 ZooKeeper 客户端连接状态的监听器,用于监听客户端与 ZooKeeper 服务器之间的连接状态变化。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,ConnectionStateListener 会触发相应的事件,我们可以在事件处理方法中编写相应的业务逻辑。
ConnectionStateListener 常用方法有:
1. void stateChanged(CuratorFramework client, ConnectionState newState):当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,会触发该方法。其中,client 表示 CuratorFramework 客户端对象,newState 表示新的连接状态。
2. void addListener(ConnectionStateListener listener):添加连接状态监听器。
3. void removeListener(ConnectionStateListener listener):移除连接状态监听器。
ConnectionStateListener 与 CuratorFramework 结合使用示例代码:
public class MyConnectionStateListener implements ConnectionStateListener {
private CuratorFramework client;
public MyConnectionStateListener(CuratorFramework client) {
this.client = client;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED) {
System.out.println("连接成功");
} else if (newState == ConnectionState.RECONNECTED) {
System.out.println("重新连接成功");
} else if (newState == ConnectionState.LOST) {
// while (true) {
// try {
// if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
// curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
// .forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
// break;
// }
// } catch (InterruptedException e) {
// //TODO: log something
// break;
// } catch (Exception e) {
// //TODO: log something
// }
// }
//
//
// 原文链接:https://blog.csdn.net/xsm666/article/details/85260861
// Curator Framework的`getZookeeperClient()`方法获取ZooKeeper客户端对象,然后调用`blockUntilConnectedOrTimedOut()`方法来阻塞当前线程,直到与ZooKeeper服务器建立连接或超时。
//
// 在ZooKeeper连接过程中,客户端需要与服务器进行握手和建立会话,这个过程可能需要一些时间。`blockUntilConnectedOrTimedOut()`方法会阻塞当前线程,直到连接成功或超时。如果连接成功,代码会继续执行后续的操作;如果超时,可能会抛出异常或执行相应的错误处理逻辑。
//
// 这段代码的目的是确保在使用Curator Framework进行ZooKeeper操作之前,先确保与ZooKeeper服务器建立了可用的连接。这样可以避免在没有建立连接的情况下进行操作,从而提高代码的稳定性和可靠性。
System.out.println("连接丢失");
} else if (newState == ConnectionState.SUSPENDED) {
System.out.println("连接挂起");
} else if (newState == ConnectionState.READ_ONLY) {
System.out.println("只读连接");
}
}
}
public class CuratorFrameworkTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
client.getConnectionStateListenable().addListener(new MyConnectionStateListener(client));
Thread.sleep(Integer.MAX_VALUE);
}
}
MyConnectionStateListener 类,实现了 ConnectionStateListener 接口,并在 stateChanged 方法中编写了相应的业务逻辑。然后,在 CuratorFrameworkTest 类中,创建了一个 CuratorFramework 客户端对象,并添加了 MyConnectionStateListener 监听器。最后,让主线程休眠,保证程序不会退出。当客户端与 ZooKeeper 服务器之间的连接状态发生变化时,MyConnectionStateListener 监听器会触发相应的事件,并输出相应的日志信息。
Curator处理zookeeper会话过期session expired
可参考:https://blog.csdn.net/u012373717/article/details/133062251
https://blog.csdn.net/xsm666/article/details/85260861