• zookeeper分布式锁demo


    思路

    加锁

    首先,在Zookeeper当中创建一个持久节点lock,当第一个客户端想要获得锁时,需要在lock这个节点下面创建一个临时顺序节点 。

    之后,Client1查找lock下面所有的临时顺序节点并排序,判断自己所创建的节点是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

    如果再有一个客户端 Client2 前来获取锁,则在lock下再创建一个临时顺序节点。Client2查找lock下面所有的临时顺序节点并排序,判断自己所创建的节点是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。于是,Client2向排序仅比它靠前的节点注册Watcher,用于监听前一个节点是否存在。这意味着Client2抢锁失败,进入了等待状态。

    这时候,如果又有一个客户端Client3前来获取锁,则在再创建一个临时顺序节点并重复Client2的操作,监听Client2创建的节点。

    这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。形成一个等待队列

    释放锁

    释放锁分为两种情况:

    1.任务完成,客户端显式释放

    当任务完成时,Client1会显示调用删除对应节点的指令。

    2.任务执行过程中,客户端崩溃

    获得锁的Client1在任务执行过程中如果崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点会随之自动删除。

    由于Client2一直监听着Client1节点的存在状态,当该节点被删除时,*Client2会立刻收到通知。这时候Client2会再次查询lock下面的所有节点,确认自己创建的节点是不是目前最小的节点。*如果是最小,则Client2顺理成章获得了锁。

    同理,如果Client2也因为任务完成或者节点崩溃而删除了自己的节点,那么Client3就会接到通知。

    代码实现

    DLock

    package com.th.zkdistributedlock;
    
    /**
     * 锁操作接口
     */
    public interface DLock {
        /**
         * 阻塞获取锁
         * @return
         */
        boolean lock();
    
    
        /**
         * 释放锁
         * @return
         */
        boolean unlock();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ZkLock

    package com.th.zkdistributedlock;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 锁方法实现类
     */
    public class ZkLock implements DLock {
    
        // 根节点绝对路径
        private String path;
    
        // 根节点名称
        private String name;
    
        // 临时有序节点绝对路径
        private String lockPath ;
    
        private ZooKeeper zk;
    
        // 锁重入计数
        private int state ;
    
        public ZkLock(String path, String name, ZooKeeper zk) {
            this.path = path;
            this.name = name;
            this.zk = zk;
            this.state=0;
        }
    
        /**
         * 加锁操作,锁重入如果成功state+1
         * @return
         */
        public boolean lock() {
            boolean flag= lockInternal();
            if(flag){
                state++;
            }
            return flag;
        }
    
        /**
         * 加锁实现
         * @return
         */
        private  boolean lockInternal(){
            try {
                // 创建临时有序节点
                String result = zk.create(getPath(), "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                this.lockPath = result;
                // 获取主节点下所有子节点并排序
                List<String> waits = zk.getChildren(path, false);
                Collections.sort(waits);
                // 将当前创建的节点路径转为数组
                String[] paths=result.split("/");
                // 获取当前子节点name
                String curNodeName =  paths[paths.length-1];
                // 如果当前节点是第一个子节点
                if (waits.get(0).equalsIgnoreCase(curNodeName)) {
                    return true;
                }
                CountDownLatch latch = new CountDownLatch(1);
                for (int i = 0; i < waits.size(); i++) {
                    String cur = waits.get(i);
                    // 遍历所有子节点,直到找到当前创建的节点
                    if (!cur.equalsIgnoreCase(curNodeName)) {
                        continue;
                    }
                    // 给当前节点的前一个节点添加watcher,监控该节点
                    String prePath = path+"/"+waits.get(i - 1);
                    zk.exists(prePath, new Watcher() {
                        public void process(WatchedEvent event) {
                            if (event.getType().equals(Event.EventType.NodeDeleted))
                            latch.countDown();
                        }
                    });
                    break;
                }
                latch.await();
                return true;
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            return false;
        }
    
        private String getPath() {
            return path+"/"+name;
        }
    
        public boolean unlock() {
            if(state>1){
                state--;
                return true;
            }
            try {
                Stat stat=zk.exists(lockPath,false);
                int version= stat.getVersion();
                zk.delete(lockPath,version);
                state--;
                return true;
            } catch (Exception e) {
                System.out.println("unlock:"+lockPath+" ,exception,");
            }
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113

    DLockFactory

    package com.th.zkdistributedlock;
    
    /**
     * 持久节点创建工厂
     */
    public interface DLockFactory {
    
        /**
         * 拿到对象
         * @param key
         * @return
         */
        DLock getLock(String key);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    ZkDLockFactory

    package com.th.zkdistributedlock;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * 持久节点实现类
     */
    @Setter
    @Getter
    public class ZkDLockFactory implements DLockFactory {
    
    
        private String DLOCK_ROOT_PATH="/testLock";
        private ZooKeeper zooKeeper;
    
        public DLock getLock(String key) {
            String path = getPath(key);
            try {
                zooKeeper.create(path,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (Exception e) {
    
            }finally {
                try {
                    Stat stat= zooKeeper.exists(path,false);
                    if(stat == null){
                        return null;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
            DLock lock = new ZkLock(path,key,zooKeeper);
            return lock;
        }
    
        private String getPath(String key) {
            return DLOCK_ROOT_PATH+"/"+key;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    DLockTest

    package com.th.zkdistributedlock;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.Objects;
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 测试类
     */
    public class DLockTest {
    
        public final static Random random  = new Random();
    
        public static void main(String[] args) {
            ZooKeeper zk  = getZkClient();
    
            CountDownLatch latch = new CountDownLatch(10);
            ZkDLockFactory factory = new ZkDLockFactory();
            factory.setZooKeeper(zk);
            for(int i = 0; i<10; i++){
                int finalI = i;
                Thread t = new Thread(()->{
                    exec(factory);
                    System.out.println("Thread_"+ finalI +"释放锁完成");
                    latch.countDown();
                },"Thread_"+i);
                t.start();
            }
            try {
                latch.await();
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("测试完成");
        }
    
        /**
         * 加锁释放锁操作
         * @param factory
         */
        public static void exec(ZkDLockFactory factory){
            DLock lock=factory.getLock("lock");
            if (Objects.nonNull(lock)) {
                System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁");
                boolean flag=lock.lock();
                System.out.println("Thread:"+Thread.currentThread().getName()+",尝试获取锁,结果:"+flag);
    
                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(30));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();
                System.out.println("Thread:"+Thread.currentThread().getName()+",释放锁锁");
            } else {
                System.out.println("获取持久节点失败");
            }
        }
    
        /**
         * 获取链接
         * @return
         */
        public static ZooKeeper getZkClient(){
            try {
                ZooKeeper zooKeeper = new ZooKeeper("192.168.137.142:2181", 200000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if(event.getState() == Event.KeeperState.SyncConnected){
                            System.out.println("连接成功");
                        }
                    }
                });
                return zooKeeper;
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    依赖

    
    
        4.0.0
        
            org.springframework.boot
            spring-boot-starter-parent
            2.4.12
             
        
        com.th
        zkDistributedLock
        0.0.1-SNAPSHOT
        zkDistributedLock
        Demo project for Spring Boot
        
            1.8
        
        
            
                org.springframework.boot
                spring-boot-starter-web
            
    
            
                org.projectlombok
                lombok
                true
            
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
            
                org.apache.zookeeper
                zookeeper
                3.7.0
            
            
                com.alibaba
                druid
                1.2.11
            
            
                junit
                junit
            
            
                junit
                junit
            
        
    
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                    
                        
                            
                                org.projectlombok
                                lombok
                            
                        
                    
                
            
        
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    测试结果

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    如有不足欢迎指出,如有问题随时提问

  • 相关阅读:
    「互动有礼,感谢有你」参与互动就有机会获赠 Navicat Premium 16
    PLC-Recorder高速采集西门子S7-300(400) PLC数据的方法(开放以太网协议)
    在 Mac 客户端 C++ 代码中使用 breakpad
    linux安装宝塔
    Linux下安装mongodb详细教程
    PSI-BLAST位点特异性矩阵PSSM和ProteinMPNN中氨基酸顺序映射
    某大型国有银行 VMware 替换与轻量信创云底座转型实践 |信创专题
    缓存篇—缓存雪崩
    Flink入门系列02-编程基础
    do-exercise-排序子序列
  • 原文地址:https://blog.csdn.net/qq_41683000/article/details/126365906