• 基于zk的分布式锁使用及原理分析


    Zookeeper 版本 3.4.12

    curator-framework、curator-recipes 版本 2.13.0

    curator版本要与zk版本对应,不然启动会报错

    使用示例

    先看一个基于zk实现分布式锁的例子

    public static void main(String[] args) throws InterruptedException {
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                    .retryPolicy(retryPolicy)
                    .connectionTimeoutMs(4000).build();
    
            zkClient.start();
    
            InterProcessMutex lock = new InterProcessMutex(zkClient, "/lock");
    				//倒计时 子线程执行完毕后 主线程才结束
            CountDownLatch countDownLatch=new CountDownLatch(10);
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    try {
                      //获取锁
                        lock.acquire();
                        System.out.println(Thread.currentThread().getName()+"===>获取锁");
                      //模拟业务执行耗时
                        Thread.sleep(4000);
                      //释放锁
                        lock.release();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                },"Thread-"+i).start();
            }
    
            countDownLatch.await();
            System.out.println("执行结束~~~");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    我们知道,zk有几种节点类型:持久化节点、临时节点、持久化有序节点、临时有序节点。这里用到的是临时有序节点。

    分布式锁流程分析

    我们结合上面的代码以5个线程抢占分布式锁为例图表方式 简要说明下锁的获取与释放过程

    1、首先会将**/lock**作为父节点 创建5个临时有序子节点

    在这里插入图片描述

    2、序号最小的子节点对应的线程先获取锁,为避免惊群效应其余的子节点 监听前一个节点(序号最小的子节点不监听节点了),然后对应的线程进行阻塞

    在这里插入图片描述

    图中 标红的子节点对应的线程 进行阻塞,标绿的子节点 对应的线程代表获取到了锁

    3、当xxx–lock-0000000001节点对应线程释放锁后,当前节点会被删除,后一个节点对应的线程因为watch机制 会被唤醒

    在这里插入图片描述

    如上xxx–lock-0000000001节点会被删除,并唤醒xxx–lock-0000000002对应的线程。

    4、以此类推 后面的线程大体按照这个流程去获取锁

    源码分析

    InterProcessMutex类
        
    		private final LockInternals internals;
        //基准路径 会根据这个路径创建临时有序节点
    		private final String basePath;
        //存储线程与锁信息的对应关系
        private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
        //锁的名称
        private static final String LOCK_NAME = "lock-";
    
    		//client zk客户端对象
    		public InterProcessMutex(CuratorFramework client, String path) {
            this(client, path, new StandardLockInternalsDriver());
        }
    
        public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
            this(client, path, "lock-", 1, driver);
        }
    		//maxLeases=1 这个在后面会起到作用
    		InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, 			LockInternalsDriver driver) {
            this.threadData = Maps.newConcurrentMap();
            this.basePath = PathUtils.validatePath(path);
            this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    acquire加锁分析

    在这里插入图片描述

    上面给出了acquire的调用时序图 我们主要从internalLock方法开始看起

    public void acquire() throws Exception {
      			//这里直接调用internalLock方法,传入时间-1 表示不设置超时时间
            if (!this.internalLock(-1L, (TimeUnit)null)) {
                throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
      			//获取当前线程
            Thread currentThread = Thread.currentThread();
      			//从map中查找当前线程的锁信息
            InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
      			//如果查找的到 则将加锁次数+1 从这里可以看出zk的锁也是可重入的
            if (lockData != null) {
                lockData.lockCount.incrementAndGet();
                return true;
            } else {
              //尝试获取锁 并返回锁路径
                String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
              	//如果路径不为空 说明当前线程抢占到锁 因此在map中维护当前线程与锁信息的对应关系
                if (lockPath != null) {
                    InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                    this.threadData.put(currentThread, newLockData);
                    return true;
                } else {
                    return false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    LockInternals

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
      			//获取当前时间 
            long startMillis = System.currentTimeMillis();
      			//如果设置了超时时间 这里会对超时时间进行单位换算,由于我们未设置 这里返回null
            Long millisToWait = unit != null ? unit.toMillis(time) : null;
      			//我们也没有为临时节点设置值 因此lockNodeBytes为null,localLockNodeBytes也为空
            byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
            int retryCount = 0;
            String ourPath = null;
      			//是否持有了锁
            boolean hasTheLock = false;
      			//是否操作已完成
            boolean isDone = false;
    
            while(!isDone) {
                isDone = true;
    
                try {
                  	//这里每个线程会基于path生成一个属于当前线程的临时有序节点,并返回节点路径 如/lock/xxx--lock-0000000002
                    ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                  	//抢锁及监听操作 在这里
                    hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
                } catch (NoNodeException var14) {
                    if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                        throw var14;
                    }
    
                    isDone = false;
                }
            }
    
            return hasTheLock ? ourPath : null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
            boolean haveTheLock = false;
            boolean doDelete = false;
    
            try {
                if (this.revocable.get() != null) {
                    ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
                }
    						//如果当前客户端是启动状态 并且当前线程未获取锁
                while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                  	//这里会将basePath下的所有节点取到 并按照自然顺序进行排序,集合中字符串格式形如/xxx--lock-0000000002
                    List<String> children = this.getSortedChildren();
                    //这里将ourPath中前缀去掉,如/lock/xxx--lock-0000000002==>xxx--lock-0000000002
                    String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                    PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                    //如果当前节点前没有字节点了 那么会获取锁 这里将haveTheLock标记为true
                    if (predicateResults.getsTheLock()) {
                        haveTheLock = true;
                    } else {
                       //如果未获取锁 拼接待监听的节点路径
                        String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                        synchronized(this) {
                            try {
                               //当前子节点监听previousSequencePath这个路径
                              ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                              	//未设置超时时间 直接阻塞线程  
                              if (millisToWait == null) {
                                  
                                    this.wait();
                                } else {
                                //设置了超时时间
                                    millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                		//如果未超时 那么线程阻塞指定时间,否则超时后,会将当前子节点进行删除,可参见finally语句块逻辑
                                    if (millisToWait > 0L) {
                                        this.wait(millisToWait);
                                    } else {
                                        doDelete = true;
                                        break;
                                    }
                                }
                            } catch (NoNodeException var19) {
                            }
                        }
                    }
                }
            } catch (Exception var21) {
                ThreadUtils.checkInterrupted(var21);
                doDelete = true;
                throw var21;
            } finally {
                if (doDelete) {
                    this.deleteOurPath(ourPath);
                }
    
            }
    				//返回是否获取锁的标记
            return haveTheLock;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    StandardLockInternalsDriver

     public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
       			//children子节点列表是有序的,这里是查找当前子节点序号在children列表中的位置
            int ourIndex = children.indexOf(sequenceNodeName);
       			//如果未找到sequenceNodeName 则outIndex=-1这里主要对这种情况进行校验
            validateOurIndex(sequenceNodeName, ourIndex);
       			//这里就是获取锁的关键 前面提到过maxLeases=1,如果getsTheLock=true,就只有ourIndex=0,也从侧面印证了 每次只能序号最小的节点关联的线程才能获取锁
            boolean getsTheLock = ourIndex < maxLeases;
       			//如果未获取到锁 那就找一下当前子节点的前一个节点的路径 作为被监听的对象
            String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
            return new PredicateResults(pathToWatch, getsTheLock);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    **acquire(long time, TimeUnit unit)**带超时时间的获取锁流程 与之有些不同:当超时时间内未获取到锁,则将当前子节点删除,下一个子节点继续尝试获取锁

    release 释放锁

    zk分布式锁是可重入的,因此在释放锁时 只有当重入次数减为0 才真正地去释放锁

    public void release() throws Exception {
      			//获取当前线程
            Thread currentThread = Thread.currentThread();
      			//获取当前线程的锁信息,通过acquire代码可知 只有当前线程获取了锁,才能有锁信息
            InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
            if (lockData == null) {
                throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
            } else {
              	//重入次数-1
                int newLockCount = lockData.lockCount.decrementAndGet();
                if (newLockCount <= 0) {
    								//异常情况判断
                    if (newLockCount < 0) {
                        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                    } else {
                        try {
                          	//当重入次数=0 时 释放锁
                            this.internals.releaseLock(lockData.lockPath);
                        } finally {
                          	//最后 清除该线程的锁信息
                            this.threadData.remove(currentThread);
                        }
    
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    LockInternals

    void releaseLock(String lockPath) throws Exception {
            this.revocable.set((Object)null);
      			//这里实际上是删除当前被加锁的子节点
            this.deleteOurPath(lockPath);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    由上面可知,因为后一个节点会监听当前节点,因此当持有锁的节点被删除后会被监听到,触发监听逻辑

    private synchronized void notifyFromWatcher() {
      			//唤醒线程
            this.notifyAll();
        }
    
    • 1
    • 2
    • 3
    • 4

    触发监听逻辑后,当前节点的后一个节点所关联的线程将被唤醒,然后继续执行LockInternals#internalLockLoop方法获取锁。

    以上 就是zk实现分布式锁的主要逻辑了。带超时时间的锁实现逻辑没有详细地分析,其实现逻辑大体类似 读者可自行阅读分析。

  • 相关阅读:
    程序员必备,一款让你提高工作效率N倍的神器uTools
    腾讯云轻量应用服务器搭建跨境电商的方法步骤(非常详细)
    oracle_申明与赋值
    Utilizing Transformer Representations Efficiently
    Android退出应用后是否需要关闭数据库?
    “一键合并剪辑,轻松添加片头——全新的视频编辑工具让你成为视频制作达人“
    IDEA XML文件里写SQL比较大小条件
    Python3 module 'cups' has no attribute 'Connection'
    基于微信小程序的线上课堂系统
    Windows 11 Ubuntu子系统所在磁盘位置、访问windows磁盘、另一个程序正在使用文件进程无法访问解决方法
  • 原文地址:https://blog.csdn.net/zyxwvuuvwxyz/article/details/126139203