• zk常用命令,zk分布式job, zk分布式锁,通俗易懂笔记


    Zookeeper

    概述

    • 是什么

      • 节点数据管理软件, 可以创建, 修改, 删除节点或者节点数据. 客户端连接zookeeper, 就可以实现对节点的一致性共享
    • 干什么

      • 实现数据一致性共享, 用于服务注册与发现(Dubbo就是用Zookeeper实现服务注册与发现), 实现分布式锁, 消息中间件入Kafka的底层实现
    • 怎么用

      1. 安装zookeeper软件 , 启动zookeeper服务, 或者zookeeper服务集群搭建与开启
      2. 对各个需要使用zookeeper功能的微服务添加依赖(一般使用zookeeperClient), 配置文件添加zookeeper 单个服务(或者集群)的ip和端口.
      3. 使用ZkClient对节点的创建, 修改, 删除, 监听. 完成特定功能

    内部结构

    • 每一个节点可以抽象的理解为一个目录,

      • 在这里插入图片描述

      • 节点的类型

        • 临时节点

          • 会在客户端会话断开后自动删除。适用于心跳,服务发现等场景
        • 持久节点

          • 持久化保存的节点,也是默认创建的
        • 序列化节点

          • 序列化临时节点

            • 创建时zookeeper 会在路径上加上序号作为后缀,会在客户端会话断开后自动删除
          • 序列化持久节点

            • 创建时zookeeper 会在路径上加上序号作为后缀,。非常适合用于分布式锁、分布式选举等场景。创建时添加 -s 参数即可

            • #创建序号节点
              create -s /test
              #返回创建的实际路径
              Created /test0000000001
              create -s /test
              #返回创建的实际路径2
              Created /test0000000002
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
      • 注意细节

        • 节点可以只有一个节点名称(相等于空目录), 也可以有节点内容.
        • 节点里面可以继续包含节点(临时节点只能是叶子节点)
        • 可以对节点做一些监听, 来实现各种功能, 比如对节点的监听(监听创建, 修改(节点本身修改/节点的子节点的创建或修改)), 节点数据的监听(节点数据的修改), 节点属性的监听(很少用)

    常用命令

    • 创建节点

      • create [-s] [-e] [-c] [-t ttl] path [data] [acl]
        • -s 序列化节点
        • -e 临时节点
    • 查看节点

      • 查看节点数据
        • get [-s] [-w] path
          • -s 包含节点状态 -w 添加监听
      • 列出子节点
        • ls [-s] [-w] [-R] path
          • -s状态 -R 递归查看所有子节点 -w 添加监听
    • 删除节点

      • delete [-v version] path
        删除节点,(不能存在子节点)
      • deleteall path
        删除路径及所有子节点
    • 监听节点

      • 客户添加 -w 参数可实时监听节点与子节点的变化,并且实时收到通知。非常适用保障分布式情况下的数据一至性。其使用方式如下:

        命令描述
        ls -w path监听子节点的变化(增,删)
        get -w path监听节点数据的变化
        stat -w path监听节点属性的变化
        printwatches on|off触发监听后,是否打印监听事件(默认on)

    集群

    • 说是集群, 其实我个人觉得说是主从复制比较贴切
    • 原理
      • 启动多个zookeeper服务, 让他们相互关联(配置文件中每一个zookeeper服务都配有集群中每个其他zookeeper服务的ip和端口)
      • 把一些zookeeper服务设置成Leader, 一些设置成follower, 一些设置成observe(可有可无)
        • leader: 主节点相当于mysql中的master, 用于写操作
        • follower: 从节点相当于mysql中的slave, 用于读操作
        • observer: 旁观者节点(与follower节点不同, 他不用于选举, 所以叫他旁观者). 用于读操作
      • 通信机制: 现在就有了leader(一个)节点, 和若干个follower和observer节点, 他们直接的通信如图
        • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vVaDih7Q-1662916667930)(D:\all\思维导图\思维导图关联文件\markdownImg\image-20201101104053639.png)]
        • 如果客户端连接的是集群中的从节点
          • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ED5EHzUi-1662916667931)(D:\all\思维导图\思维导图关联文件\markdownImg\image-20201101104821617.png)]
        • 客户端写入流程说明:
          1. client向zk中的server发送写请求,如果该server不是leader,则会将该写请求转发给leader server,leader将请求事务以proposal形式分发给follower;
          2. 当follower收到收到leader的proposal时,根据接收的先后顺序处理proposal;
          3. 当Leader收到follower针对某个proposal过半的ack后,则发起事务提交,重新发起一个commit的proposal
          4. Follower收到commit的proposal后,记录事务提交,并把数据更新到内存数据库;
          5. 当写成功后,反馈给client。
      • 选举机制:
        • 触发条件
          1. 服务节点初始化启动
          2. 半数一上的从节点无法感知主节点
        • 选举说明
          1. 第一轮投票全部投给自己
          2. 第二轮投票给myid比自己大的相邻节点
          3. 如果得票超过半数,选举结束。
          4. image-20201101223104687

    实际应用

    • dubbo中使用zookeeper实现服务注册与发现

      • 目录结构
        • 在这里插入图片描述

        • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hkXyTSUL-1662916667934)(D:\all\思维导图\思维导图关联文件\markdownImg\image-20201101223751771.png)]

    • 分布式锁

      • 和分布式job一样, 也是创建序列化临时节点
      • 用zkclien 或者其他第三方写一个程序工具类,放在每一个分布式应用对应的服务中,其中包含lock, 和unlock 。 其业务实现逻辑如下
      1. 当调用lock方法是,如果不纯在, 就创建一个根目录zk持久节点 (比如就叫做 /lockRoot)
      2. 创建序列化临时节点 , 比如lockSeq (比如有3个应用调用了lock方法, 就会有lockSeq0000000000, lockSeq0000000001, lockSeq0000000002, 3个临时节点)
      3. 调用tryLock, 里面的实现是:获取根节点下的所有子节点, 判断最小的那个序列化临时节点是不是就是当前自己的节点 , 如果是, 那么获取成功, 如果不是,挂起当前线程, 监听他的上一个序列化临时节点, 如果上一个序列化临时节点释放了锁(也就是删除锁, 后面会说), 那么久唤醒单签线程
      4. 上面1,2,3就是加锁过程, 解锁如何实现呢?看如下
      5. 调用unlock 方法, 实现是: 删除当前正在占用的锁(也就是锁对应的序列化临时节点), 然后唤醒挂起的线程, 也就是第3步
      6. 代码如下:( create by 尼恩 @ 疯狂创客圈)
      • package com.crazymakercircle.zk.distributedLock;
        
        import com.crazymakercircle.zk.ZKclient;
        import lombok.extern.slf4j.Slf4j;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.zookeeper.WatchedEvent;
        import org.apache.zookeeper.Watcher;
        
        import java.util.Collections;
        import java.util.List;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicInteger;
        
        /**
         * create by 尼恩 @ 疯狂创客圈
         **/
        @Slf4j
        public class ZkLock implements Lock {
            //ZkLock的节点链接
            private static final String ZK_PATH = "/test/lock";
            private static final String LOCK_PREFIX = ZK_PATH + "/";
            private static final long WAIT_TIME = 1000;
            //Zk客户端
            CuratorFramework client = null;
        
            private String locked_short_path = null;
            private String locked_path = null;
            private String prior_path = null;
            final AtomicInteger lockCount = new AtomicInteger(0);
            private Thread thread;
        
            public ZkLock() {
                ZKclient.instance.init();
                if (!ZKclient.instance.isNodeExist(ZK_PATH)) {
                    ZKclient.instance.createNode(ZK_PATH, null);
                }
                client = ZKclient.instance.getClient();
            }
        
            @Override
            public boolean lock() {
        
                synchronized (this) {
                    if (lockCount.get() == 0) {
                        thread = Thread.currentThread();
                        lockCount.incrementAndGet();
                    } else {
                        if (!thread.equals(Thread.currentThread())) {
                            return false;
                        }
                        lockCount.incrementAndGet();
                        return true;
                    }
                }
        
                try {
                    boolean locked = false;
        
                    locked = tryLock();
        
                    if (locked) {
                        return true;
                    }
                    while (!locked) {
        
                        await();
        
                        //获取等待的子节点列表
        
                        List<String> waiters = getWaiters();
        
                        if (checkLocked(waiters)) {
                            locked = true;
                        }
                    }
                    return true;
                } catch (Exception e) {
                    e.printStackTrace();
                    unlock();
                }
        
                return false;
            }
        
        
            @Override
            public boolean unlock() {
        
                if (!thread.equals(Thread.currentThread())) {
                    return false;
                }
        
                int newLockCount = lockCount.decrementAndGet();
        
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
                }
        
                if (newLockCount != 0) {
                    return true;
                }
                try {
                    if (ZKclient.instance.isNodeExist(locked_path)) {
                        client.delete().forPath(locked_path);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return false;
                }
        
                return true;
            }
        
            private void await() throws Exception {
        
                if (null == prior_path) {
                    throw new Exception("prior_path error");
                }
        
                final CountDownLatch latch = new CountDownLatch(1);
        
        
                //订阅比自己次小顺序节点的删除事件
                Watcher w = new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
                        log.info("[WatchedEvent]节点删除");
        
                        latch.countDown();
                    }
                };
        
                client.getData().usingWatcher(w).forPath(prior_path);
        /*
                //订阅比自己次小顺序节点的删除事件
                TreeCache treeCache = new TreeCache(client, prior_path);
                TreeCacheListener l = new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client,
                                           TreeCacheEvent event) throws Exception {
                        ChildData data = event.getData();
                        if (data != null) {
                            switch (event.getType()) {
                                case NODE_REMOVED:
                                    log.debug("[TreeCache]节点删除, path={}, data={}",
                                            data.getPath(), data.getData());
        
                                    latch.countDown();
                                    break;
                                default:
                                    break;
                            }
                        }
                    }
                };
        
                treeCache.getListenable().addListener(l);
                treeCache.start();*/
                latch.await(WAIT_TIME, TimeUnit.SECONDS);
            }
        
            private boolean tryLock() throws Exception {
                //创建临时Znode
                List<String> waiters = getWaiters();
                locked_path = ZKclient.instance
                        .createEphemeralSeqNode(LOCK_PREFIX);
                if (null == locked_path) {
                    throw new Exception("zk error");
                }
                locked_short_path = getShorPath(locked_path);
        
                //获取等待的子节点列表,判断自己是否第一个
                if (checkLocked(waiters)) {
                    return true;
                }
        
                // 判断自己排第几个
                int index = Collections.binarySearch(waiters, locked_short_path);
                if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
                    throw new Exception("节点没有找到: " + locked_short_path);
                }
        
                //如果自己没有获得锁,则要监听前一个节点
                prior_path = ZK_PATH + "/" + waiters.get(index - 1);
        
                return false;
            }
        
            private String getShorPath(String locked_path) {
        
                int index = locked_path.lastIndexOf(ZK_PATH + "/");
                if (index >= 0) {
                    index += ZK_PATH.length() + 1;
                    return index <= locked_path.length() ? locked_path.substring(index) : "";
                }
                return null;
            }
        
            private boolean checkLocked(List<String> waiters) {
        
                //节点按照编号,升序排列
                Collections.sort(waiters);
        
                // 如果是第一个,代表自己已经获得了锁
                if (locked_short_path.equals(waiters.get(0))) {
                    log.info("成功的获取分布式锁,节点为{}", locked_short_path);
                    return true;
                }
                return false;
            }
        
        
            /**
             * 从zookeeper中拿到所有等待节点
             */
            protected List<String> getWaiters() {
        
                List<String> children = null;
                try {
                    children = client.getChildren().forPath(ZK_PATH);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
        
                return children;
        
            }
        
        
        }
        
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60
        • 61
        • 62
        • 63
        • 64
        • 65
        • 66
        • 67
        • 68
        • 69
        • 70
        • 71
        • 72
        • 73
        • 74
        • 75
        • 76
        • 77
        • 78
        • 79
        • 80
        • 81
        • 82
        • 83
        • 84
        • 85
        • 86
        • 87
        • 88
        • 89
        • 90
        • 91
        • 92
        • 93
        • 94
        • 95
        • 96
        • 97
        • 98
        • 99
        • 100
        • 101
        • 102
        • 103
        • 104
        • 105
        • 106
        • 107
        • 108
        • 109
        • 110
        • 111
        • 112
        • 113
        • 114
        • 115
        • 116
        • 117
        • 118
        • 119
        • 120
        • 121
        • 122
        • 123
        • 124
        • 125
        • 126
        • 127
        • 128
        • 129
        • 130
        • 131
        • 132
        • 133
        • 134
        • 135
        • 136
        • 137
        • 138
        • 139
        • 140
        • 141
        • 142
        • 143
        • 144
        • 145
        • 146
        • 147
        • 148
        • 149
        • 150
        • 151
        • 152
        • 153
        • 154
        • 155
        • 156
        • 157
        • 158
        • 159
        • 160
        • 161
        • 162
        • 163
        • 164
        • 165
        • 166
        • 167
        • 168
        • 169
        • 170
        • 171
        • 172
        • 173
        • 174
        • 175
        • 176
        • 177
        • 178
        • 179
        • 180
        • 181
        • 182
        • 183
        • 184
        • 185
        • 186
        • 187
        • 188
        • 189
        • 190
        • 191
        • 192
        • 193
        • 194
        • 195
        • 196
        • 197
        • 198
        • 199
        • 200
        • 201
        • 202
        • 203
        • 204
        • 205
        • 206
        • 207
        • 208
        • 209
        • 210
        • 211
        • 212
        • 213
        • 214
        • 215
        • 216
        • 217
        • 218
        • 219
        • 220
        • 221
        • 222
        • 223
        • 224
        • 225
        • 226
        • 227
        • 228
        • 229
        • 230
        • 231
        • 232
        • 233
        • 234
    • 分布式Job

      • 用zkclien 或者其他第三方写一个程序,放在每一个分布式job对应的服务中, 用来选举一个job作为Master, 让他跑定时任务。 其业务实现逻辑如下

        1. 多个服务节点只允许其中⼀个主节点运⾏JOB任务。
        2. 当主节点挂掉后能⾃动切换主节点,继续执⾏JOB任务。
      • 实现细节

        • 用zkclien 或者其他第三方写一个程序,放在每一个分布式job对应的服务中, 用来选举一个job作为Master, 让他跑定时任务。 其业务实现逻辑如下

          1. 创建一个根目录zk持久节点 (比如就叫做 /JobMaster)
          2. 创建根目录下的序列化临时节点( 比如叫做 /MasterCandidate)为什么是临时节点呢? 因为如果有服务宕机了, zk的临时节点会删除, 那么根节点就可以感知(监听)他的变化, 切设置节点内容为candidateNode
            • 每个分布式job 对应的服务都会创建这么一个序列化临时节点。 比如有三个, 那么就会创建 /MasterCandidate001 , /MasterCandidate002, /MasterCandidate003
          3. 选举master
            • 用CAS的方式/ 或者用zk的方法监听根目录都是一样可以的, 不断监听根节点, 遍历根节点
              • 如果根节点中有值为 masterNode, 那么节点就作为master
              • 如果根节点没有值为masterNode, 那么用节点序列号最小的节点, 他的节点值由candidateNode 编程 masterNode
          4. 每个分布式锁应用, 在执行业务逻辑的时候, 先要判断自己是不是master(怎么判断? 方式有很多, 比如可以把服务自己的IP也写到zk里面比如“masterNode:192.168.122.111”), 如果是, 就执行, 如果不是, 就不执行
  • 相关阅读:
    【Mysql】什么是大事务?以及大事务产生的问题
    SLF4J: Class path contains multiple SLF4J bindings.
    vue2项目封装axios(vite打包)
    华为服务器安装操作系统
    6、Nacos服务多级存储模型
    微信如何实现自动转发朋友圈(跟圈)?
    Linux之并发竞争管理
    在windows下CorelDraw中VBE的调用原理?
    Bash: export:”=”不是有效的标识符;Bash:cyber_launch command not found
    JavaScript01(JavaScript入门语法)
  • 原文地址:https://blog.csdn.net/biubiubiubibibi/article/details/126811853