• zookeeper介绍


    一、说明

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务

    zookeeper分为两种节点
    1、持久节点(Persistent):客户端和服务器端断开连接后,创建的节点不删除
    2、短暂节点(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

    选举机制
    1、只有集群中的节点超过半数启动,才会选举出Leader,否则所有已近启动的节点都处于LOOKING状态
    2、在集群的启动过程中,myid小的会把票数投给myid大的
    3、当集群已经选出了Leader,则后续启动的节点直接成为Follower

    二、常用API(基于3.5)

    2.1、help

    显示所有操作命令
    在这里插入图片描述

    2.2、ls

    作用:查看节点信息
    语法:ls [-s] [-w] [-R] path
    说明:-s:附加次级信息;-w:监听子节点变化
    样例:在这里插入图片描述

    2.3、create

    作用:创建节点/数据
    语法:create [-s] [-e] [-c] [-t ttl] path [data] [acl]

    -s:顺序节点(为创建的节点添加唯一的顺序后缀)
    -e:临时节点(创建临时节点,当客户端断开连接时节点会被删除)
    -c:容器节点(创建节点时同时创建其父级路径中不存在的任何节点)
    -t ttl:生存时间(设置节点的生存时间)
    path:节点路径
    data:节点上的数据
    acl:访问控制列表

    样例:在这里插入图片描述

    2.4、get

    作用:获取节点值
    语法:get [-s] [-w] path
    说明:-s:附加次级信息;-w:监听子节点变化
    样例:在这里插入图片描述

    2.5、set

    作用:修改节点数据
    语法:set [-s] [-v version] path data
    说明:-s:附加次级信息;-v:指定要修改的节点的版本号
    样例:在这里插入图片描述

    2.6、delete

    作用:删除节点
    语法:delete [-v version] path
    说明:-v:指定要删除的节点的版本号
    样例:在这里插入图片描述

    2.7、deleteall

    作用:递归删除节点
    语法:deleteall path
    说明:-s:附加次级信息
    样例:在这里插入图片描述

    三、Java API(基于3.5)

    3.1 连接zk客户端

    void createZkClient() throws IOException {
            // 创建zk客户端(zk集群地址, 连接超时时间, 监听)
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
    
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.2 创建节点

    访问权限说明
    OPEN_ACL_UNSAFE完全开放的ACL,任何连接的客户端都可以操作该属性znode
    CREATOR_ALL_ACL只有创建者才有ACL权限
    READ_ACL_UNSAFE只能读取ACL
    节点类型说明
    PERSISTENT持久节点
    PERSISTENT_SEQUENTIAL持久顺序节点
    EPHEMERAL临时节点
    EPHEMERAL_SEQUENTIAL临时顺序节点
    void create () throws InterruptedException, KeeperException, IOException {
            // 创建节点(节点,数据,访问权限,节点类型)
            // 访问权限:OPEN_ACL_UNSAFE:完全开放的ACL,任何连接的客户端都可以操作该属性znode;CREATOR_ALL_ACL:只有创建者才有ACL权限;READ_ACL_UNSAFE:只能读取ACL
            // 节点类型:PERSISTENT:持久节点;PERSISTENT_SEQUENTIAL:持久顺序节点;EPHEMERAL:临时节点;EPHEMERAL_SEQUENTIAL:临时顺序节点
            String node = createZkClient().create("/zoo", "dog".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(node);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.3 获取节点数据

    void get() throws IOException, InterruptedException, KeeperException {
            // 获取节点数据(节点,是否监听)
            byte[] data = createZkClient().getData("/zoo", false, null);
            System.out.println(new String(data, StandardCharsets.UTF_8));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.4 获取节点列表

    void getChildren() throws IOException, InterruptedException, KeeperException {
            // 查询节点列表(节点,是否监听)
            List<String> children = createZkClient().getChildren("/", false);
            for (String child : children) {
                System.out.println(child);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.5 判断节点是否存在

    void exist() throws IOException, InterruptedException, KeeperException {
            // 判断节点是否存在(节点,是否监听)
            Stat stat = createZkClient().exists("/zoo", false);
        }
    
    • 1
    • 2
    • 3
    • 4

    3.6 修改节点数据

    void update () throws IOException, InterruptedException, KeeperException {
            // 修改节点数据(节点,数据,版本号(用于原子性校验,设置为-1则表示忽略版本号检查))
            Stat stat = createZkClient().setData("/zoo", "cat".getBytes(), -1);
        }
    
    • 1
    • 2
    • 3
    • 4

    3.7 删除节点

    void delete() throws IOException, InterruptedException, KeeperException {
            // 删除节点(节点,数据,版本号(用于原子性校验,设置为-1则表示忽略版本号检查))
            createZkClient().delete("/zoo", -1);
        }
    
    • 1
    • 2
    • 3
    • 4

    四、监听器

    3.6.0版本之前不支持注册持久监听器,所以想要之久监听只能反复注册

    4.1 3.5版本

    单次监听

    @Test
        void watchData() throws IOException, InterruptedException, KeeperException {
            // 监听节点数据变化
            ZooKeeper zkClient = createZkClient();
            zkClient.getData("/zoo", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("监听节点数据变化...");
                    System.out.println("监听路径:" + event.getPath());
                    System.out.println("事件类型:" + event.getType());
                    System.out.println("状态:" + event.getState());
                }
            }, null);
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        @Test
        void watchNode() throws IOException, InterruptedException, KeeperException {
            // 监听节点变化
            ZooKeeper zkClient = createZkClient();
            zkClient.getChildren("/zoo", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("监听节点变化...");
                    System.out.println("监听路径:" + event.getPath());
                    System.out.println("事件类型:" + event.getType());
                    System.out.println("状态:" + event.getState());
                }
            });
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    持久监听,原理:每次触发监听之后再注册一遍监听,以此实现持久监听的效果

    @Test
        void watchData() throws IOException, InterruptedException, KeeperException {
            // 监听节点数据变化
            ZooKeeper zkClient = createZkClient();
            zkClient.getData("/zoo", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("监听节点数据变化...");
                    System.out.println("监听路径:" + event.getPath());
                    System.out.println("事件类型:" + event.getType());
                    System.out.println("状态:" + event.getState());
    
                    // 重新注册监听
                    try {
                        byte[] data = zkClient.getData("/zoo", this, null);
                        System.out.println("节点数据:" + new String(data, StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
    
                }
            }, null);
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        @Test
        void watchNode() throws IOException, InterruptedException, KeeperException {
            // 监听节点变化
            ZooKeeper zkClient = createZkClient();
            zkClient.getChildren("/zoo", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("监听节点变化...");
                    System.out.println("监听路径:" + event.getPath());
                    System.out.println("事件类型:" + event.getType());
                    System.out.println("状态:" + event.getState());
    
                    try {
                        List<String> children = zkClient.getChildren("/zoo", this);
                        System.out.println("节点信息...");
                        children.forEach(System.out::println);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    4.2 3.6版本

    2.6.0版本提供了addWatch API,可以直接实现持久监听

    @Test
        void addWatch() throws IOException, InterruptedException, KeeperException {
            ZooKeeper zkClient = createZkClient();
            // PERSISTENT:持久化订阅,PERSISTENT_RECURSIVE:持久化递归订阅
            zkClient.addWatch("/zoo", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("监听节点变化...");
                        System.out.println("监听路径:" + event.getPath());
                        System.out.println("事件类型:" + event.getType());
                        System.out.println("状态:" + event.getState());
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }, AddWatchMode.PERSISTENT);
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    五、版本选择

    目前最新的版本为:3.9.1,最新稳定版为:3.8.3,稳定版本为:3.7.2,3.6.4版本目前已经停产,理论上来说最推荐使用3.7.2,不过也要兼容其他组件的版本

    官网版本说明

    六、分布式

    6.1、原生方式

    思路:每次创建临时顺序节点之后,判断当前节点是否为为最小序号节点,如果是最小序号节点,说明锁没有被占用,可以直接使用,如果不是最小序号节点,那说明锁已经被占用,需要等待上一个节点释放,才可以使用锁

    工具类

    package com.xx;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author aqi
     * @describe 分布式锁
     * @since 2023/11/13 14:20
     */
    public class DistributedLock {
    
        /**
         * zk集群地址
         */
        private final String connectString = "127.0.0.1:2181";
    
        private ZooKeeper zk;
    
        /**
         * 连接超时时间
         */
        private final int sessionTimeout = 2000;
    
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        private CountDownLatch waitLatch = new CountDownLatch(1);
    
        /**
         * 上一个节点
         */
        private String waitPath;
    
        /**
         * 当前节点
         */
        private String currentNode;
    
        /**
         * 创建锁
         */
        public DistributedLock() throws IOException, InterruptedException, KeeperException {
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 如果成功连接上zk则释放
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                        waitLatch.countDown();
                    }
                }
            });
    
            // 等待zk连接后释放
            countDownLatch.await();
    
            // 判断根节点/locks是否存在
            Stat stat = zk.exists("/locks", false);
            if (null == stat) {
                // 创建根节点
                zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
    
        /**
         * 获取锁
         */
        public void zkLock() throws InterruptedException, KeeperException {
            // 创建带序号的临时节点
            currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("currentNode:" + currentNode + ",waitPath:" + waitPath);
            // 判断当前节点是否为最小的序号,如果是最小的需要,则获取到锁,如果不是则监听前一个序号
            List<String> children = zk.getChildren("/locks", false);
            // 如果children只有一个值,则直接获取锁,如果存在多个节点,则需要判断是否为最小节点
            if (children.size() == 1) {
                return;
            } else {
                Collections.sort(children);
                // 获取当前节点在集合中的位置
                int index = children.indexOf(currentNode.substring("/locks/".length()));
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    return;
                }else {
                    // 监听前一个节点的变化
                    System.out.println("index:" + index);
                    System.out.println("children:" + children);
                    waitPath = "/locks/" + children.get(index - 1);
                    System.out.println("waitPath:" + waitPath);
                    zk.getData(waitPath, true, new Stat());
                    // 等待监听
                    waitLatch.await();
                }
            }
        }
    
        /**
         * 释放锁
         */
        public void unZkLock() throws InterruptedException, KeeperException {
            zk.delete(currentNode, -1);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    使用样例

    package com.xx;
    
    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    
    /**
     * @author aqi
     * @describe
     * @since 2023/11/13 14:45
     */
    public class DistributeLockTest {
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            DistributedLock lock1 = new DistributedLock();
            DistributedLock lock2 = new DistributedLock();
    
    
            new Thread(() -> {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");
                    Thread.sleep(5000);
    
                    lock1.unZkLock();
                    System.out.println("线程1释放锁");
    
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).start();
    
    
            new Thread(() -> {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");
                    Thread.sleep(5000);
    
                    lock2.unZkLock();
                    System.out.println("线程2释放锁");
    
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).start();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    6.2、curator

    引入POM依赖

    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.3.0</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    使用

    package com.xx;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
     * @author aqi
     * @describe
     * @since 2023/11/13 15:29
     */
    public class CuratorLockTest {
    
        private static CuratorFramework getCuratorFramework() {
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .connectionTimeoutMs(2000)
                    .sessionTimeoutMs(2000)
                    .retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
            client.start();
            System.out.println("zk启动成功...");
            return client;
        }
    
        public static void main(String[] args) {
            // 创建锁1
            InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
            // 创建锁2
            InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    
            new Thread(() -> {
                try {
                    lock1.acquire();
                    System.out.println("线程1获取到锁");
    
                    lock1.acquire();
                    System.out.println("线程1再次获取到锁");
    
                    Thread.sleep(5000);
    
                    lock1.release();
                    System.out.println("线程1释放锁");
    
                    lock1.release();
                    System.out.println("线程1再次释放锁");
    
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
    
            }).start();
    
            new Thread(() -> {
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");
    
                    lock2.acquire();
                    System.out.println("线程2再次获取到锁");
    
                    Thread.sleep(5000);
    
                    lock2.release();
                    System.out.println("线程2释放锁");
    
                    lock2.release();
                    System.out.println("线程2再次释放锁");
    
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
    
            }).start();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
  • 相关阅读:
    总结:Spring Boot之@Value
    JAX的深度学习和科学计算
    elasticsearch7 实战应用
    vue3和vue2生命周期​
    Vue简介
    长胜证券:美科技股大跌,甲骨文一夜蒸发超3000亿
    卷积神经网络的基本操作,卷积神经网络百度百科
    2022-2023罗戈物流行业年度报告,物流人必看报告!
    【0229】libpq库实现压测PG服务器max_connections的最大连接数
    Wps Excel 如何时间格式化
  • 原文地址:https://blog.csdn.net/progammer10086/article/details/134039286