• 分布式锁的实现- zookeeper


    前言

    大家好,我是飓风
    上一遍文章 分布式锁的实现- mysql,我们讲解了分布式锁实现的特性,主要包含:

    • 互斥性
    • 超时特性
    • 提供阻塞和非阻塞接口
    • 可重入性
    • 公平锁和非公平锁
    • 其他 高性能 高可用等

    今天咱们来看看用zookeeper 怎么来实现分布式锁。

    实现

    环境准备

    • zookeeper 3.4.11
    • jdk 8
    • spring-boot 2.3.2.RELEASE
    • curator-framework 4.0.0
    jdk 安装

    这里就不介绍jdk安装了,相信大家肯定google 或者百度都可以查到,很简单,略过。

    zookeeper 安装

    这里我们利用docker 来快速安装和启动
    安装:

    docker pull zookeeper:3.4.11
    
    • 1

    启动:

    docker run --name zookeeper --restart always -d zookeeper:3.4.11
    
    • 1
    创建maven 项目

    这里创建maven 项目就省略了,下面的maven的依赖配置,具体的版本号在我的父pom里,完成代理,我会传到github 上。

    <dependencies>
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-frameworkartifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4jgroupId>
                        <artifactId>org.slf4jartifactId>
                    exclusion>
                exclusions>
            dependency>
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-recipesartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <scope>providedscope>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-configuration-processorartifactId>
                <optional>trueoptional>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-bootartifactId>
                <optional>trueoptional>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-autoconfigureartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>transmittable-thread-localartifactId>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    实现原理

    zookeeper 的特性

    • 结构简单类,似于文件系统的树状结构
    • 单系统镜像,无论客户端连接到哪一个服务器,他将看到相同的、Zookeeper视图
    • 有序性, 有序的事务编号,客户端的更新顺序与它们被发送的顺序相一致
    • 原子性, 更新操作要么成功要么失败,没有第三种结果

    zookeeper的节点

    • 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
    • znode有三种类型,临时的( EPHEMERAL )、持久的( PERSISTENT )和有序的 (SEQUENTIAL)
    • znode的类型在创建时确定并且之后不能再修改znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
    • znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
    • 节点不支持部分读写,而是一次性完整读写
    • 短暂znode的客户端会话结束时,zookeeper会将该短暂znode删除,短暂znode不可以有子节点
    • 持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除
    • 客户端应用可以在节点上设置监视器

    zookeeper watcher

    Watcher 在 zookeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应。

    公共创建相同的临时节点方式

    通过上面的zookeeper的介绍,我们知道,zookeeper 是读写是原子性的,且节点不能重复创建的,那么我们就让客户端,在获取分布式锁的时候,去创建这个临时节点,先创建的client ,那么就会返回创建成功,其他都会返回创建失败,其他创建失败,此时需要监听这个 临时节点,如果临时节点被删除了,那么说明就是释放锁了,其他client 可以接着创建这个临时节点,来争抢分布式锁。
    之所以会利用临时节点,如果程序down掉了,那么此时临时节点就会自动删除,不会出现死锁的现象。
    举例:比如我们进行某个SKU库存的扣减,那么此时zookeeper 的创建节点的路径咱们可以设置为 /lock/sku/100121212, 其中 100121212 就是要扣减的sku的值,也是一个临时节点。
    下面我画个图,看完相信会更加清晰。

    1.创建临时节点,也就是开始获取锁,如下图:
    在这里插入图片描述
    2.client1 获取锁成功,如下图:

    在这里插入图片描述
    3.此时client1 获取锁成功,其他client2 和client3 获取锁失败

    接着client2 和 client3 开始监听 这个临时节点是否被删除了,如下图:
    在这里插入图片描述
    4.client1 执行完扣减库存业务,那么就会删除临时节点,也就是释放锁,那么其他client2 和client3 监听到这个临时节点被删除了,那么就会再次进行锁的获取,也就是创建这个临时节点了。

    在这里插入图片描述
    通过上面这个几个步骤,一个基于zookeeper临时节点的分布式锁就实现了。但是这里有些问题需要说明下:

    当大量客户端去竞争锁的时候,会发生“惊群”效应,这里惊群效应指的是在分布式锁竞争的过程中,大量的"Watcher通知"和“创建/lock/sku/xxxx”两个操作重复运行,并且绝大多数运行结果都创建节点失败,从而继续等待下一次通知,若在集群规模较大的情况下,会对ZooKeeper服务器以及客户端服务器造成巨大的性能影响和网络冲击,所以基于这种方式的实现,并发量上支持不很高,大流量下不建议使用。

    下面我来介绍改进方案

    基于zookeeper的临时顺序节点方式

    临时顺序节点原理

    我们可以利用创建zookeeper的临时顺序节点的方式,来解决“惊群”效应,其实是一种公平锁的实现,下面说下具体的步骤:

    1. 使用 zookeeper 的临时节点和有序节点,每个线程获取锁就是在 ZK 创建一个临时有序的节点,比如在 /lock/sku/000001, /lock/sku/000002, /lock/ sku/000003
      其中sku 是要你进行写的公共资源。
      如下图所示:三个client 同时想进行sku= 100121212 进行扣钱库存,那么sku = 100121212 就是共享资源,需要进行加锁,三个client 就会去创建临时顺序节点,
      /lock/100121212,分别创建了 /lock/10012121/001,/lock/10012121/002 ,/lock/10012121/003

    在这里插入图片描述

    1. 创建节点成功后,获取 /lock/sku 目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点,如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

    在这里插入图片描述

    1. 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。如下图所示:

    在这里插入图片描述

    1. 前一个节点被删除了,那么就会被监听到,此时又会获取临时顺序节点的集合,看自己是不是最小的,如果是,那么就获到了,如果不是继续进行监听。

    如下图所示,001 被删除了,那么client2 就会监听到001 被删除了,于是再次获取到子节点集合,判断自己已经示最小的节点了,那么获取锁成功了。

    在这里插入图片描述

    临时顺序节点代码

    这里代码不做过多解释了,给了主要类的实现说明,可以和上面的原理对应上的。
    1 实现了阻塞获取锁
    2 实现了非阻塞获取锁
    3 锁的可重入性

    lock 接口 ,定义要实现获取锁和释放锁的方法

    public interface Lock {
    
        /**
         * 阻塞获取锁
         * @return
         */
        void lock(String source) throws LockException;
    
        /**
         * 非阻塞获取锁
         * @return
         */
        boolean nonLock(String source,int retries);
    
    
       /**
       * 释放锁
       */
        boolean unLock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    lock 接口的实现

    @Slf4j
    @RequiredArgsConstructor
    @Component
    @Scope(value = "prototype")
    public class ZookeeperLock implements Lock {
    
        private final CuratorFramework curatorFramework;
    
    
    
        private final Watcher watcher = event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted){
                notifyAllFromWatcher();
            }
        };
    
        /**
         * key: lock path
         * value: 重入的次数
         */
        private static final TransmittableThreadLocal<LockInfo> THREAD_LOCAL = new TransmittableThreadLocal<>();
    
    
        private LockInfo getLocalMap(){
            LockInfo lockInfo = THREAD_LOCAL.get();
            if (lockInfo == null){
                lockInfo = new LockInfo();
                THREAD_LOCAL.set(lockInfo);
            }
            return lockInfo;
        }
    
        private String createLockPath(String source)  {
            String base = "/" + source;
            // 创建临时节点,这里肯定谁最小是谁先创建出来
            String currentPath = null;
            try {
                currentPath = curatorFramework.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(base+"/lock_");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return currentPath;
        }
    
        private String getLockPath(){
            final LockInfo lockInfo = getLocalMap();
            return lockInfo.lockPath;
        }
        private String getBasePath(){
            final LockInfo lockInfo = getLocalMap();
            return "/"+lockInfo.source;
        }
        private String getSource(){
            final LockInfo lockInfo = getLocalMap();
            return lockInfo.source;
        }
        private void setLock(){
            final LockInfo lockInfo = getLocalMap();
            lockInfo.lock = true;
        }
    
        private void incCount(){
            final LockInfo lockInfo = getLocalMap();
            lockInfo.count++;
        }
    
        private void deCount(){
            final LockInfo lockInfo = getLocalMap();
            lockInfo.count--;
        }
    
        private int getCount(){
            return getLocalMap().count;
        }
    
        private boolean isLock(){
            return  getLocalMap().lock;
        }
    
        private synchronized void notifyAllFromWatcher(){
            notifyAll();
        }
    
        private  void nextLock() throws LockException {
            boolean deleted = false;
            try {
                // 不相等,那么说明有比它大的,那么找出它的弟弟节点,进行监听
                //监听上一个节点
                final List<String> childrenPath = curatorFramework.getChildren().forPath(getBasePath());
                final String youngerBrother = CommonUtil.getYoungerBrother(getSource(), childrenPath, getLockPath());
                //如果为空个,说明就剩下她自己一个了,那么直接返回获取
                if (StringUtils.isEmpty(youngerBrother)){
                    log.error("currentThread=> "+Thread.currentThread().getId()+"'s youngerBrother is null ");
                    lock(getSource());
                    return;
                }
                curatorFramework.getData().usingWatcher(watcher).forPath(youngerBrother);
                synchronized(this){
                    wait();
                }
    
                lock(getSource());
            }catch (Exception e){
    
                //如果是 NoNodeException ,说明我监听的节点不存了,那么如要继续获取锁
                if(e instanceof KeeperException.NoNodeException){
                    lock(getSource());
                    return;
                }
                e.printStackTrace();
                deleted = true;
                throw new LockException("获取锁失败:" + e.getMessage());
            }finally {
                // 等待超时,没有获取到锁,那么删除zookeeper 中的临时节点和thread local内的数据
                if (deleted){
                    removeResource();
                    throw new LockException("超时-获取锁失败");
                }
            }
    
        }
    
        private void initCurrentLock(String source) throws LockException {
            final LockInfo lockInfo = getLocalMap();
            //如果为空,那么说第一次尝试获取锁
            if (StringUtils.isEmpty(lockInfo.lockPath)){
                String lockPath = createLockPath(source);
                if (StringUtils.isEmpty(lockPath)){
                    throw new LockException("创建锁失败,请稍后重试");
                }
                lockInfo.source = source;
                lockInfo.lockPath = lockPath;
            }
        }
    
        @Override
        public void lock(String source) throws LockException {
    
            initCurrentLock(source);
    
           //如果获得了,那么不要继续了
            if (lockResource()){
                return;
            }
            try {
                //阻塞 监听
                nextLock();
            } catch (Exception e) {
                e.printStackTrace();
                throw  new LockException("获取锁失败,请稍后重试");
            }
        }
    
        @Override
        public boolean nonLock(String source,int retries){
            boolean notLock = false;
            try {
                initCurrentLock(source);
                while (retries>0){
                    if (lockResource()){
                        return true;
                    }
                    retries--;
                }
                if (retries==0){
                    notLock = true;
                }
            } catch (LockException e) {
                e.printStackTrace();
                log.error("上锁失败: {} ",e.getMessage());
                notLock = true;
                return false;
            }finally {
                //如果出现异常了,那么一定要删除
                if (notLock){
                    removeResource();
                }
            }
            return !notLock;
        }
    
       
    
        @Override
        public boolean unLock() {
            if (getCount()>1){
                deCount();
                return true;
            }
    
            return removeResource();
        }
    
        private boolean removeResource(){
            try {
                String lockPath = getLockPath();
                THREAD_LOCAL.remove();
                if (!StringUtils.isEmpty(lockPath) && curatorFramework.checkExists().forPath(lockPath)!=null){
                    curatorFramework.delete().forPath(lockPath);
                }
            } catch (Exception e) {
                e.printStackTrace();
                //todo: 如果删除锁失败了,那么要记录日志,同时报警,进行人工干预
                return false;
            }
            return true;
        }
    
        private boolean lockResource() throws LockException {
    
            //判断是否重入了
            if (isLock()){
                incCount();
                return true;
            }
    
            String lockPath  = getLockPath();
            String basePath =  getBasePath();
            int currentNumber = CommonUtil.getNumber(lockPath);
            List<String> childrenPath;
            try {
                childrenPath = curatorFramework.getChildren().forPath(basePath);
            } catch (Exception e) {
                e.printStackTrace();
                throw new LockException("获取锁列表失败");
            }
            // 获取所有节点的最小节点数字
            int minNumber = CommonUtil.getMin(childrenPath);
            //如果相等,那么它就是最小的,获得锁
            if (currentNumber == minNumber){
                System.out.println("lock thread: " + Thread.currentThread().getId() +" , lock number: " + currentNumber);
                setLock();
                incCount();
                return true;
            }
            return false;
        }
    
        @Data
        public static class LockInfo{
            private String source;
            private String lockPath;
            private int count;
            private boolean lock = false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    源码地址

    具体源码地址: github 点击

    总结

    zookeeper分布式锁的实现方式

    • 共同创建临时节点的方式,会引起“惊群”效应,并发量不能太高
    • 临时顺序接的方式,只会监听上一个顺序节点,性能会很高。
  • 相关阅读:
    Java注解(3):一个真实的Elasticsearch案例
    基于DTU储油罐在线监测系统,防患于未然
    Flir Blackfly S USB3 工业相机:计数器和定时器的使用方法
    【R言R语】202x年,校招算法岗将走向何方
    【金融项目】尚融宝项目(十六)
    【(数据结构)— 单链表的实现】
    【汉诺塔】问题,详细解析,手把手教会你
    【C语言基础】结构体中内嵌联合体|联合体中内嵌结构体
    Markdown常用快捷键
    基于nodejs+vue学生论坛设计与实现
  • 原文地址:https://blog.csdn.net/ajun_studio/article/details/126567549