• 快速掌握Zookeeper及Java API


    Apache ZooKeeper是一个分布式协调服务,它简化了分布式应用程序开发。Apache Hadoop、HBase等项目依赖Zookeeper,用于领导者选举、配置管理、节点协调、服务租约管理等。
    ZooKeeper集群节点将数据存储在共享层级命名空间中,类似于标准的文件系统或树状数据结构。本文将展示如何使用Zookeeper的Java API来存储、更新和删除存储在Zookeeper中的信息。

    Zookeeper安装和命令

    官网下载最新版安装包,如apache-zookeeper-3.5.10-bin.tar.gz 。 为了简化,我们的示例使用单机版本,详细步骤参考安装文档。单机版本没有副本,如果Zookeeper处理异常,服务会停止。

    单机环境很简单,解压即完成安装。使用下面命令启动Zookeeper服务:

    ./zkServer.sh start
    
    
    • 1
    • 2

    输出内容如下:

    ZooKeeper JMX enabled by default
    Using config: E:\dev-tools\zookeeper\conf\zoo.cfg
    Starting zookeeper ... STARTED
    
    • 1
    • 2
    • 3

    Zookeeper采用层级命名空间(类似于分布式文件系统)存储协调数据,如状态信息、协调信息、位置信息等。这些信息存储在不同节点上。在Zookeeper上的每个节点称为ZNode。zk中创建的节点分为两种:永久性节点和临时性节点。永久性节点即创建以后,在不执行delete命令的前提下,该节点是永久存在的;而临时节点与session有关,每个客户端与zk建立链接的时候会生成一个session,这个session不会因为链接zk服务器节点的变化而变化,只有当客户端断开连接以后,该session才会消失,而临时节点会随着session的消失而消失。

    下面介绍常用的Zookeeper CLI示例。使用zkCli.sh 默认server为127.0.0.1:2181,在本机操作可以省略:zkCli.sh -server 127.0.0.1:2181。后面跟上要执行的命令。下面通过命令创建节点,然后获取节点的值。

    • 创建节点
     .\bin\zkCli.cmd create /FirstNode v001
    
    • 1

    在命名空间的根节点上创建FirstNode节点,并设定值为v001.

    • 获取节点值
     .\bin\zkCli.cmd get /FirstNode
    
    • 1
    • 更新节点值
    .\bin\zkCli.cmd set /FirstNode new001
    
    • 1
    • 删除节点
    .\bin\zkCli.cmd delete /FirstNode
    
    • 1

    Java Api 示例

    首先加入依赖:

    
        org.apache.zookeeper
        zookeeper
        3.4.11
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    下面通过java API 创建节点、更新节点、返回数据。

    ZooKeeper Java API主要由两个包组成:
    org.apache.zookeeper:它定义了ZooKeeper客户端库的主类,以及很多ZooKeeper事件类型和状态的静态定义。
    org.apache.zookeeper.data:它定义了与znode相关的特征,如访问控制列表(ACL)、id、统计信息等.

    还有一些ZooKeeper Java api用于服务端实现,比如org.apache.zookeeper.server,org.apache.zookeeper.server.quorum,org.apache.zookeeper.server.upgrade。但这些超出了本文的范围。

    连接Zookeeper实例

    下面代码创建ZkConnection类,用于连接或关闭连接正在运行的Zookeeper服务:

    public class ZKConnection {
        private ZooKeeper zoo;
        final CountDownLatch connectionLatch = new CountDownLatch(1);
    
        public ZooKeeper connect(String host) throws IOException, InterruptedException {
            zoo = new ZooKeeper(host, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent we) {
                    if (we.getState() == Event.KeeperState.SyncConnected) {
                        connectionLatch.countDown();
                    }
                }
            });
            connectionLatch.await();
            return zoo;
        }
    
        public void close() throws InterruptedException {
            zoo.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    要使用Zookeeper服务,应用程序首先需要实例化ZooKeeper 类,它是Zookeeper客户端的主类。在connect方法中,实例化ZooKeeper类,为了保证连接成功,注册了回调方法,处理从Zookeeper服务端传回的连接确认事件WatchedEvent ,方法内使用CountDownLatch的countdown方法,用于解除阻塞并返回连接实例。一旦建立到服务器的连接,给客户端分配会话ID。为了保持会话有效,客户端应该定期向服务器发送心跳。只要它的会话ID仍然有效,则客户端可以调用ZooKeeper api。

    客户端操作

    首先定义操作接口,用于操作ZNode,如创建节点、更新节点值、获取节点值:

    import org.apache.zookeeper.KeeperException;
    
    public interface ZKManager {
        /**
         * Create a Znode and save some data
         *
         * @param path path
         * @param data data
         */
        void create(String path, byte[] data) throws KeeperException, InterruptedException;
    
        /**
         * Get ZNode Data
         *
         * @param path path
         * @param  watchFlag flag
         */
        Object getZNodeData(String path, boolean watchFlag);
    
        /**
         * Update the ZNode Data
         *
         * @param path path
         * @param data data
         */
        void update(String path, byte[] data) throws KeeperException, InterruptedException, KeeperException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    接口实现类:

    public class ZKManagerImpl implements ZKManager {
        private static ZooKeeper zkeeper;
        private static ZKConnection zkConnection;
    
        public ZKManagerImpl() {
            initialize();
        }
    
        /** * Initialize connection */
        private void initialize() {
            try {
                zkConnection = new ZKConnection();
                zkeeper = zkConnection.connect("localhost");
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    
        public void closeConnection() {
            try {
                zkConnection.close();
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
    
        @Override
        public void create(String path, byte[] data) throws KeeperException, InterruptedException {
            zkeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        @Override
        public Object getZNodeData(String path, boolean watchFlag) {
            try {
                byte[] b;
                b = zkeeper.getData(path, null, null);
                String data = new String(b, StandardCharsets.UTF_8);
                System.out.println(data);
                return data;
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            return null;
        }
    
        @Override
        public void update(String path, byte[] data) throws KeeperException, InterruptedException {
            int version = zkeeper.exists(path, true)
                    .getVersion();
            zkeeper.setData(path, data, version);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    上面代码中,connect 和 disconnect方法调用之前创建ZKConnection类的实例。create方法创建ZNode节点,参数为路径以及内容对应的字节数组,为了演示我们没有使用ACL,且为持久节点,即使客户端断开连接节点也不会被删除。

    getZNodeData方法从ZooKeeper中获取ZNode数据的逻辑非常简单。最后使用update方法,检查ZNode在给定路径是否存在。除此之外,为了更新数据首先检查ZNode是否存在并获得当前版本。然后用ZNode路径、data和当前版本作为参数调用setData方法。只有传入的版本与最新版本匹配,ZooKeeper才会更新数据。

    总结

    在开发分布式应用程序时,Apache ZooKeeper作为分布式协调服务起着至关重要的作用。特别是对于存储共享配置、选择主节点等应用场景。ZooKeeper还提供了优雅的javaAPI,用于客户端应用程序代码与ZooKeeper znode间无缝通信。

  • 相关阅读:
    【Node】第三方模块&自定义模块
    超融合架构和传统架构有什么区别?
    linux安装oracle jdk
    快速搭建云原生开发环境(k8s+pv+prometheus+grafana)
    zookeeper典型使用场景
    分析问题难以决策,如何用一颗树来解决?
    webpack 压缩图片
    php开发100问?
    Allegro Design Entry HDL(OrCAD Capture HDL)RF-PCB菜单详细介绍
    温度转换(c++基础)
  • 原文地址:https://blog.csdn.net/neweastsun/article/details/126891361