• redisson分布式锁原理剖析


    redisson分布式锁原理剖析

    ​ 相信使用过redis的,或者正在做分布式开发的童鞋都知道redisson组件,它的功能很多,但我们使用最频繁的应该还是它的分布式锁功能,少量的代码,却实现了加锁、锁续命(看门狗)、锁订阅、解锁、锁等待(自旋)等功能,我们来看看都是如何实现的。

    加锁

    //获取锁对象
    RLock redissonLock = redisson.getLock(lockKey);
    //加分布式锁
    redissonLock.lock();
    

    根据redissonLock.lock()方法跟踪到具体的private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法,真正获取加锁的逻辑是在tryAcquireAsync该方法中调用的tryLockInnerAsync()方法,看看这个方法是怎么实现的?

     RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
    
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                   // 判断是否存在分布式锁,getName()也就是KEYS[1],也就是锁key名                     
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                   // 加锁,执行hset 锁key名 1                           
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                   // 设置过期时间                           
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                   // 这个分支是redisson的重入锁逻辑,锁还在,锁计数+1,重新设置过期时长                 
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // 返回锁的剩余过期时长                            
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    
    

    发现底层是结合lua脚本实现了加锁逻辑。

    为什么底层结合了Lua脚本?
    Redis是在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到redis执行。使用脚本的好处如下:

    1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑,可以一次性放到redis中执行,较少了网络往返时延。这点跟管道有点类似

    2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过
    redis的批量操作命令(类似mset)是原子的

    也就意味着虽然脚本中有多条redis指令,那即使有多条线程并发执行,在同一时刻也只有一个线程能够执行这段逻辑,等这段逻辑执行完,分布式锁也就获取到了,其它线程再进来就获取不到分布式锁了。

    锁续命(自旋)

    ​ 大家都听过锁续命,肯定也知道这里涉及到看门狗的概念。在调用tryLockInnerAsync()方法时,第一个参数是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()也就是默认的看门狗过期时间是private long lockWatchdogTimeout = 30 * 1000毫秒。

    private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // 添加监听器,判断获取锁是否成功,成功的话,添加定时任务:定期更新锁过期时间
        ttlRemainingFuture.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                // 根据tryLockInnerAsync方法,加锁成功,return nil 也就是null
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    // 添加定时任务:定期更新锁过期时间
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
    

    ​ 当线程获取到锁后,会进入if (ttlRemaining == null)分支,调用定期更新锁过期时间scheduleExpirationRenewal方法,我们看看该方法实现:

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
    
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        // 检测KEYS[1]锁是否还在,在的话再次设置过期时间                               
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener() {
                    @Override
                    public void operationComplete(Future future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        // 通过上面lua脚本执行后会返回1,也就true,再次调用更新过期时间进行续期
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
            // 延迟 internalLockLeaseTime / 3再执行续命
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }
    
    

    ​ 发现scheduleExpirationRenewal方法只是用了Timeout作为任务,并没有使用java的Timer()之类的定时器,而是在Timeout任务run()方法中定义了RFuture对象,通过给RFuture对象设置listener,在listener中通过Lua脚本执行结果进行判断是否还需要进行续期。通过这样的方式来给分布式锁进行续期。

    ​ 这种方式实现定时更新确实很巧妙,定期时间很灵活。

    锁订阅及锁等待

    ​ 锁订阅是针对那些没有获取到分布式锁的线程而言的。来看看整个获取锁的方法:

    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired,获取到锁,直接退出
            if (ttl == null) {
                return;
            }
    		// 没有获取到锁,进行订阅
            RFuture future = subscribe(threadId);
            commandExecutor.syncSubscription(future);
    
            try {
                while (true) {
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message
                    if (ttl >= 0) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    

    ​ 当第一个线程获取到锁后,会在if (ttl == null)分支进行返回,第二个及以后的线程进来在没获取到锁时,只能接着走下面的逻辑,进行锁的订阅。

    ​ 接着进入到一个while循环,首先还是会进行一次尝试获取锁(万一此时第一个线程已经释放锁了呢),通过tryAcquire(leaseTime, unit, threadId)方法,如果没有获取到锁的话,会返回锁的剩余过期时间,如果剩余过期时间大于0,则当前线程通过Semaphore信号号,将当前线程阻塞,底层执行LockSupport.parkNanos(this, nanosTimeout)线程挂起剩余过期时间后,会自动进行唤醒,再次执行tryAcquire尝试获取锁。所有没有获取到锁的线程都会执行这个流程。

    一定要等待剩余过期时间后才唤醒吗?

    ​ 假设线程一获取到锁,过期时间默认为30s,当前执行业务逻辑已经过了5s,那其他线程走到这里,则需要等待25s后才行进行唤醒,那万一线程一执行业务逻辑只要10s,那其他线程还需要等待20s吗?这样岂不是导致效率很低?

    ​ 答案是否定的,详细看解锁逻辑。

    解锁

    ​ 解锁:redissonLock.unlock();

    ​ 我们来看看具体的解锁逻辑:

    protected RFuture unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 锁不存在,发布unlockMessage解锁消息,通知其他等待线程                              
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                // 不存在该锁,异常捕捉                              
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                // redisson可重入锁计数-1,依旧>0,则重新设置过期时间                              
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                // redis删除锁,发布unlockMessage解锁消息,通知其他等待线程                         
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
    }
    
    

    ​ 发现解锁逻辑底层也是用了一个lua脚本实现。具体的说明可以看代码注释,删除锁后,并发布解锁消息,通知到其它线程,也就意味着不会其它等待的线程一直等待。

    Semophore信号量的订阅中有个onMessage方法,

    protected void onMessage(RedissonLockEntry value, Long message) {
        // 唤醒线程
        value.getLatch().release(message.intValue());
        
        while (true) {
            Runnable runnableToExecute = null;
            synchronized (value) {
                Runnable runnable = value.getListeners().poll();
                if (runnable != null) {
                    if (value.getLatch().tryAcquire()) {
                        runnableToExecute = runnable;
                    } else {
                        value.addListener(runnable);
                    }
                }
            }
            
            if (runnableToExecute != null) {
                runnableToExecute.run();
            } else {
                return;
            }
        }
    }
    

    解锁后通过if (opStatus)分支取消锁续期逻辑。

    总结:

    ​ 总的来说,可以借助一张图加深理解:

    ​ 分布式锁的整体实现很巧妙,借助lua脚本的原子性,实现了很多功能,当然redisson还有其它很多功能,比如为了解决主从集群中的异步复制会导致锁丢失问题,引入了redlock机制,还有分布式下的可重入锁等。

  • 相关阅读:
    Springboot实验室自主预约系统毕业设计源码111953
    教你用Python制作BMI计算器
    mysql之GROUP_CONCAT
    iOS调试技巧——使用Python 自定义LLDB
    Java 22正式发布,一文了解全部新特性
    CSS概述 | CSS的引入方式 | 选择器
    学习笔记|回顾(1-12节课)|应用模块化的编程|分时复用|STC32G单片机视频开发教程(冲哥)|阶段小结:应用模块化的编程(下)
    表单追加数据 - 工作经历
    Python ML实战-工业蒸汽量预测02-数据探索
    Windows 0x80190001错误登录失败
  • 原文地址:https://www.cnblogs.com/process-h/p/16915309.html