• Zookeeper ---- ZooKeeper分布式锁案例


    什么叫做分布式锁呢?
    比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫做分布式锁。

    在这里插入图片描述

    1. 原生Zookeeper实现分布式锁案例

    1. 分布式锁实现

    package com.fickler.zkcase2;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author dell
     * @version 1.0
     */
    public class DistributedLock {
    
        private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private final int sessionTimeout = 2000;
        private final ZooKeeper zooKeeper;
        private String rootNode = "locks";
        private String subNode = "seq-";
        private String waitPath;
        private CountDownLatch connectLatch = new CountDownLatch(1);
        private CountDownLatch waitLatch = new CountDownLatch(1);
        private String currentNode;
    
        public DistributedLock() throws IOException, InterruptedException, KeeperException {
    
            //获取连接
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                    //连接建立时,打开latch,唤醒wait在该latch上的线程
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        connectLatch.countDown();
                    }
                    //发生了waitPath的删除事件
                    if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                        waitLatch.countDown();
                    }
    
                }
            });
            //等待zk正常连接后,往下走程序
            connectLatch.await();
            //判断根节点/locks是否存在
            Stat stat = zooKeeper.exists("/" + rootNode, false);
            if (stat == null){
                System.out.println("根节点不存在");
                zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
    
        }
    
        //对zk加锁
        public void zkLock() throws InterruptedException, KeeperException {
    
            //创建对应的临时带序号节点
            String currentMode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号的前一个节点
            List<String> children = zooKeeper.getChildren("/locks", false);
            //如果children只有一个值,那就直接获取资源,如果有多个,需要判断谁最小
            if (children.size() == 1){
                return;
            }else {
                Collections.sort(children);
                //获取节点名称
                String thisNode = currentMode.substring("/locks/".length());
                //通过seq-00000000获取该节点在children集合的位置
                int index = children.indexOf(thisNode);
                //判断
                if (index == -1){
                    System.out.println("数据异常");
                }else if (index == 0){
                    //只有一个节点,就可以获取锁了
                    return;
                }else {
                    //需要监听,他前一个节点的变化
                    waitPath = "/locks/" + children.get(index - 1);
                    //在waitPath上注册监听器,当waitPath被删除时,zookeeper会回调监听器process方法
                    zooKeeper.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
    
        }
    
        //解锁
        public void zkUnlock() throws InterruptedException, KeeperException {
    
            zooKeeper.delete(this.currentNode, -1);
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    2. 分布式锁测试

    1. 创建两个线程
    package com.fickler.zkcase2;
    
    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    
    /**
     * @author dell
     * @version 1.0
     */
    public class DistributedLockTest {
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    
            //创建分布式锁1
            final DistributedLock lock1 = new DistributedLock();
            //创建分布式锁2
            final DistributedLock lock2 = new DistributedLock();
    
            new Thread(new Runnable(){
    
                @Override
                public void run() {
                    //获取锁对象
                    try {
                        lock1.zkLock();
                        System.out.println("线程1获取锁");
                        Thread.sleep(5 * 1000);
                        lock1.zkUnlock();
                        System.out.println("线程1释放锁");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch (KeeperException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
    
            new Thread(new Runnable(){
    
                @Override
                public void run() {
                    //获取锁对象
                    try {
                        lock2.zkLock();
                        System.out.println("线程2获取锁");
                        Thread.sleep(5 * 1000);
                        lock2.zkUnlock();
                        System.out.println("线程2释放锁");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch (KeeperException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    1. 观察控制台变化

    在这里插入图片描述

    2. Curator框架实现分布式锁案例

    1. 原生的 Java API 开发存在的问题

    1. 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
    2. Watch 需要重复注册,不然就不能生效
    3. 开发的复杂性还是比较高的
    4. 不支持多节点删除和创建。需要自己去递归

    2. Curator是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题。

    官方文档:https://curator.apache.org/index.html

    3. Curator案例实操

    1. 添加依赖
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-frameworkartifactId>
                <version>4.3.0version>
            dependency>
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-recipesartifactId>
                <version>4.3.0version>
            dependency>
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-clientartifactId>
                <version>4.3.0version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1. 代码实现
    package com.fickler.Lock;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessLock;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class CuratorLockTest {
        private String rootNode = "/locks";
        // zookeeper server 列表
        private String connectString =
                "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        // connection 超时时间
        private int connectionTimeout = 2000;
        // session 超时时间
        private int sessionTimeout = 2000;
    
        public static void main(String[] args) {
            new CuratorLockTest().test();
        }
    
        // 测试
        private void test() {
            // 创建分布式锁 1
            final InterProcessLock lock1 = new
                    InterProcessMutex(getCuratorFramework(), rootNode);
            // 创建分布式锁 2
            final InterProcessLock lock2 = new
                    InterProcessMutex(getCuratorFramework(), rootNode);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 获取锁对象
                    try {
                        lock1.acquire();
                        System.out.println("线程 1 获取锁");
                        // 测试锁重入
                        lock1.acquire();
                        System.out.println("线程 1 再次获取锁");
                        Thread.sleep(5 * 1000);
                        lock1.release();
                        System.out.println("线程 1 释放锁");
                        lock1.release();
                        System.out.println("线程 1 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 获取锁对象
                    try {
                        lock2.acquire();
                        System.out.println("线程 2 获取锁");
                        // 测试锁重入
                        lock2.acquire();
                        System.out.println("线程 2 再次获取锁");
                        Thread.sleep(5 * 1000);
                        lock2.release();
                        System.out.println("线程 2 释放锁");
                        lock2.release();
                        System.out.println("线程 2 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
        // 分布式锁初始化
        public CuratorFramework getCuratorFramework() {
            //重试策略,初试时间 3 秒,重试 3 次
            RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
            //通过工厂创建 Curator
            CuratorFramework client =
                    CuratorFrameworkFactory.builder()
                            .connectString(connectString)
                            .connectionTimeoutMs(connectionTimeout)
                            .sessionTimeoutMs(sessionTimeout)
                            .retryPolicy(policy).build();
            //开启连接
            client.start();
            System.out.println("zookeeper 初始化完成...");
            return client;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    1. 观察控制台变化
      在这里插入图片描述
  • 相关阅读:
    【哈希槽算法】概念、对比、优点、举例理解_Redis07
    第2关:创建表
    一文搞懂二叉树中序遍历的三种方法
    Linux MinIO 安装与配置(清晰明了)
    网桥、路由器和网关有什么区别?
    Harbor企业级Registry基础镜像仓库的详细安装使用教程(保姆级)
    ABAP BASE64/STRING/XSTRING/BINARY 等之间的转换总结
    ICDE 2023|TKDE Poster Session(CFP)
    数字集成电路设计(四、Verilog HDL数字逻辑设计方法)(二)
    【昇思MindSpore】MindSpore的安装
  • 原文地址:https://blog.csdn.net/qq_52354698/article/details/127365683