• java连接zookeeper


    API

    ZooKeeper官方提供了Java API,可以通过Java代码来连接zookeeper服务进行操作。可以连接、创建节点、获取节点数据、监听节点变化等操作,具体有以下几个重要的类:

    1. ZooKeeper:ZooKeeper类是Java API的核心类,用于与ZooKeeper服务器建立连接,并提供了一系列方法来操作ZooKeeper的节点。
    2. Watcher:Watcher是ZooKeeper的一个回调接口,当节点发生变化时会调用相应的方法进行通知。
    3. CreateMode:CreateMode枚举类定义了节点的类型,包括永久节点、临时节点、顺序节点和临时顺序节点。
    4. Stat:Stat类表示节点的元数据信息,比如修改版本、数据长度、子节点数量等。

    添加依赖

    
        org.apache.zookeeper
        zookeeper
        3.7.2
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    操作例子

    String host = "localhost:2181";
    //建立连接
    zooKeeper = new ZooKeeper(host, 2000, null);
    String path = "/test";
    Watcher watcher = new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        System.out.println("Node changed: " + watchedEvent.getPath());
        System.out.println(watchedEvent);
      }
    };
    //获取节点状态 如果不存在返回null
    Stat stat = zooKeeper.exists(path, false);
    if(null != stat){
    	System.out.println(stat.getCzxid()+"-"+stat.getAversion());
    }
    
    //创建节点 包含版本、时间、数据长度等信息
    zooKeeper.create(path,"123".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    //添加watcher
    zooKeeper.addWatch(path, watcher, AddWatchMode.PERSISTENT);
    
    //获取节点数据
    byte[] data = zooKeeper.getData(path, false, null);
    System.out.println(new String(data));
    
    zooKeeper.create(path+"/1","child1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    //获取子节点
    List<String> children = zooKeeper.getChildren(path, false);
    System.out.println("childs size:"+children.size());
    //删除子节点
    zooKeeper.delete(path+"/1",-1);
    zooKeeper.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    zkClient

    zkClient封装了zookeeper的官方api,简化了一些繁琐的操作,并提供了一些额外的功能,提高了开发效.

    添加依赖

    
        com.101tec
        zkclient
        0.11
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    zkclient对节点数据的操作进行了序列化, 这里先准备一个string类型的序列化类。需要实现ZkSerializer接口

    public class ZkStringSerializer implements ZkSerializer {
        @Override
        public byte[] serialize(Object o) throws ZkMarshallingError {
            return String.valueOf(o).getBytes();
        }
    
        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return new String(bytes);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    基本操作

    ZkClient zkClient = new ZkClient("localhost:2181");
    //自定义序列化 否则报错
    zkClient.setZkSerializer(new ZkStringSerializer());
    String path = "/test";
    //判断节点是否存在
    boolean exist = zkClient.exists(path);
    System.out.println(exist);
    if(!exist){//创建节点
        zkClient.create(path,"123", CreateMode.PERSISTENT);
    }
    //读取节点数据
    System.out.println((String) zkClient.readData(path));
    zkClient.writeData(path,"456");//设置节点数据
    System.out.println((String) zkClient.readData(path));
    zkClient.delete(path);//删除节点
    
    zkClient.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    节点变化事件

    String path = "/test";
    /**
     * 节点变化事件
     * 只监听节点增减,不监听数据变化事件
     */
    zkClient.subscribeChildChanges(path, new IZkChildListener() {
        @Override
        public void handleChildChange(String parentPath, List<String> children) throws Exception {
            System.out.println("节点"+parentPath+"发生变化");
            System.out.println(children);
        }
    });
    //节点操作,观察handleChildChange接收到对应事件
    Thread.sleep(2000);
    zkClient.createPersistent(path);
    Thread.sleep(2000);
    zkClient.createPersistent(path+"/child1");
    Thread.sleep(2000);
    zkClient.writeData(path+"/child1","123");
    Thread.sleep(2000);
    zkClient.delete(path+"/child1");
    Thread.sleep(2000);
    zkClient.delete(path);
    Thread.sleep(100000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    节点数据变化事件

        String path = "/test";
        /**
         * 节点变化事件,只检测当前节点,感知不到其子节点
         * 节点被删除或节点数据变化
         */
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("节点:"+s+"数据变为:"+o);
            }
    
            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("节点:"+s+"删除");
            }
        });
    
        Thread.sleep(2000);
        zkClient.createPersistent(path);
        Thread.sleep(2000);
        zkClient.createPersistent(path+"/child1");
        Thread.sleep(2000);
        zkClient.delete(path+"/child1");
        Thread.sleep(2000);
        zkClient.writeData(path,"123");
        Thread.sleep(2000);
        zkClient.delete(path);
        Thread.sleep(100000);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    Curator

    curator是另一个java连接zookeeper类库。功能更加强大。提供了连接重试、分布式锁、选举、队列等多种实际场景的用例。这里先简单搞个使用例子。

    添加依赖

    
        org.apache.curator
        curator-framework
        5.1.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    curator-framework是基础的依赖,一些特定的使用方式需要添加不同的依赖,有curator-recipes、curator-x-discovery、curator-x-async等。

    基本操作

    //创建连接
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();
    String path = "/test";
    client.checkExists().forPath(path);//判断是否存在
    client.create().forPath(path, "123".getBytes());//创建节点
    byte[] data = client.getData().forPath(path);//获取数据
    System.out.println(new String(data));
    client.setData().forPath(path, "456".getBytes());//设置数据
    client.delete().forPath(path);//删除节点
    
    client.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    节点监听

    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();
    String path = "/test";
    NodeCache  nodeCache = new NodeCache(client,path);
    //添加监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            ChildData data = nodeCache.getCurrentData();
            if (data != null) {
                System.out.println("Node changed: " + data.getPath() + ", value: " + new String(data.getData()));
            } else {
                System.out.println("Node deleted: " + nodeCache.getPath());
            }
        }
    });
    nodeCache.start();
    client.create().forPath(path);
    client.setData().forPath(path, "123".getBytes());
    client.delete().forPath(path);
    client.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这里NodeCache被标识@Deprecated,也不知道被什么方式代替了,后面再研究。先简单使用。

  • 相关阅读:
    06-mq
    Spring源码:Spring源码阅读环境搭建
    RedisConnectionFactory is required已解决!!!!
    设计模式-代理模式
    来吧!再谈多线程
    关于一次两段式提交和数据库恢复数据我的一些想法
    springboot整合swagger3和knife4j
    Crack:Aspose.CAD 22.11.0 for .NET|Aspose.CAD
    React 状态管理 - Redux 进阶(上)
    小节8:Python之文件操作
  • 原文地址:https://blog.csdn.net/sinat_16493273/article/details/134157085