• Redisson分布式锁原理浅析



    Redisson锁是我们常用的分布式锁,其核心方法就是获取锁对象(getLock)、加锁(lock、tryLock)和释放锁(unlock),下面从锁的初始化、加锁和释放锁三部分分析Redisson锁的原理。

    一、初始化

    这里我们一般使用Redisson的getLock方法获取RLock锁对象

    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
    
    • 1
    • 2
    • 3

    在getLock方法中新建了一个RedissonLock对象,其源码是

    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        // 父类的构造方法,最终是RedissonObject
        super(commandExecutor, name);
        // 初始化命令执行器
        this.commandExecutor = commandExecutor;
        // 初始化id,用于锁的前缀
        this.id = commandExecutor.getConnectionManager().getId();
        // 初始化锁的默认时间,这里采用的是看门狗的时间,默认30s
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        // 锁的标识
        this.entryName = id + ":" + name;
        // 初始化锁的监听器,用于在锁被占用时,使用Redis的pub/sub功能订阅锁释放消息
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    二、加锁

    加锁有lock和tryLock两类方法,区别在于tryLock方法会有一个等待时间,如果超过等待时间未获取到锁就会返回false,表示获取锁失败,而lock方法会一直等待,直到获取到锁,两者的源码相差不大,这里主要分析tryLock方法的源码。
    源码分析如下:

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        // 获取当前线程id
        long threadId = Thread.currentThread().getId();
        // 获取锁,底层实现是lua脚本,具体参考下面的tryAcquireAsync源码(这部分包括lua脚本加锁和加锁时间续期)
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // 锁获取成功,返回true
        if (ttl == null) {
            return true;
        }
    
        // 后面是锁获取失败的处理流程
         
     
        time -= System.currentTimeMillis() - current;
        // 等待时间用完,获取锁失败
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
         
        current = System.currentTimeMillis();
        // 订阅锁释放的通知
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 剩余时间内未订阅成功
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            // 尝试取消订阅过程,若无法取消,则会取消订阅
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            // 获取锁失败
            acquireFailed(threadId);
            return false;
        }
    
        try {
            time -= System.currentTimeMillis() - current;
            // 等待时间用完,加锁失败
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
         
            // 自旋获取锁
            while (true) {
                // 尝试获取锁
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 获取成功
                if (ttl == null) {
                    return true;
                }
                 
                // 未获取成功,若等待时间用完,则加锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
    
                // 通过java.util.concurrent包的Semaphore(信号量)挂起线程等待锁释放
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                 
                // 等待时间已用完,获取锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消订阅锁释放
            unsubscribe(subscribeFuture, threadId);
        }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        //  指定加锁时间
        if (leaseTime != -1) {
            // lua脚本加锁,详细见tryLockInnerAsync源码
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 未指定加锁时间,则异步执行尝试获取锁,加锁时间是默认的30s
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
     
            // 获取锁成功,开启时间轮任务,用于锁续期
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
         
        return ttlRemainingFuture;
    }
     
    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 获取当前线程的锁续期任务节点
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            // 若当前线程的锁续期任务任务节点已存在,将节点计数加1
            oldEntry.addThreadId(threadId);
        } else {
            // 不存在,说明是新的线程获取锁
            // 将节点计数加1
            entry.addThreadId(threadId);
            // 开始时间轮调度任务
            // 其会每10s执行一次调度任务,给锁续期
            renewExpiration();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125

    这里可以看到在不指定锁的过期时间时,Redisson会自动给锁续期,这可以解决业务未执行完锁就释放的问题,但同时也有缺点,若系统出现异常,锁未执行到finally的unlock,就会造成死锁,其他的线程将无法再获取到该锁。在日常编码中,我们更偏向于指定锁的过期时间,保证在异常情况下锁也能够被释放,至于业务未执行完锁就释放的问题,通过合理评估业务耗时,合理设置过期时间即可。
    锁每次重入都会将锁续期任务节点的计数加1,是为了在释放锁的时候判断释放几次后将锁续期任务停止。
    下面看一下锁存入redis的lua脚本:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        // KEYS[1]为 Collections.singletonList(getName())的第一个元素,即锁名
        // ARGV[1]是internalLockLeaseTime,即为过期时间
        // ARGV[2]是getLockName(threadId),为锁的唯一标识
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                   // 判断锁是否存在
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      // 锁不存在,通过hash结果进行存储,hash-key是前缀+线程id,value是1,表示第一次加锁
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      // 设置过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      // 返回null,加锁成功
                      "return nil; " +
                  "end; " +
                  // 锁已存在,判断是否是当前线程获取到锁
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      // 是当前线程获取到锁,将hash的value加1,表示锁的进入次数加1
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      // 设置过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      // 返回null,加锁成功
                      "return nil; " +
                  "end; " +
                  // 返回锁的过期时间,表示获取锁失败
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    通过truLockInnerAsync的源码不难分析Redisson锁的存储结构,Redisson锁采用的是hash结构,Redis 的Key为锁名,也就是初始化时传入的name参数,hash的key为锁的id属性+线程id,hash的value为加锁次数,不难看出Redisson分布式锁是可重入的
    综上,加锁的流程如下:
    在这里插入图片描述

    三、释放锁

    释放锁一般在finally代码块里执行unlock方法,核心方法是unlockAsync,其源码如下:

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        // 释放锁
        RFuture<Boolean> future = unlockInnerAsync(threadId);
     
        future.onComplete((opStatus, e) -> {
            if (e != null) {
                // 释放异常,取消锁续期任务
                cancelExpirationRenewal(threadId);
                result.tryFailure(e);
                return;
            }
             
            // 返回为null,表示该线程未拥有该锁,抛出异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
             
            // 取消锁续期任务
            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });
     
        return result;
    }
     
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        //keys[1]]为 Arrays.asList(getName(), getChannelName())的第一个元素,即锁名
        //keys[2]为Arrays.asList(getName(), getChannelName())的第二个元素,为锁释放通知的通道,格式为:redisson_lock__channel:{lockName}
        //ARGV[1]为LockPubSub.UNLOCK_MESSAGE,是发布锁释放事件类型
        //ARGV[2]为internalLockLeaseTime,即过期时间
        //ARGV[3]为getLockName(threadId),是锁的唯一标识
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    // 锁不存在,返回null
                    "return nil;" +
                "end; " +
                // 加锁次数减1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    // 剩余次数大于0
                    // 设置过期时间,时间为30s
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    // 返回0
                    "return 0; " +
                "else " +
                    // 剩余次数小于等于0
                    // 删除缓存
                    "redis.call('del', KEYS[1]); " +
                    // 发布锁释放通知
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    // 返回1
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    释放锁的流程主要是减少redis中锁的次数,若为0则删除缓存,同时减少锁续期任务节点的计数,若为0,则停止锁续期任务,其流程如下:
    在这里插入图片描述
    以上就是Redisson锁的主要原理。

    值得注意的是,如果采用redis-cluster模式,master节点加锁成功,在主从同步时,master节点异常,主从切换,slave节点变成master,此时加锁数据还未同步成功,也就是变成master的slave节点中没有该锁的数据,当其他线程再次对该锁进行加锁操作时,就会有两个线程拥有该锁,会造成并发问题

  • 相关阅读:
    10.2 整流电路
    Ubuntu apt-get换源
    口播神器,基于Edge,微软TTS(text-to-speech)文字转语音免费开源库edge-tts实践(Python3.10)
    【云原生 | Kubernetes 系列】K8s 实战 Kubernetes 对象管理之指令式命令管理和配置文件命令式管理
    Java如何将两个数组合并为一个数组呢?
    Revel框架基本使用实践教程
    java并发编程学习三——wait/notify与park/unpark
    Go 语言中 panic 和 recover 搭配使用
    Cadence OrCAD Capture 如何批量替换元器件
    redis 分布式锁
  • 原文地址:https://blog.csdn.net/qq_34826261/article/details/126177704