• Zookeeper Java SDK 开发入门


    如果您还没有安装Zookeeper请看ZooKeeper 安装说明Zookeeper 命令使用方法和数据说明

    一、概述

    • ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。

    • 本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。

    • 源码地址:https://github.com/apache/zookeeper

      在这里插入图片描述

    二、导入依赖包

    • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

      <dependency>
         <groupId>org.apache.zookeepergroupId>
          <artifactId>zookeeperartifactId>
          <version>3.8.2version>
      dependency>
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    三、与 Zookeeper 建立连接

    • 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
      • connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
      • sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
      • watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
        private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
    
            // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
            String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
            // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
            Integer sessionTimeout = 3000;
            // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
            final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        Event.KeeperState state = watchedEvent.getState();
                        Event.EventType type = watchedEvent.getType();
                        String path = watchedEvent.getPath();
    
                        switch (state) {
                            case Unknown:
                                break;
                            case Disconnected:
                                break;
                            case NoSyncConnected:
                                break;
                            case SyncConnected:
                                countDownLatch.countDown();
                                break;
                            case AuthFailed:
                                break;
                            case ConnectedReadOnly:
                                break;
                            case SaslAuthenticated:
                                break;
                            case Expired:
                                break;
                            case Closed:
                                break;
                        }
                        switch (type) {
                            case None:
                                break;
                            case NodeCreated:
                                break;
                            case NodeDeleted:
                                break;
                            case NodeDataChanged:
                                break;
                            case NodeChildrenChanged:
                                break;
                            case DataWatchRemoved:
                                break;
                            case ChildWatchRemoved:
                                break;
                            case PersistentWatchRemoved:
                                break;
                        }
    
                        System.out.println("Session watch state=" + state);
                        System.out.println("Session watch type=" + type);
                        System.out.println("Session watch path=" + path);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // 由于建立连接是异步的,这里先阻塞等待连接结果
            countDownLatch.await();
            ZooKeeper.States state = zooKeeper.getState();
            switch (state) {
                case CONNECTING:
                    break;
                case ASSOCIATING:
                    break;
                case CONNECTED:
                    break;
                case CONNECTEDREADONLY:
                    break;
                case CLOSED:
                    break;
                case AUTH_FAILED:
                    break;
                case NOT_CONNECTED:
                    break;
            }
            System.out.println("ZooKeeper state=" + state);
    
            return zooKeeper;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    四、判断 ZooKeeper 节点是否存在

    • 创建节点数据使用 exists 方法,传递四个参数
      • path , 表示节点目录名称
      • watch, 表示监听器(只对该路径有效)
      • stat, 判断结果回调函数
      • context, 自定义上下文对象
        private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
            // 判断 ZooKeeper 节点是否存在
            Object context = new Object();
            zooKeeper.exists("/yiqifu", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            }, new AsyncCallback.StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {
                    if(null != stat){
                        System.out.println("ZooKeeper /yiqifu 节点存在");
                    }
                    else {
                        System.out.println("ZooKeeper /yiqifu 节点不存在");
                    }
                }
            }, context);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    四、创建 ZooKeeper 节点数据

    • 创建节点数据使用 create 方法,传递四个参数
      • path , 表示节点目录名称
      • data , 表示节点数据
       private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException 	{
    
            // 在 ZooKeeper 中创建节点
            String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("ZooKeeper 创建节点成功:" + nodeName);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    五、获取 ZooKeeper 节点数据

    • 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
      • path , 表示节点目录名称
      • watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
        private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
    
            // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
            final Stat stat = new Stat();
            byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        Event.KeeperState state = watchedEvent.getState();
                        Event.EventType type = watchedEvent.getType();
                        String path = watchedEvent.getPath();
    
                        System.out.println("Path watch state=" + state);
                        System.out.println("Path watch type=" + type);
                        System.out.println("Path watch path=" + path);
    
                        //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                        zooKeeper.getData("/yiqifu", this, stat);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, stat);
            System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    六、修改 ZooKeeper 节点数据

    • 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
      • path , 表示节点目录名称。
      • data, 表示新数据。
      • version, 表示数据版本。
        private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
            // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
            zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
            zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    七、异步获取 ZooKeeper 节点数据

    • 修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数

      • path , 表示节点目录名称。

      • watch, 表示是否触发监听器。

      • dataCallback, 表示异步获取数据的回调函数。

     private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
    
            // 获取 ZooKeeper 节点数据(使用异步回调方式)
            Object context = new Object();
            zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                    System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                    System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
                }
            }, context);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    八、完整示例

    package top.yiqifu.study.p131;
    
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class Test01_Zookeeper {
    
        public static void main(String[] args) {
            try {
                // 创建 ZooKeeper 对象
                ZooKeeper zooKeeper = testCreateZookeeper();
                // 在 ZooKeeper 创建数据节点
                testCreateNode(zooKeeper);
                // 在 ZooKeeper 中同步获取节点数据
                testGetdata(zooKeeper);
                // 在 ZooKeeper 中更新节点数据
                testSetdata(zooKeeper);
                // 在 ZooKeeper 异步获取节点数据
                testAsyncGetdata(zooKeeper);
    
                Thread.sleep(3000);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
        private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
    
            // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
            String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
            // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
            Integer sessionTimeout = 3000;
            // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
            final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        Event.KeeperState state = watchedEvent.getState();
                        Event.EventType type = watchedEvent.getType();
                        String path = watchedEvent.getPath();
    
                        switch (state) {
                            case Unknown:
                                break;
                            case Disconnected:
                                break;
                            case NoSyncConnected:
                                break;
                            case SyncConnected:
                                countDownLatch.countDown();
                                break;
                            case AuthFailed:
                                break;
                            case ConnectedReadOnly:
                                break;
                            case SaslAuthenticated:
                                break;
                            case Expired:
                                break;
                            case Closed:
                                break;
                        }
                        switch (type) {
                            case None:
                                break;
                            case NodeCreated:
                                break;
                            case NodeDeleted:
                                break;
                            case NodeDataChanged:
                                break;
                            case NodeChildrenChanged:
                                break;
                            case DataWatchRemoved:
                                break;
                            case ChildWatchRemoved:
                                break;
                            case PersistentWatchRemoved:
                                break;
                        }
    
                        System.out.println("Session watch state=" + state);
                        System.out.println("Session watch type=" + type);
                        System.out.println("Session watch path=" + path);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            countDownLatch.await();
            ZooKeeper.States state = zooKeeper.getState();
            switch (state) {
                case CONNECTING:
                    break;
                case ASSOCIATING:
                    break;
                case CONNECTED:
                    break;
                case CONNECTEDREADONLY:
                    break;
                case CLOSED:
                    break;
                case AUTH_FAILED:
                    break;
                case NOT_CONNECTED:
                    break;
            }
            System.out.println("ZooKeeper state=" + state);
    
            return zooKeeper;
        }
    
        private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
    
            // 在 ZooKeeper 中创建节点
            String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("ZooKeeper 创建节点成功:" + nodeName);
        }
    
        private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
    
            // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
            final Stat stat = new Stat();
            byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        Event.KeeperState state = watchedEvent.getState();
                        Event.EventType type = watchedEvent.getType();
                        String path = watchedEvent.getPath();
    
                        System.out.println("Path watch state=" + state);
                        System.out.println("Path watch type=" + type);
                        System.out.println("Path watch path=" + path);
    
                        //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                        zooKeeper.getData("/yiqifu", this, stat);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, stat);
            System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
        }
    
        private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
            // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
            zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
            zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
        }
    
        private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
    
            // 获取 ZooKeeper 节点数据(使用异步回调方式)
            Object context = new Object();
            zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                    System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                    System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
                }
            }, context);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
  • 相关阅读:
    OPPO手机便签数据搬家到华为mate60Pro手机怎么操作
    pytorch的backward()的底层实现逻辑
    GEE:基于GLDAS数据集分析土壤湿度的时间序列变化
    前端知识大全之CSS
    [MDM9607]高通9607 QCMAP设置LAN IP之后无法获取到IP地址问题分析及解决方案
    数据结构与算法_哈希表_线性探测法原理和代码实现
    Android手机或平板设置浏览器的UserAgent
    bootstrap-datepicker实现只能选择每一年的某一个月份
    2022编译原理期末考试 回忆版
    树莓派系统压缩备份实操
  • 原文地址:https://blog.csdn.net/qifu123/article/details/134425520