前言
这一篇文章拖了有点久,虽然在项目中使用分布式锁的频率比较高,但整理成文章发布出来还是花了一点时间。在一些移动端、用户量大的互联网项目中,经常会使用到 Redis 分布式锁作为控制访问高并发的工具。
一、关于分布式锁
总结:分布式锁是一种在分布式系统中用于控制并发访问的机制。
在分布式系统中,多个客户端同时对一个资源进行操作时,容易影响数据的一致性。分布式锁的主要作用就是确保同一时刻只有一个客户端能够对某个资源进行操作,以避免数据不一致的问题。
主要应用场景:
- 数据库并发控制:在分布式数据库中,多个线程同时对某张表进行操作时,可能会出现并发冲突问题,使用分布式锁可以确保同一时刻只有一个线程能够对该表进行操作,避免并发冲突。
- 分布式缓存:在分布式缓存中,如果多个线程同时对某个缓存进行操作,可能会出现缓存数据不一致的问题。使用分布式锁可以确保同一时刻只有一个线程能够对该缓存进行操作,保证缓存数据的一致性。
- 分布式任务调度:在分布式任务调度中,多个线程同时执行某个任务,可能出现任务被重复执行的问题,使用分布式锁可以确保同一时刻只有一个线程能够执行该任务,避免任务被重复执行。
目前主流的分布式锁实现方案是基于 Redis 来实现的,今天要分享的有 2 种实现: 基于 RedLock 红锁和基于 setIfAbsent() 方法。
二、RedLock 红锁(不推荐)
RedLock 对于多节点(集群)的分布式锁算法使用了多个实例来存储锁信息,这种方式可以提高获取锁的速度和成功率,从而可以有效地防止单点故障;
但由于 RedLock 的实现比较复杂,且容易因为配置不正确而导致锁无法获取。此外,如果 Redis 服务宕机,也会导致锁无法正常使用。
RedLock 会对集群的每个节点进行加锁,如果大多数(N/2+1)加锁成功了,则认为获取锁成功。这个过程中可能会因为网络问题,或节点超时的问题,影响加锁的性能,故而在最新的 Redisson 版本中中已经正式宣布废弃 RedLock。
以下是一个简易的 demo 实现:
包括两部分:暴露给业务系统逻辑层使用的静态方法、锁的底层实现。思路用代码和注释说得比较清楚了,大家可以看一下:
/**
* 尝试获取锁,业务系统用
* @param key key
* @param requestId 唯一请求标识,用于解锁
* @param expireTime 过期时间
* @param timeUnit 过期时间单位
* @return
*/
public static boolean getLock(String key, String requestId, long expireTime, TimeUnit timeUnit) {
RedisSetArgs redisSetArgs = RedisSetArgs.instance().nx().px((int) timeUnit.toMillis(expireTime));
//CacheFactory 为缓存的抽象类,set() 为具体实现
String result = CacheFactory.getCache().set(key, requestId, redisSetArgs);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 具体的底层实现
* @param key key
* @param value value
* @param redisSetArgs set 参数对象
* @return 返回值
*/
@Override
public String set(String key, String value, RedisSetArgs redisSetArgs) {
//这里引入的是 Jedis 的客户端,后来被抛弃了,Redis 推荐的是 Redission
try (Jedis jedis = getJedis()) {
SetParams setParams = new SetParams();
if (redisSetArgs.isNx()) {
setParams.nx();
} else if (redisSetArgs.isXx()) {
setParams.xx();
}
if (Objects.nonNull(redisSetArgs.getEx())) {
setParams.ex(redisSetArgs.getEx());
} else if (Objects.nonNull(redisSetArgs.getPx())) {
setParams.px(redisSetArgs.getPx());
}
return jedis.set(SafeEncoder.encode(buildKey(key)), SafeEncoder.encode(value), setParams);
}
}
/**
* 解锁,业务系统用
* @param key key
* @param requestId 唯一请求标识
* @return
*/
public static boolean unlock(String key, String requestId) {
//使用 Lua 脚本保证原子性:RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = CacheFactory.getCache().loose(RELEASE_LOCK_LUA_SCRIPT, Collections.singletonList(key), Collections.singletonList(requestId));
if (NumberUtils.LONG_ONE.equals(result)) {
return true;
}
return false;
}
/**
* 执行解锁脚本(底层实现)
* @param script 脚本
* @param keys keys
* @param args 参数
* @return 返回对象
*/
@Override
public Object loose(String script, List keys, List args) {
try (Jedis jedis = getJedis()) {
return jedis.eval(SafeEncoder.encode(script), keys.stream().map(this::buildKey).map(SafeEncoder::encode).collect(Collectors.toList()),
args.stream().map(SafeEncoder::encode).collect(Collectors.toList()));
}
}
三、基于 setIfAbsent() 方法
以下推荐的是在分布式集群环境中的最佳实践,其实无论是单机还是集群,保证原子性都是第一位的,如果能同时保证性能和高可用,那么就是一个可靠的分布式锁解决方案。
主要思路是:设置锁时,使用 setIfAbsent() 方法,因为其底层实际包含了 setnx 、expire 的功能,起到了原子操作的效果。给 key 设置随机且唯一的值,并且只有在 key 不存在时才设置成功返回 True,并且设置 key 的过期时间(最好是毫秒级别)。
以下同样给出一个简单的示例,包括两部分:暴露给业务系统逻辑层使用的静态方法、锁的底层实现。注释写得比较清楚了:
/**
* 获取锁,业务系统用
* @return 解锁唯一标识
*/
public String getLock() {
try {
// 获取锁的超时时间,超过这个时间则取锁失败
long end = System.currentTimeMillis() + acquireTimeout;
// 随机生成一个 value 作为解锁的唯一标识
this.unLockIdentify = UUID.randomUUID().toString();
while (System.currentTimeMillis() < end) {
Boolean result = iCache.setIfAbsent(lockKey, this.unLockIdentify, Duration.ofMillis(expireTime));
if (result) {
return this.unLockIdentify;
}
try {
//再休眠 100 微秒
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
logger.error("error info", e);
}
return null;
}
/**
* 具体的 setIfAbsent() 底层实现
* @param key 键
* @param value 值
* @param timeout 超时时间
* @return 是否设置成功
*/
public Boolean setIfAbsent(String key, String value, Duration timeout) {
return this.redisTemplate.opsForValue().setIfAbsent(key, value, timeout);
}
/**
* 释放锁,业务系统用
* @param unLockIdentify 解锁唯一标识
* @return 是否解锁成功
*/
public Boolean loose(String unLockIdentify) {
if (unLockIdentify == null) {
return Boiolfalse;
}
try {
if (iCache.deleteIfEquals(lockKey, unLockIdentify)) {
return Boolean.TRUE;
}
} catch (Exception e) {
logger.error("error info", e);
}
return Boolean.FALSE;
}
/**
* 具体判断方法实现(底层实现)
* @param key 键
* @param expectedValue 期望的值
* @return 是否相等
*/
public Boolean deleteIfEquals(String key, String expectedValue) {
//Lua 脚本保证原子性
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), expectedValue);
return result != null && result == 1;
}
四、使用示例
下面分别给出两个使用示例分别来介绍怎么在具体的业务场景中去使用的 demo,一般来说针对数据库的并发操作和多线程的并发任务操作,会使用得比较多。
至于为什么不使用分布式锁去保证缓存数据的一致性,其实是有专门的分布式缓存方案的:https://www.cnblogs.com/CodeBlogMan/p/18022719
4.1RedLock 使用
@Test
public void testRedLock(){
final String requestId = UUIDUtils.generateUUID();
if (RedisRedLock.attemptLock("xxxSys.insert.xxxId(唯一)", requestId, 3L, TimeUnit.SECONDS)) {
try {
//todo: 数据库并发插入操作
log.info("并发插入成功!");
} catch (Throwable e) {
//底层没有加 try catch,所以这里加一下
log.error("并发插入失败! error", e);
} finally {
RedisRedLock.unlock("xxxSys.insert.xxxId(唯一)", requestId);
}
}
}
4.2setIfAbsent() 方法使用
@Test
public void testDistributedLock(){
//这里是抽像类和接口,具体使用可以更加灵活
DistributedLock distributedLock = CacheFactory.getDistributedLock("xxxSys.insert.xxxId(唯一)" ,3,1);
Assert.hasText(distributedLock.getLock(), "操作频繁,请稍后重试");
//todo: 多线程的并发任务操作
if (distributedLock.loose("xxxSys.insert.xxxId(唯一)")){
//这里是为了演示才加的日志,其实底层已经加过了
log.info("释放锁成功!");
}
}
五、文章小结
到这里基于 Redis 实现分布式锁的全过程就分享完了,其实基于 Redis 实现分布式锁还有许多底层和实际应用的情况没有展开来说。目前笔者虽然在日常项目里有较多使用,但还是感到技术的海洋深不见底:学到的越多就感觉到自己的不足越多。
最后,如果文章有不足和错误,还请大家指正。或者你有其它想说的,也欢迎大家在评论区里交流!