• 分布式锁之 redis & redisson (你学了 synchronized 真的有用吗?)


    分布式锁之 redis & redisson (你学了 synchronized 真的有用吗?)

    相信大多数同学在开始接触 “锁” 时都是 java 中本身给我们提供的关键字 synchronized。

    但是,实际工作中,我们真的会用 synchronized 来对一些共享资源、互斥场景进行加锁操作吗?

    实际上是不行的(当然如果你的系统仅仅是单机,之后也不考虑会扩容的情况当我没说),通常我们生产的项目都是部署在多机是存在多个服务的,所以会选择使用分布式锁。

    本文将一步步带你了解:

    为何 synchronized 无法对多机环境起作用?

    如何一步步实现商业级别的分布式锁?

    探究现成分布式锁解决方案 redisson 源码

    synchronized 单机锁

    先来看一个典型的 synchronized 单机锁的实现,以红包获取接口为例构建一个简单的共享互斥场景。

    代码基于 Spring Boot 框架,为了文章的连贯性和可阅读性,代码实例只给出部分核心逻辑

    // 初始化一个红包锁
    private final static Object redPacketLock = new Object();
    
    @PostMapping("/getRedPacket")
    public String getRedPacket() {
       	// 加锁
        synchronized (redPacketLock) {
            // 查询当前剩余红包
            Long redPacketNum = getRedPacketNum();
            log.info("getRedPacket ={}", redPacketNum);
            if (redPacketNum <= 0) {
                return "RedPacketNum less than 0 " + redPacketNum;
            } else {
              	// 获取红包
                doGetRedPacket();
                return "got it 1 , RedPacketNum: " + getRedPacketNum();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    在理想的锁获取场景下(如此时在单机环境),这个代码是没有任何问题的,如上图,用户 1 和 用户 2 ,有序的获取共享资源锁,并消费资源。

    ,如果此时我们有两台服务器,synchronized 还能正常工作吗?让我们一起看下这个场景

    在这里插入图片描述

    synchronized 作为 jvm 级别的锁,它只能作用于单个服务器上,也就是说多个服务器使用 synchronized 就相当于有多把锁,自己用自己的,就像家里有多个进出的大门(同一时间供一人进入),那么有几个大门就可以同时允许几个人进入,结果显而易见,这个锁失败的。

    上图的例子就出现了红包被抢成 负值 的情况,在并发更大机器更多的场景下,情况可能会更糟。(Ooo No 这可如何是好……)

    这个时候可能有人就慌了,关键时刻我们不要慌,想想加锁的本质是什么?

    其实就是将并行的操作串行化,多服务器同样如此。

    如何串行化整个操作流程就是我们需要考虑的问题。

    redis 的高性能,串行化 用来来解决这个问题就非常合适。

    尝试实现分布式锁

    redis 为我们提供了 setnx 命令,如果 key 已经设置了值将返回 0,以下是部分代码帮助理解逻辑

    @PostMapping("/getRedPacketWithDistributedLock")
    public String getRedPacketWithDistributedLock() {
        try {
            while (true) {
              	// 如果加锁不成功就会一直循环
                if (redisCache.setnx(distributedLockKey, "1") == 0) {
                    continue;
                }
                // 为锁设置一个自动过期时间
                redisCache.expire(distributedLockKey, 10);
                Long redPacketNum = getRedPacketNum();
                log.info("getRedPacket ={}", redPacketNum);
                if (redPacketNum <= 0) {
                    return "RedPacketNum less than 0 " + redPacketNum;
                } else {
                    doGetRedPacket();
                    return "got it 1 , RedPacketNum: " + getRedPacketNum();
                }
            }
        } catch (Exception e) {
            log.error("getRedPacketWithDistributedLock error!", e);
        } finally {
          	// 释放锁
            redisCache.del(distributedLockKey);
        }
        return "get lock fail";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    这个代码在分布式场景下运行良好。此时之前的问题解决了,我们的 “门” 重新回归了一个

    在这里插入图片描述

    长舒一口气,哦,似乎问题已经解决了,很简单嘛,几行代码就 ok 了。但事实真的如此吗,让我们看看这段代码中隐藏的陷阱。

    问题剖析

    @PostMapping("/getRedPacketWithDistributedLock")
    public String getRedPacketWithDistributedLock() {
        try {
            while (true) {
                if (redisCache.setnx(distributedLockKey, "1") == 0) {
                    continue;
                }
                //TODO 1.此处中断,过期时间没设置上 
                //TODO 2.过期时间设置多少,没执行完就过期了怎么办?
                redisCache.expire(distributedLockKey, 10);
            //…………省略业务逻辑
     break;
            }
        } catch (Exception e) {
            log.error("getRedPacketWithDistributedLock error!", e);
        } finally {
            //TODO 3.超过过期时间删掉了别人刚加的锁,锁失效
            redisCache.del(distributedLockKey);
        }
        return "get lock fail";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    TODO 1 此处中断,过期时间没设置上

    TODO 2 过期时间设置多少,没执行完就过期了怎么办?

    TODO 3 超过过期时间删掉了别人刚加的锁,锁失效

    以上 TODO 对应 3 个问题,你可以先思考一下 🤔 应该如何解决。

    大概有的同学会疑惑,看着好像都是过期时间搞的鬼,你设置过期时间干嘛,不设置不啥事情都没有了?!

    还是讲下:设置过期时间的原因是 即使我们的锁释放是在 finally 中执行,但仍可能执行失败,比如中断,这会造成更严重问题,永久锁,这个锁不会被任何人释放,所以任何人都无法使用这个服务了。所以需要过期时间来处理程序中断异常导致的永久锁问题。

    ps : 根据业务场景不同,有些业务是可以容忍部分的锁失效的情况,所以在这种场景下以上代码已经可以胜任,且好处是简单易懂 毕竟 keep it simple


    TODO 1 Redis 事务 Lua 脚本

    之所以存在设置 key 过期时间失败的问题,是我们分成了两步进行,整体的操作不原子,问题就产生了。

    Redis 为我们提供了内部事务实现: Lua 脚本,通过编写 Lua 脚本,可以将设置 key 和 设置过期时间放到一个事务中进行,要么都成功要么都失败。

    public boolean setNxWithExpire(String key, String uId, int seconds) {
        String luaScripts = "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hset', KEYS[1], ARGV[1], 1);" +
                "redis.call('expire', KEYS[1], ARGV[2]);" +
                "return 1;" +
                "end;" +
                "return 0;";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(uId);
        values.add(String.valueOf(seconds));
        long result = (long) jedis.eval(luaScripts, keys, values);
        log.info("setNxWithExpire key={}, uId={}, seconds={}, result={}", key, uId, seconds, result);
        return result == 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    具体语法就不多解释了,用到查文档即可(PS 后文 redisson 源码剖析中会有部分讲解)。至此 TODO 1 解决。

    TODO 3 锁什么失效了?

    TODO 2 ,先按下不表,先来讲讲 TODO 3,怎么就删掉了别人的锁呢?好端端的锁为何就失效了?让我们看一个较为复杂的场景(而实际情况可能更为复杂)

    在这里插入图片描述

    这里 用户1 首先加锁成功,但是执行时间超出了锁的过期时间,这时锁过期自动释放,被用户 2 抢占,此时共享资源就有两个人在共用,这时处理 用户 1 的线程执行完毕进行锁释放操作,结果却释放用户 2 加的锁,又变成了无锁状态用户 3 又可以获取锁了。

    再执行下去的结果显而易见,又出现了多抢红包的问题,钱不翼而飞了。

    而在高并发场景下,可能一直持续这种情况,我的锁被前人释放,循环往复,这个锁加与不加已经没有任何区别了。

    如何是好?咋办?咋办?!

    细心的同学应该注意到,在 TODO 1 的方法参数有一个 uid 的参数。uid 的作用在于标识这个锁是谁加的,谁加的谁有权力释放。

    所以释放锁的方法就变成了这样

    public boolean delNxWithUid(String key, String uId) {
        String luaScripts = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " +
                "redis.call('del', KEYS[1]); " +
                "return 1;" +
                "end;" +
                "return 0;";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(uId);
        long result = (long) jedis.eval(luaScripts, keys, values);
        log.info("delNxWithUid key={}, uId={}, result={}", key, uId, result);
        return result == 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    有效避免了我加的锁他人释放的问题。

    TODO 2 过期时间设置多少,没执行完就过期了怎么办?

    回到 TODO 2,过期时间问题,虽然我们引入过期时间来解决 “永久锁” 问题,但是同样带来了新的问题。

    令人头疼的是过期的时间设置多少才好?太多太少都不太合适,我们很难预估一个恰好的过期时间,而对于复杂的业务场景,代码运行期间耗时操作卡死,或者 timeout ,都有可能导致锁过期释放。

    如果能在程序运行的时候,给锁续命就好了,只要我还在执行,就在合适的时候延长锁的过期时间。

    没错,这是一个好方法,我们可以开一个守护线程来为我们主线程加的锁 “续命”

    在 redisson 中称其为 watchdog 看门狗,来看看 redisson 是如何实现分布式锁的

    Redisson 源码剖析(⚠️警告,本文将进入 Hard Cord 模式,如有不适请酌情观看)

    Redisson Github 地址 使用 redisson 后,代码又回到了最初的样子

     @PostMapping("/getRedPacketWithRedissonLock")
        public String getRedPacketWithRedissonLock() {
            RLock redissonLock = redissonClient.getLock(redPacketLockKey);
            try {
                redissonLock.lock();
                Integer redPacketNum = getRedPacketNum();
                log.info("getRedPacket ={}", redPacketNum);
                if (redPacketNum <= 0) {
                    return "RedPacketNum less than 0 " + redPacketNum;
                } else {
                    doGetRedPacket();
                    return "got it 1 , RedPacketNum: " + getRedPacketNum();
                }
            } catch (Exception e) {
                log.error("getRedPacketWithDistributedLock error!", e);
            } finally {
                redissonLock.unlock();
            }
            return "get lock fail";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    进入 lock 方法,基础的实现, 此处省略其他方法

    public class RedissonLock extends RedissonBaseLock {
    
        @Override
        public void lock() {
            try {
                lock(-1, null, false);
            } catch (InterruptedException e) {
                throw new IllegalStateException();
            }
        }
    		// lock(-1, null, false); 实际调用此方法
        private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            // 获取当前线程 id
          	long threadId = Thread.currentThread().getId();
          	// 尝试获取锁
            Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return;
            }
    				// 订阅解锁通知
            CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
            pubSub.timeout(future);
            RedissonLockEntry entry;
            if (interruptibly) {
                entry = commandExecutor.getInterrupted(future);
            } else {
                entry = commandExecutor.get(future);
            }
    
            try {
                while (true) {
                    ttl = tryAcquire(-1, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message
                    if (ttl >= 0) {
                        try {
                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            if (interruptibly) {
                                throw e;
                            }
                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else {
                        if (interruptibly) {
                            entry.getLatch().acquire();
                        } else {
                            entry.getLatch().acquireUninterruptibly();
                        }
                    }
                }
            } finally {
                unsubscribe(entry, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    进入 tryAcquire 方法

       	//  Long ttl = tryAcquire(-1, leaseTime, unit, threadId); 注意这里传参是 tryAcquire(-1, -1, null, threadId); 
    		private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
        }
    		// 这里又包了一层,实际执行 tryAcquireAsync
        private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            RFuture<Long> ttlRemainingFuture;
            if (leaseTime > 0) {
                ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            } else {
              	// leaseTime = -1 进入这个分支逻辑
              	// 区别是 lockWatchdogTimeout = 30 * 1000 给了一个默认的值,也就是锁过期时间 30 秒
                ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            }
            CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
                // lock acquired
                if (ttlRemaining == null) {
                    if (leaseTime > 0) {
                        internalLockLeaseTime = unit.toMillis(leaseTime);
                    } else {
                        scheduleExpirationRenewal(threadId);
                    }
                }
                return ttlRemaining;
            });
            return new CompletableFutureWrapper<>(f);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    进入 tryLockInnerAsync 方法

        <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    是不是很熟悉,是的就是 Lua 脚本

    这里

    KEYS[1] = getRawName() // 这个就是一开始我们设置的锁的 key
    ARGV[1] = unit.toMillis(leaseTime) // 默认是 30 s
    ARGV[2] = getLockName(threadId) // 这个名字是 uuid + threadId
    
    • 1
    • 2
    • 3
       	protected String getLockName(long threadId) {
            return id + ":" + threadId;
        }
    
     		this.id = commandExecutor.getConnectionManager().getId();
    
        public static ConnectionManager createConnectionManager(Config configCopy) {
          	// 这里创建时生成
            UUID id = UUID.randomUUID();
    
            if (configCopy.getMasterSlaveServersConfig() != null) {
                validate(configCopy.getMasterSlaveServersConfig());
                return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
            } else if (configCopy.getSingleServerConfig() != null) {
                validate(configCopy.getSingleServerConfig());
                return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
            } else if (configCopy.getSentinelServersConfig() != null) {
                validate(configCopy.getSentinelServersConfig());
                return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
            } else if (configCopy.getClusterServersConfig() != null) {
                validate(configCopy.getClusterServersConfig());
                return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
            } else if (configCopy.getReplicatedServersConfig() != null) {
                validate(configCopy.getReplicatedServersConfig());
                return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
            } else if (configCopy.getConnectionManager() != null) {
                return configCopy.getConnectionManager();
            }else {
                throw new IllegalArgumentException("server(s) address(es) not defined!");
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ok 参数明白了,让我们看下这个 Lua 脚本

    "if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    第一个 if ,如果无锁则加锁,hash 结构 ,增量 1

    第二个 if 是用来支持重入锁的,就是说存在 key 且 LockName 是一个则 +1 ,从新设置过期时间 (这里 LockName 就是用来判断是否是同一个人加锁,如我们上面设计的那样)

    加锁成功都是返回 nil 也就是 null

    否则的话会返回一个 过期时间

    回到 tryAcquireAsync 方法

    		// 这里又包了一层,实际执行 tryAcquireAsync
        private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            RFuture<Long> ttlRemainingFuture;
            if (leaseTime > 0) {
                ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            } else {
              	// leaseTime = -1 进入这个分支逻辑
              	// 区别是 internalLockLeaseTime = 30 * 1000 给了一个默认的值,也就是锁过期时间 30 秒
             		//  this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
                ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            }
          	//	这里设置了一个回调监听
            CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
                // lock acquired 如果获取锁成功
                if (ttlRemaining == null) {
                    if (leaseTime > 0) {
                      	// 这里如果 leaseTime > 0 重新赋值 internalLockLeaseTime
                      	// 也就是说,自己设置过期时间是不会启动 watchdog 机制的
                        internalLockLeaseTime = unit.toMillis(leaseTime);
                    } else {
                      	// 默认是 -1 走这里的逻辑,这里逻辑很重要 【过期续订】从名字可以看出来这里就是续命逻辑了
                        scheduleExpirationRenewal(threadId);
                    }
                }
                return ttlRemaining;
            });
            return new CompletableFutureWrapper<>(f);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    进入 scheduleExpirationRenewal 方法,看主流程

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
              	// 这里执行续期
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
    // renewExpiration 执行续期
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 这里创建了一个 TimerTask
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
              	// 获取 threadId
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                // 异步续命
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    // 这里当主线程释放锁之后,就会停止续命
                    if (res) {
                        // reschedule itself 这里会重复调用自己,达到一直续命的效果
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
          // 这里就是默认 30/3 = 10 每 10s 执行一次
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
    
    protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
      // 我加的我续命 判断是不是主线程加的锁 重新设置过期时间 30s
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    !! 等等,我似乎发现了华点,节点宕机主线程挂了没有释放锁,怎么办?会一直续命吗。

    这个时候,只需要来一个战术后仰:程序都没了,你觉得定时任务还在吗?定时任务都不在了,所以也不会存在死锁的问题。

    当然你释放锁的代码要写正确,放到 finally 里。

    到这里加锁的主流程就完成了,那其他没有加锁成功的线程在做什么呢?

    他们会一直循环尝试获取锁。

    再回到最初的 lock 方法看看

    public class RedissonLock extends RedissonBaseLock {
    
        @Override
        public void lock() {
            try {
                lock(-1, null, false);
            } catch (InterruptedException e) {
                throw new IllegalStateException();
            }
        }
    		// lock(-1, null, false); 实际调用此方法
        private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            // 获取当前线程 id
          	long threadId = Thread.currentThread().getId();
          	// 尝试获取锁
            Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired 加锁成功直接返回
            if (ttl == null) {
                return;
            }
    
            CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
            pubSub.timeout(future);
            RedissonLockEntry entry;
            if (interruptibly) {
                entry = commandExecutor.getInterrupted(future);
            } else {
                entry = commandExecutor.get(future);
            }
    
            try {
                while (true) {
                    ttl = tryAcquire(-1, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message 等待一个锁失效的时间后再尝试获取做
                    if (ttl >= 0) {
                        try {
                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            if (interruptibly) {
                                throw e;
                            }
                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else {
                        if (interruptibly) {
                            entry.getLatch().acquire();
                        } else {
                            entry.getLatch().acquireUninterruptibly();
                        }
                    }
                }
            } finally {
                unsubscribe(entry, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    最后看下解锁的代码

     @Override
     public void unlock() {
         try {
           	// 这里是主逻辑,下跳看看
             get(unlockAsync(Thread.currentThread().getId()));
         } catch (RedisException e) {
             if (e.getCause() instanceof IllegalMonitorStateException) {
                 throw (IllegalMonitorStateException) e.getCause();
             } else {
                 throw e;
             }
         }
     }
    
     @Override
     public RFuture<Void> unlockAsync(long threadId) {
       	// 主逻辑方法 unlockInnerAsync
         RFuture<Boolean> future = unlockInnerAsync(threadId);
         CompletionStage<Void> f = future.handle((opStatus, e) -> {
             cancelExpirationRenewal(threadId);
             if (e != null) {
                 throw new CompletionException(e);
             }
             if (opStatus == null) {
                 IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                         + id + " thread-id: " + threadId);
                 throw new CompletionException(cause);
             }
             return null;
         });
         return new CompletableFutureWrapper<>(f);
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    主要代码还是在 protected abstract RFuture unlockInnerAsync(long threadId);

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    熟悉的配方,Lua 脚本

    这里为了支持可重入,首先是对 key 值 -1 ,如果是小于 0 ,则删除 key。

    常用配置项 common-settings

    这里注意下 subscriptionsPerConnection 的配置 ,redisson 在加锁的时候会订阅解锁操作,默认订阅的连接数是 5

    subscriptionsPerConnection
    Default value: 5
    
    Subscriptions per Redis connection limit
    
    • 1
    • 2
    • 3
    • 4

    超过上限会报 RedisTimeoutException

    Unable to acquire connection for subscription after " + attempts.get() + " attempts. " +
                                            "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
    
    • 1
    • 2

    也就是在加锁的时候,加锁失败会订阅解锁的通知。

    在 lock() 方法中

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    
    在这个类的方法中, lock 的 subscribe 链路
    org.redisson.pubsub.PublishSubscribeService#subscribeNoTimeout(...)
    
    • 1
    • 2
    • 3
    • 4

    在实际生产环境中,需要适当调大这个配置。

    并非完美

    虽然 redisson 加入了 watchdog 机制,解决了设置多少过期时间的问题,但到这里并非说一点问题都没有了。

    在集群下的 redis,主节点加锁成功后未同步到从节点,这个时候主节点宕机,重新选举,此时就有可能出现同时加锁成功的情况。

    此时就是一个取舍的问题,大部分场景可以容忍这种问题,但如果确实想解决,可以改用 Zookeeper ,集群强一致性,

    redisson 也提供了类似的解决方案 RedissonRedLock 红锁

     public RedissonRedLock(RLock... locks) {
            super(locks);
     }
    
    • 1
    • 2
    • 3

    他需要通过多个 RLock 进行构建,实际上就是我要将这个加锁同时同步到多个 redis 后才算成功,当然如此做的代价就是要消耗更多的性能。

    最后的 Tips

    锁的优化:

    1. 减少锁的持有时间 ,尽量缩小加锁后的代码逻辑
    2. 降低锁的请求频率 ,锁分解,锁分段
    3. 使用带有协调机制的独占锁,这些机制允许更高的并发性。如读写锁

    锁分解是采用多个相互独立的锁来保护独立的状态变量,从而改变这些变量在之前由单个锁来保护的情况。这些技术能减小锁操作的粒度,并能实现更高的可伸缩性,然而,使用的锁越多,那么发生死锁的风险也就越高。

    锁分段:比如 ConcurrentHashMap 的分段锁的实现方式。

  • 相关阅读:
    Ubuntu处理依赖问题
    PHP(3)PHP基础语法
    4-20mA转RS-485,Modbus数据采集模块 YL121
    Vue3.2中的setup语法糖(易懂)
    前缀和与差分
    Vue学习—vuex
    day01-3-界面显示&用户登录&餐桌状态显示
    如何避免重复创建线程?创建线程池的方式有哪些?各自优缺点有哪些?
    如何使用MySQL Shell连接数据库
    【类、抽象与继承】
  • 原文地址:https://blog.csdn.net/w903328615/article/details/126164261