• Curator实现Zookeeper分布式锁


    Curator实现Zookeeper分布式锁

    1、介绍

    Curatornetflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes。

    2、锁分类

    InterProcessLock

    实现类特征
    InterprocessMutex互斥锁 可重入、独占锁
    InterProcessMultiLock可重入锁 多重共享锁 (将多个锁作为单个实体管理的容器)
    InterProcessReadWriteLock读写锁 读锁可以重入,写锁互斥。类似ReentrantReadWriteLock
    InterProcessSemaphoreMutex共享信号量。不可重入、独占锁
    InterProcessSemaphoreV2共享信号量

    相关拓展:

    2.1、 可重入锁 (Shared Reentrant Lock)

    Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。

    2.2、不可重入锁(Shared Lock)

    使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。

    2.3、可重入读写锁(Shared Reentrant Read Write Lock)

    类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。

    2.4、信号量(Shared Semaphore)

    一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。 调用acquire会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。

    2.5、多锁对象(Multi Shared Lock)

    Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

    锁内容转载自:在代码世界游走,没几把“锁”防身可不行 | 京东云技术团队

    3、使用实例

    3.1、引入依赖
    <dependency>
        <groupId>org.apache.curatorgroupId>
        <artifactId>curator-recipesartifactId>
        <version>5.2.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.2、构建工具类

    package com.springbootcli.zookeeper.utils;
    
    import lombok.extern.log4j.Log4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
     * Curator实现分布式锁工具类
     *
     * @author liangtl
     * @Date 2023/9/3
     */
    
    @Log4j
    public class CuratorUtils {
    
        //定义锁节点的路径
        private static final String LOCK_PATH = "/zkLock/test";
        //设置zookeeper连接,多台使用逗号分隔
        private static final String CONNECT_STRING = "localhost1:2181,localhost2:2181,localhost3:2181";
        //设置会话存活时间,根据业务灵活指定 单位:毫秒
        private static final int SESSION_TIME_OUT = 3000;
        //设置超时时间
        private static final int CONNECTION_TIME_OUT = 3000;
        //设置重试次数
        private static final int RETRY_TIMES = 3;
        ///设置失败重试间隔时间 单位:毫秒
        private static final int BASE_SLEEP_TIME_MS  = 3000;
    
        /**
         * 获取锁对象(可重入锁)
         *
         * @return 锁对象
         */
        public static InterProcessMutex getMutexLock() {
            return new InterProcessMutex(getCuratorFramework(), LOCK_PATH);
        }
    
        /**
         * 对分布式锁进行初始化
         *
         * @return
         */
        private static CuratorFramework getCuratorFramework() {
            //重试策略,定义初试时间3s,重试3次
            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS , RETRY_TIMES);
            //初始化客户端
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    //配置zookeeper连接地址
                    .connectString(CONNECT_STRING)
                    //设置超时时间
                    .sessionTimeoutMs(SESSION_TIME_OUT)
                    //设置连接超时时间
                    .connectionTimeoutMs(CONNECTION_TIME_OUT)
                    //重试策略
                    .retryPolicy(exponentialBackoffRetry)
                    .build();
            //开启连接
            client.start();
            log.info("zookeeper初始化完成...");
            return client;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    3.3、使用
    @Test
    void contextLoads() {
        // 通过工具类初始化锁对象
        InterProcessMutex lock = CuratorUtils.getMutexLock();
        try {
            // 获取锁
            lock.acquire();
            log.info(Thread.currentThread().getName() + "获取zk锁成功");
        } catch (Exception e) {
            log.info("zk锁获取异常:", e);
        }
        /*
        业务处理......
         */
        try {
            lock.release();
            log.info(Thread.currentThread().getName() + "释放zk锁成功");
        } catch (Exception e) {
            log.info("zk锁释放异常:", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    [ARC116F] Deque Game
    【JavaScript复习十五】数组小练习
    VS Code多语言笔记本扩展插件 Polyglot Notebooks
    Java对象创建流程 —— 个人学习记录
    一位Java程序猿的“炫技“:从高级特性到深奥的代码实践
    精通Git(一)——入门
    【深度学习】吴恩达课程笔记(二)——浅层神经网络、深层神经网络
    【解刊】IF9.9,CCF-C类,1区SCI神刊,2天见刊!
    PHP笔记
    OpenAI首席科学家:直面AGI的可能性
  • 原文地址:https://blog.csdn.net/weixin_46508271/article/details/132656843