• Zookeeper原生API和Curator两种方式实现ZK分布式锁


    一、简介

    Zookeeper如何实现分布式锁
    本文记录一下如何使用Zookeeper原生APICurator的API两种方式实现分布式锁。

    CuratorNetflix开源的一套ZooKeeper客户端框架

    项目:Maven
    项目依赖:
    因为在curator-framework依赖中是包含了zk的依赖了,所以不需要再加入zk的依赖。

     <dependency>
          <groupId>org.apache.curatorgroupId>
          <artifactId>curator-frameworkartifactId>
          <version>5.1.0version>
      dependency>
      <dependency>
          <groupId>org.apache.curatorgroupId>
          <artifactId>curator-recipesartifactId>
          <version>5.1.0version>
      dependency>
      <dependency>
          <groupId>org.apache.curatorgroupId>
          <artifactId>curator-testartifactId>
          <version>5.1.0version>
      dependency>
      <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
            <version>1.18.12version>
      dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    curator-test 主要是用来模拟zk环境,如果没有部署zk环境的话可以使用curator-test的TestingServer来模拟ZK:

        public CuratorFramework curatorClient() throws Exception {
    	TestingServer testingServer = new TestingServer(2181, new File("zk-data"));
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()           
            	    .connectString(testingServer.getConnectString())//使用TestingServer模拟zk环境(无需部署zk环境)
                    .connectionTimeoutMs(15 * 1000)
                    //会话超时时间
                    .sessionTimeoutMs(60 * 1000)
                    //设置重试机制
                    .retryPolicy(new ExponentialBackoffRetry(10*1000,3))
                    .build();
            curatorFramework.start();
            return curatorFramework;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    二、zookeeper实现分布式锁的原理

    使用zookeeper来实现分布式锁主要是使用zk的临时序列节点
    zk的节点类型有四种:

    1. 临时节点
    2. 持久节点
    3. 临时序列节点
    4. 持久序列节点

    即:临时节点、持久节点和序列节点(可以是临时节点也可以是持久节点)

    而要实现分布式锁主要借助的是 临时序列节点,临时节点可以在Session会话结束后自动删除,序列节点可以按照顺序创建出节点,如下图所示,在/lock节点下创建path为hello的序列节点:节点会自动按顺序创建出带编号:
    在这里插入图片描述
    所以实现zk分布式锁的原理就是:
    多个线程争抢一把锁:每个线程去zk创建临时序列节点,利用zookeeper的watch机制,每个线程只监听他前面的节点,如果自己是这把锁的第一个位置,则执行,否则等待。当第一把锁的任务执行完成后,释放锁,即删除节点,这个事件只会被第二个节点监听到,所以只会有第二个节点收到通知,获得锁。这样的好处是不会通知到所有的节点去争夺锁,
    释放锁:第一把锁的任务执行完成后,会删除节点。当Session会话断开后,临时节点会被自动删除,所以也避免了死锁的问题。

    注意:所以使用Zookeeper实现的分布式锁是公平锁

    三、使用Zookeeper原生API实现分布式锁

    import lombok.Data;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @Author: LiuShihao
     * @Date: 2022/11/2 15:38
     * @Desc: 直接使用Zookeeper原生API实现分布式锁
     */
    @Data
    public class ZookeeperLock implements Watcher,AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {
        ZooKeeper zk;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        String lockName ;
        String threadName;
        String lockRoot = "/lock";
    
        public ZookeeperLock(ZooKeeper zk, String threadName) {
            this.zk = zk;
            this.threadName = threadName;
        }
        /**
         * 使用异步方式创建临时序列节点,阻塞
         */
        public void tryLock(String name){
            try {
                zk.create(lockRoot+"/"+name,threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
                countDownLatch.await();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        /**
         * 删除临时节点,触发watch delete事件
         */
        public void unLock(){
            try {
                zk.delete(lockRoot+lockName,-1);
                System.out.println(threadName + " over work....");
            }catch (Exception e){
                e.printStackTrace();
            }
            
        }
        /**
         * create callback
         * @param rc
         * @param path
         * @param ctx
         * @param name
         */
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (name != null){
                System.out.println(ctx.toString()+" create path: "+ name);
                lockName = name.substring(5);
                zk.getChildren(lockRoot, false, this, ctx );
            }
        }
        /**
         * getChildren callback
         * @param rc
         * @param path
         * @param ctx
         * @param children
         */
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children) {
            Collections.sort(children);
            int i = children.indexOf(lockName.substring(1));
            if(i == 0){
                //是当前第一位置, countDownLatch放行
                System.out.println(threadName +" i am first....");
                try {
                    zk.setData(lockRoot,threadName.getBytes(),-1);
                    countDownLatch.countDown();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                //不是,检查前一个是否还存在?
                zk.exists(lockRoot+"/"+children.get(i-1),this,this,"abc");
            }
        }
    
        /**
         * exists callback
         * @param rc
         * @param path
         * @param ctx
         * @param stat
         */
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            //todo
        }
    
        /**
         * watch event callback
         * 如果第一线程,锁释放了,其实只有第二个节点收到了回调事件!!
         * 如果不是第一个节点主动释放锁,而是某一个节点挂了(session断开连接,临时节点自动被删除),也能造成后边的节点收到这个通知,从而让他后边的节点那个跟去watch挂掉这个节点前边的
         * @param event
         */
        @Override
        public void process(WatchedEvent event) {
            System.out.println("watch event: "+event.getPath()+" "+event.getType());
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    zk.getChildren(lockRoot,false,this ,"abc");
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
                case DataWatchRemoved:
                    break;
                case ChildWatchRemoved:
                    break;
                case PersistentWatchRemoved:
                    break;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133

    测试:

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @Author: LiuShihao
     * @Date: 2022/11/2 16:03
     * @Desc:
     */
    public class ZkLockTest {
    
        public static ZooKeeper getZkClient()throws Exception{
            String connectionString = "192.168.153.131:2181";
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(connectionString,30*1000 , new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            return zooKeeper;
        }
    
        public static void main(String[] args) throws Exception {
            ZooKeeper zkClient = ZkLockTest.getZkClient();
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    try {
                        String threadName = Thread.currentThread().getName();
                        ZookeeperLock lock = new ZookeeperLock(zkClient,threadName);
                        lock.tryLock("lock");
                        Thread.sleep(2000);
                        lock.unLock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            System.in.read();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    四、使用Curator实现分布式锁

    使用Curator实现分布式锁就简单的多了,因为curator已经帮我们实现了分布式锁:InterProcessLock 类。

    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessLock;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
     * @Author: LiuShihao
     * @Date: 2022/10/31 17:46
     * @Desc: 使用Curator InterProcessLock 实现分布式锁(公平锁)
     * 实现Zookeeper分布式锁,主要是基于Zookeeper的 临时序列节点来实现的。
     * 1. 临时节点,指的是节点创建后,如果创建节点的客户端和 Zookeeper 服务端的会话失效(例如断开连接),那么节点就会被删除。
     * 2. 持久节点指的是节点创建后,即使创建节点的客户端和 Zookeeper 服务端的会话失效(例如断开连接),节点也不会被删除,只有客户端主动发起删除节点的请求,节点才会被删除。
     * 3. 序列节点,这种节点在创建时会有一个序号,这个序号是自增的。序列节点既可以是临时序列节点,也可以是持久序列节点。
     *
     * 临时序列实现分布式锁原理:
     * 当客户端来加锁的时候,会先在加锁的节点下建立一个子节点,这个节点就有一个序号,类似 lock-000001 ,
     * 创建成功之后会返回给客户端所创建的节点,然后客户端会去获取这个加锁节点下的所有客户端创建的子节点,当然也包括自己创建的子节点。
     * 拿到所有节点之后,给这些节点进行排序,然后判断自己创建的节点在这些节点中是否排在第一位,
     * 如果是的话,那么就代表当前客户端就算加锁成功了,如果不是的话,那么就代表当前客户端加锁失败。
     * 加锁失败的节点并不会不停地循环去尝试加锁,而是在自己创建节点的前一个节点上加一个监听器,然后就进行等待。
     * 当前面一个节点释放了锁,就会反过来通知等待的客户端,然后客户端就加锁成功了。
     *
     * 从这里可以看出redis和zk防止死锁的实现是不同的,redis是通过过期时间来防止死锁,而zk是通过临时节点来防止死锁的。
     *
     * 为什么使用顺序节点?其实为了防止羊群效应。
     * 如果没有使用顺序节点,假设很多客户端都会去加锁,那么加锁就会都失败,都会对加锁的节点加个监听器,
     * 那么一旦锁释放,那么所有的加锁客户端都会被唤醒来加锁,那么一瞬间就会造成很多加锁的请求,增加服务端的压力。
     *
     * zk实现的分布式锁是公平的吗?
     * 其实使用临时顺序节点实现的分布式锁就是公平锁。所谓的公平锁就是加锁的顺序跟成功加锁的顺序是一样的。
     * 因为节点的顺序就是被唤醒的顺序,所以也就是加锁的顺序,所以天生就是公平锁。
     */
    @Slf4j
    public class CuratorLock {
        public CuratorFramework getCuratorFramework(){
            String connectionString = "192.168.153.131:2181";
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
            curatorFramework.start();
            return curatorFramework;
        }
        public static void main(String[] args) throws Exception {
            String lock = "/lock";
            //获得两个客户端
            CuratorFramework client1 = new CuratorLock().getCuratorFramework();
            CuratorFramework client2 = new CuratorLock().getCuratorFramework();
            final InterProcessLock lock1 = new InterProcessMutex(client1, lock);
            final InterProcessLock lock2 = new InterProcessMutex(client2, lock);
            //模拟两个线程
            new Thread(() -> {
                try {
                    //线程加锁
                    lock1.acquire();
                    System.out.println("线程1获取锁");
                    //线程沉睡
                    Thread.sleep(5*1000);
                    //线程解锁
                    lock1.release();
                    System.out.println("线程1释放了锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
            //线程2
            new Thread(() -> {
                //线程加锁
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");
                    //线程沉睡
                    Thread.sleep(5*1000);
                    lock2.release();
                    System.out.println("线程2释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    源码

    Github:https://github.com/Liu-Shihao/springboot-curator

    文章参考

    https://baijiahao.baidu.com/s?id=1735496665409408331&wfr=spider&for=pc

    https://blog.csdn.net/weixin_47025166/article/details/125451987

  • 相关阅读:
    Python入门之模块
    2022-11-27 ARM- 用C语言实现stm32的三盏灯的点亮
    存储器和CPU的连接与TCP的流量控制
    基于STM32设计的室内环境监测系统(华为云IOT)_2023
    【c++】new一个新数组时数组地址变化的现象
    STM32F4-TFT-SPI时序逻辑分析仪调试记录
    《统计学习方法》 第4章 朴素贝叶斯法
    流程图高级用法【Markdown进阶篇】
    Linux之FinalShell的安装和使用
    某农信企业自主创新自动化安全基线检测平台建设实践
  • 原文地址:https://blog.csdn.net/DreamsArchitects/article/details/127649541