相信大多数同学在开始接触 “锁” 时都是 java 中本身给我们提供的关键字 synchronized。
但是,实际工作中,我们真的会用 synchronized 来对一些共享资源、互斥场景进行加锁操作吗?
实际上是不行的(当然如果你的系统仅仅是单机,之后也不考虑会扩容的情况当我没说),通常我们生产的项目都是部署在多机是存在多个服务的,所以会选择使用分布式锁。
本文将一步步带你了解:
为何 synchronized 无法对多机环境起作用?
如何一步步实现商业级别的分布式锁?
探究现成分布式锁解决方案 redisson 源码
先来看一个典型的 synchronized 单机锁的实现,以红包获取接口为例构建一个简单的共享互斥场景。
代码基于 Spring Boot 框架,为了文章的连贯性和可阅读性,代码实例只给出部分核心逻辑
// 初始化一个红包锁
private final static Object redPacketLock = new Object();
@PostMapping("/getRedPacket")
public String getRedPacket() {
// 加锁
synchronized (redPacketLock) {
// 查询当前剩余红包
Long redPacketNum = getRedPacketNum();
log.info("getRedPacket ={}", redPacketNum);
if (redPacketNum <= 0) {
return "RedPacketNum less than 0 " + redPacketNum;
} else {
// 获取红包
doGetRedPacket();
return "got it 1 , RedPacketNum: " + getRedPacketNum();
}
}
}
在理想的锁获取场景下(如此时在单机环境),这个代码是没有任何问题的,如上图,用户 1 和 用户 2 ,有序的获取共享资源锁,并消费资源。
但,如果此时我们有两台服务器,synchronized 还能正常工作吗?让我们一起看下这个场景
synchronized 作为 jvm 级别的锁,它只能作用于单个服务器上,也就是说多个服务器使用 synchronized 就相当于有多把锁,自己用自己的,就像家里有多个进出的大门(同一时间供一人进入),那么有几个大门就可以同时允许几个人进入,结果显而易见,这个锁失败的。
上图的例子就出现了红包被抢成 负值 的情况,在并发更大机器更多的场景下,情况可能会更糟。(Ooo No 这可如何是好……)
这个时候可能有人就慌了,关键时刻我们不要慌,想想加锁的本质是什么?
其实就是将并行的操作串行化,多服务器同样如此。
如何串行化整个操作流程就是我们需要考虑的问题。
redis 的高性能,串行化 用来来解决这个问题就非常合适。
redis 为我们提供了 setnx 命令,如果 key 已经设置了值将返回 0,以下是部分代码帮助理解逻辑
@PostMapping("/getRedPacketWithDistributedLock")
public String getRedPacketWithDistributedLock() {
try {
while (true) {
// 如果加锁不成功就会一直循环
if (redisCache.setnx(distributedLockKey, "1") == 0) {
continue;
}
// 为锁设置一个自动过期时间
redisCache.expire(distributedLockKey, 10);
Long redPacketNum = getRedPacketNum();
log.info("getRedPacket ={}", redPacketNum);
if (redPacketNum <= 0) {
return "RedPacketNum less than 0 " + redPacketNum;
} else {
doGetRedPacket();
return "got it 1 , RedPacketNum: " + getRedPacketNum();
}
}
} catch (Exception e) {
log.error("getRedPacketWithDistributedLock error!", e);
} finally {
// 释放锁
redisCache.del(distributedLockKey);
}
return "get lock fail";
}
这个代码在分布式场景下运行良好。此时之前的问题解决了,我们的 “门” 重新回归了一个
长舒一口气,哦,似乎问题已经解决了,很简单嘛,几行代码就 ok 了。但事实真的如此吗,让我们看看这段代码中隐藏的陷阱。
问题剖析
@PostMapping("/getRedPacketWithDistributedLock")
public String getRedPacketWithDistributedLock() {
try {
while (true) {
if (redisCache.setnx(distributedLockKey, "1") == 0) {
continue;
}
//TODO 1.此处中断,过期时间没设置上
//TODO 2.过期时间设置多少,没执行完就过期了怎么办?
redisCache.expire(distributedLockKey, 10);
//…………省略业务逻辑
break;
}
} catch (Exception e) {
log.error("getRedPacketWithDistributedLock error!", e);
} finally {
//TODO 3.超过过期时间删掉了别人刚加的锁,锁失效
redisCache.del(distributedLockKey);
}
return "get lock fail";
}
TODO 1 此处中断,过期时间没设置上
TODO 2 过期时间设置多少,没执行完就过期了怎么办?
TODO 3 超过过期时间删掉了别人刚加的锁,锁失效
以上 TODO 对应 3 个问题,你可以先思考一下 🤔 应该如何解决。
大概有的同学会疑惑,看着好像都是过期时间搞的鬼,你设置过期时间干嘛,不设置不啥事情都没有了?!
还是讲下:设置过期时间的原因是 即使我们的锁释放是在 finally 中执行,但仍可能执行失败,比如中断,这会造成更严重问题,永久锁,这个锁不会被任何人释放,所以任何人都无法使用这个服务了。所以需要过期时间来处理程序中断异常导致的永久锁问题。
ps : 根据业务场景不同,有些业务是可以容忍部分的锁失效的情况,所以在这种场景下以上代码已经可以胜任,且好处是简单易懂 毕竟 keep it simple
之所以存在设置 key 过期时间失败的问题,是我们分成了两步进行,整体的操作不原子,问题就产生了。
Redis 为我们提供了内部事务实现: Lua 脚本,通过编写 Lua 脚本,可以将设置 key 和 设置过期时间放到一个事务中进行,要么都成功要么都失败。
public boolean setNxWithExpire(String key, String uId, int seconds) {
String luaScripts = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[1], 1);" +
"redis.call('expire', KEYS[1], ARGV[2]);" +
"return 1;" +
"end;" +
"return 0;";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(uId);
values.add(String.valueOf(seconds));
long result = (long) jedis.eval(luaScripts, keys, values);
log.info("setNxWithExpire key={}, uId={}, seconds={}, result={}", key, uId, seconds, result);
return result == 1;
}
具体语法就不多解释了,用到查文档即可(PS 后文 redisson 源码剖析中会有部分讲解)。至此 TODO 1 解决。
TODO 2 ,先按下不表,先来讲讲 TODO 3,怎么就删掉了别人的锁呢?好端端的锁为何就失效了?让我们看一个较为复杂的场景(而实际情况可能更为复杂)
这里 用户1 首先加锁成功,但是执行时间超出了锁的过期时间,这时锁过期自动释放,被用户 2 抢占,此时共享资源就有两个人在共用,这时处理 用户 1 的线程执行完毕进行锁释放操作,结果却释放了 用户 2 加的锁,又变成了无锁状态,用户 3 又可以获取锁了。
再执行下去的结果显而易见,又出现了多抢红包的问题,钱不翼而飞了。
而在高并发场景下,可能一直持续这种情况,我的锁被前人释放,循环往复,这个锁加与不加已经没有任何区别了。
如何是好?咋办?咋办?!
细心的同学应该注意到,在 TODO 1 的方法参数有一个 uid 的参数。uid 的作用在于标识这个锁是谁加的,谁加的谁有权力释放。
所以释放锁的方法就变成了这样
public boolean delNxWithUid(String key, String uId) {
String luaScripts = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"return 1;" +
"end;" +
"return 0;";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(uId);
long result = (long) jedis.eval(luaScripts, keys, values);
log.info("delNxWithUid key={}, uId={}, result={}", key, uId, result);
return result == 1;
}
有效避免了我加的锁他人释放的问题。
回到 TODO 2,过期时间问题,虽然我们引入过期时间来解决 “永久锁” 问题,但是同样带来了新的问题。
令人头疼的是过期的时间设置多少才好?太多太少都不太合适,我们很难预估一个恰好的过期时间,而对于复杂的业务场景,代码运行期间耗时操作卡死,或者 timeout ,都有可能导致锁过期释放。
如果能在程序运行的时候,给锁续命就好了,只要我还在执行,就在合适的时候延长锁的过期时间。
没错,这是一个好方法,我们可以开一个守护线程来为我们主线程加的锁 “续命”
在 redisson 中称其为 watchdog 看门狗,来看看 redisson 是如何实现分布式锁的
Redisson Github 地址 使用 redisson 后,代码又回到了最初的样子
@PostMapping("/getRedPacketWithRedissonLock")
public String getRedPacketWithRedissonLock() {
RLock redissonLock = redissonClient.getLock(redPacketLockKey);
try {
redissonLock.lock();
Integer redPacketNum = getRedPacketNum();
log.info("getRedPacket ={}", redPacketNum);
if (redPacketNum <= 0) {
return "RedPacketNum less than 0 " + redPacketNum;
} else {
doGetRedPacket();
return "got it 1 , RedPacketNum: " + getRedPacketNum();
}
} catch (Exception e) {
log.error("getRedPacketWithDistributedLock error!", e);
} finally {
redissonLock.unlock();
}
return "get lock fail";
}
进入 lock 方法,基础的实现, 此处省略其他方法
public class RedissonLock extends RedissonBaseLock {
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
// lock(-1, null, false); 实际调用此方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程 id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅解锁通知
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
}
进入 tryAcquire 方法
// Long ttl = tryAcquire(-1, leaseTime, unit, threadId); 注意这里传参是 tryAcquire(-1, -1, null, threadId);
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
// 这里又包了一层,实际执行 tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// leaseTime = -1 进入这个分支逻辑
// 区别是 lockWatchdogTimeout = 30 * 1000 给了一个默认的值,也就是锁过期时间 30 秒
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
进入 tryLockInnerAsync 方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
是不是很熟悉,是的就是 Lua 脚本
这里
KEYS[1] = getRawName() // 这个就是一开始我们设置的锁的 key
ARGV[1] = unit.toMillis(leaseTime) // 默认是 30 s
ARGV[2] = getLockName(threadId) // 这个名字是 uuid + threadId
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
this.id = commandExecutor.getConnectionManager().getId();
public static ConnectionManager createConnectionManager(Config configCopy) {
// 这里创建时生成
UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig());
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
} else if (configCopy.getSingleServerConfig() != null) {
validate(configCopy.getSingleServerConfig());
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
} else if (configCopy.getSentinelServersConfig() != null) {
validate(configCopy.getSentinelServersConfig());
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
} else if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
} else if (configCopy.getReplicatedServersConfig() != null) {
validate(configCopy.getReplicatedServersConfig());
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
} else if (configCopy.getConnectionManager() != null) {
return configCopy.getConnectionManager();
}else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
}
ok 参数明白了,让我们看下这个 Lua 脚本
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
第一个 if ,如果无锁则加锁,hash 结构 ,增量 1
第二个 if 是用来支持重入锁的,就是说存在 key 且 LockName 是一个则 +1 ,从新设置过期时间 (这里 LockName 就是用来判断是否是同一个人加锁,如我们上面设计的那样)
加锁成功都是返回 nil 也就是 null
否则的话会返回一个 过期时间
回到 tryAcquireAsync 方法
// 这里又包了一层,实际执行 tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// leaseTime = -1 进入这个分支逻辑
// 区别是 internalLockLeaseTime = 30 * 1000 给了一个默认的值,也就是锁过期时间 30 秒
// this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 这里设置了一个回调监听
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired 如果获取锁成功
if (ttlRemaining == null) {
if (leaseTime > 0) {
// 这里如果 leaseTime > 0 重新赋值 internalLockLeaseTime
// 也就是说,自己设置过期时间是不会启动 watchdog 机制的
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 默认是 -1 走这里的逻辑,这里逻辑很重要 【过期续订】从名字可以看出来这里就是续命逻辑了
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
进入 scheduleExpirationRenewal 方法,看主流程
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 这里执行续期
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
// renewExpiration 执行续期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 这里创建了一个 TimerTask
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取 threadId
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 异步续命
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 这里当主线程释放锁之后,就会停止续命
if (res) {
// reschedule itself 这里会重复调用自己,达到一直续命的效果
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// 这里就是默认 30/3 = 10 每 10s 执行一次
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
// 我加的我续命 判断是不是主线程加的锁 重新设置过期时间 30s
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
!! 等等,我似乎发现了华点,节点宕机主线程挂了没有释放锁,怎么办?会一直续命吗。
这个时候,只需要来一个战术后仰:程序都没了,你觉得定时任务还在吗?定时任务都不在了,所以也不会存在死锁的问题。
当然你释放锁的代码要写正确,放到 finally 里。
到这里加锁的主流程就完成了,那其他没有加锁成功的线程在做什么呢?
他们会一直循环尝试获取锁。
再回到最初的 lock 方法看看
public class RedissonLock extends RedissonBaseLock {
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
// lock(-1, null, false); 实际调用此方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程 id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired 加锁成功直接返回
if (ttl == null) {
return;
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message 等待一个锁失效的时间后再尝试获取做
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
}
最后看下解锁的代码
@Override
public void unlock() {
try {
// 这里是主逻辑,下跳看看
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 主逻辑方法 unlockInnerAsync
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
主要代码还是在 protected abstract RFuture
中
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
熟悉的配方,Lua 脚本
这里为了支持可重入,首先是对 key 值 -1 ,如果是小于 0 ,则删除 key。
这里注意下 subscriptionsPerConnection 的配置 ,redisson 在加锁的时候会订阅解锁操作,默认订阅的连接数是 5
subscriptionsPerConnection
Default value: 5
Subscriptions per Redis connection limit
超过上限会报 RedisTimeoutException
Unable to acquire connection for subscription after " + attempts.get() + " attempts. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
也就是在加锁的时候,加锁失败会订阅解锁的通知。
在 lock() 方法中
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
在这个类的方法中, lock 的 subscribe 链路
org.redisson.pubsub.PublishSubscribeService#subscribeNoTimeout(...)
在实际生产环境中,需要适当调大这个配置。
虽然 redisson 加入了 watchdog 机制,解决了设置多少过期时间的问题,但到这里并非说一点问题都没有了。
在集群下的 redis,主节点加锁成功后未同步到从节点,这个时候主节点宕机,重新选举,此时就有可能出现同时加锁成功的情况。
此时就是一个取舍的问题,大部分场景可以容忍这种问题,但如果确实想解决,可以改用 Zookeeper ,集群强一致性,
redisson 也提供了类似的解决方案 RedissonRedLock 红锁
public RedissonRedLock(RLock... locks) {
super(locks);
}
他需要通过多个 RLock 进行构建,实际上就是我要将这个加锁同时同步到多个 redis 后才算成功,当然如此做的代价就是要消耗更多的性能。
锁的优化:
锁分解是采用多个相互独立的锁来保护独立的状态变量,从而改变这些变量在之前由单个锁来保护的情况。这些技术能减小锁操作的粒度,并能实现更高的可伸缩性,然而,使用的锁越多,那么发生死锁的风险也就越高。
锁分段:比如 ConcurrentHashMap 的分段锁的实现方式。