• 16.Redis系列之Redisson分布式锁原理


    本文学习Redisson分布式锁的原理以及优缺点

    1. Redisson分布式锁原理

    lua脚本是原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令打断

    # RedissonLock.tryLockInnerAsync方法内lua脚本加锁
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1.1 加锁原理

    1
    2

    如图与代码所示,sk_10001_stock_lock不存在时,加锁时会创建hash类型键sk_10001_stock_lock,并新增field为客户端id,value设置为1(hincrby key field 1不存在key则会创建它,并且设置field为1),并设置过期时间为60s

    1.2 可重入原理

    如lua脚本所示,当sk_10001_stock_lock存在客户端id时,会将客户端id的值+1,并且重新设置过期时间,所以保存客户端id就是为了可重入

    1.3 锁互斥原理

    如lua脚本所示,对于其他的客户端返回锁pttl过期时间,后续流程如代码所示

    // RedissonLock.tryLock方法
        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // ttl为空获取到锁
            if (ttl == null) {
                return true;
            }
            // 如果时间超过waitTime则获取锁失败
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
    	// 订阅锁释放事件
            CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            try {
                subscribeFuture.get(time, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
            } catch (ExecutionException e) {   
            }
    
            try {
                // 在最大等待时间内,循环获取锁,直到成功或失败
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
    
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
    
                    // 通过信号量semaphore共享锁阻塞等待
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                       commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
                }
            } finally {
                // 取消订阅
                unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    1.4 锁续期原理
    // RedissonLock.tryAcquireAsync方法
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
                // 获取到锁
                if (ttlRemaining == null) {
                    if (leaseTime > 0) {
                        internalLockLeaseTime = unit.toMillis(leaseTime);
                    } else {
                        // 当leaseTime<=0时锁会自动续期
                        scheduleExpirationRenewal(threadId);
                    }
                }
                return ttlRemaining;
            });
            return new CompletableFutureWrapper<>(f);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
     private void renewExpiration() {
            // Watch Dog机制,每10秒检查是否仍然持有锁,是则续期
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
    	    }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
            
            ee.setTimeout(task);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
        // 依次进入方法,最终找到RedissonBaseLock.renewExpirationAsync实现
        protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return 0;",
                    Collections.singletonList(getRawName()),
                    internalLockLeaseTime, getLockName(threadId));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    通过代码可知,当leasetime<=0时,锁通过Watch Dog机制[其实就是一个后台定时任务线程]每10秒检查是否持有锁,持有则自动续期30s,也就是重新设置了sk_10001_stock_lock中field为客户端id的过期时间为30s

    1.5 锁释放原理
    public RFuture<Void> unlockAsync(long threadId) {
            // 释放锁
            RFuture<Boolean> future = unlockInnerAsync(threadId);
        
            CompletionStage<Void> f = future.handle((opStatus, e) -> {
                // 取消Watch Dog机制
                cancelExpirationRenewal(threadId);
            });
    
            return new CompletableFutureWrapper<>(f);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            "return nil;" +
                            "end; " +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                            "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return nil;",
                    Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    由代码可知,释放锁时将sk_10001_stock_lock中field为客户端id的值依次减1,直到为0后在进行删除,删除后会向redisson_lock__channel通道中发送UNLOCK_MESSAGE消息也就是0L,通知阻塞等待的客户端

    2. Redisson优缺点

    优点

    • Redisson通过Watch Dog机制解决锁的续期问题
    • 与Zookeeper相比较,Redisson基于Redis性能更高
    • 通过Redisson实现分布式可重入锁,比原生的 SET mylock userId NX PX milliseconds + lua更加简洁,让我们更多关注业务逻辑
    • 在等待申请锁的实现上进行了优化,减少了无效锁申请,提升了资源利用率

    缺点

    在redisson3.12.5前,无法解决在master节点加锁后,master异步复制给slave时宕机,slave变为了master,其他客户端在新的master上重复加锁问题,需要使用redlock算法获取集群大多数锁时才算获取锁成功额外编码

    RLock rLock = redissonClient.getLock(PRODUCT_STOCK_KEY + "_lock");
    RLock rLock = redissonClient1.getLock(PRODUCT_STOCK_KEY + "_lock");
    RLock rLock = redissonClient2.getLock(PRODUCT_STOCK_KEY + "_lock");
    RedissonRedLock redissonRedLock = new RedissonRedLock(rLock, rLock1, rLock2);
    redissonRedLock.tryLock(30, 60, TimeUnit.SECONDS);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    https://github.com/redisson/redisson/blob/master/CHANGELOG.md
    16-Apr-2020 - 3.12.5 released
    Improvement - increased RLock reliability during failover. RedLock was deprecated

    但是对于redisson3.12.5及之后就不存在该问题会等到master异步复制给slave完成后才会进行加锁

     /**
         * Returns Lock instance by name.
         * 

    * Implements a non-fair locking so doesn't guarantees an acquire order by threads. *

    * To increase reliability during failover, all operations wait for propagation to all Redis slaves. * * @param name - name of object * @return Lock object */ RLock getLock(String name);

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    欢迎关注公众号算法小生

  • 相关阅读:
    opencv dnn模块 示例(16) 目标检测 object_detection 之 yolov4
    MySQL —— 排序,分页
    【Vue-Element-UI -el-tree数据格式与组件中的格式不一样】
    【笔记】css更改hr标签颜色
    小程序线上调试优化
    Golang sync.Map原理分析
    JMeter:断言之响应断言
    单片机硬件和软件延时、RTOS相对延时和绝对延时
    url编码 解码 百分号 %
    python r代表什么意思
  • 原文地址:https://blog.csdn.net/SJshenjian/article/details/127875670