分布式锁是一种用于在分布式系统中实现同步和互斥访问的机制。在分布式系统中,多个节点同时访问共享资源可能会导致数据不一致或竞争条件的发生。分布式锁提供了一种保护共享资源的方式,以确保在任意时刻只有一个节点可以访问该资源,如:同一时刻每个订单只能有一个线程操作取消订单功能。
实现的分布式锁,需要具备一下特征:
特点 | 描述 |
---|---|
独占性 | 任何时刻有且只有一个线程持有使用该锁 |
高可用 & 高性能 | 程序不易崩溃,时刻都保证较高的可用性,在redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况;在高并发请求下,分布式锁依旧具有良好的性能; |
防死锁 | 不能出现死锁问题,必须有超时重试机制或者撤销操作,有个终止跳出的途径; |
不乱抢 | 多线程下,防止张冠李戴,只能解锁自己的锁,不能把别人的锁给释放了; |
重入性 | 同一节点的同一线程如果获得锁之后,该线程可以再次获取使用这个锁 |
在常规的实现方式中Redis锁机制一般是由 setnx
命令实现是”set if not exists”的简写,语法setnx key value
,将key设置值为value,如果key不存在会返回1
,这种情况下等同 set
命令。 当key存在时,什么也不做会返回0
,并且要使用 expire
设置一个锁的过期时间,避免应用程序异常导致锁一直没有释放。
例如:
127.0.0.1:6379> setnx key1 1
(integer) 1
127.0.0.1:6379> setnx key1 1
(integer) 0
127.0.0.1:6379> expire key1 60
(integer) 1
但是上面的setnx
和expire
实现分布式锁的方式是不安全,两条命令非原子性的,并不能保证一致性,可以通过一些第三方框架或者自己通过lua
脚本实现原子操作,下面会通过代码分析分布式锁来实现。
这里使用的是SpringBoot环境,会使用RedisTemplate的API操作Redis实现分布式锁,解决分布式锁原子性、死锁、误删、可重入、自动续期等问题。
需要SpringBoot集成调用Redis资料的可以跳转:https://blog.csdn.net/weixin_44606481/article/details/133907103
这个分布式锁实现工具类已经将分布式锁原子性、死锁、误删、可重入、自动续期等问题都已解决,为了做演示和重点讲解问题解决步骤这里没有进行特定封装,可以根据需要自行封装增强拓展性。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Kerwin
*/
@Component
public class RedisLockUtils {
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 活跃锁key+value集合,续期的时候会使用
private volatile static CopyOnWriteArraySet activeLockKeySet = new CopyOnWriteArraySet();
// 定时线程池 用于续期
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
/**
* 加锁
* @Param key
* @Param value 锁的value一般使用线程ID,在解锁时需要使用
* @Param expireTime 过期时间 单位秒
*/
public boolean lock(String key, String value, long expireTime) {
// 为了实现锁的可重入这里要自己封装一个lua脚本,如果不考虑可重入可以直接使用redisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit)
String lockLua = getLockLua();
boolean result = executeLua(lockLua, key, value, String.valueOf(expireTime));
// 当加锁成功且活跃锁key+value不在集合中则添加续期任务
if (result && !activeLockKeySet.contains(key + value)) {
// 将活跃锁key+value放入集合中
activeLockKeySet.add(key + value);
// 加锁成功添加续期任务
resetExpire(key, value, expireTime);
}
return result;
}
/**
* 获取加锁lua脚本
*/
private String getLockLua() {
// 封装加锁lua脚本 PS: 这个lua脚本应该是要定义成全局的,我这里为了演示定义成局部组装方便介绍每一步流程
// lua脚本参数介绍 KEYS[1]:传入的key ARGV[1]:传入的value ARGV[2]:传入的过期时间
// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组,List keys, Object... args,在lua脚本中取key值和参数值时使用KEYS和ARGV,数组下标从1开始
StringBuilder lockLua = new StringBuilder();
// 通过SETNX命令设置锁,如果设置成功则添加一个过期时间并且返回1,否则判断是否为重入锁
lockLua.append("if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then\n");
lockLua.append(" redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))\n");
lockLua.append(" return 1\n");
lockLua.append("else\n");
// 当锁已经存在时,判断传入的value是否相等,如果相等代表为重入锁返回1并且重置过期时间,否则返回0
lockLua.append(" if redis.call('GET', KEYS[1]) == ARGV[1] then\n");
lockLua.append(" redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))\n");
lockLua.append(" return 1\n");
lockLua.append(" else\n");
lockLua.append(" return 0\n");
lockLua.append(" end\n");
lockLua.append("end");
return lockLua.toString();
}
/**
* 解锁
* @Param key
* @Param value 锁的value一般使用线程ID,在解锁时需要判断是当前线程才运行删除
*/
public boolean unlock(String key, String value) {
// 为了实现避免误删锁,这里要自己封装一个lua脚本
String unLockLua = getUnlockLua();
boolean result = executeLua(unLockLua, key, value);
if (result) {
// 将活跃锁key+value从集合中删除
activeLockKeySet.remove(key + value);
}
return result;
}
/**
* 获取解锁lua脚本
*/
private String getUnlockLua() {
// 封装解锁lua脚本 PS: 这个lua脚本应该是要定义成全局的,我这里为了演示定义成局部组装方便介绍每一步流程
// lua脚本参数介绍 KEYS[1]:传入的key ARGV[1]:传入的value
// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组,List keys, Object... args,在lua脚本中取key值和参数值时使用KEYS和ARGV,数组下标从1开始
StringBuilder unlockLua = new StringBuilder();
// 判断传入的锁key是否存在,如果不存在则直接返回1,如果存在则判断传入的value值是否和获取到的value相等
unlockLua.append("if redis.call('EXISTS',KEYS[1]) == 0 then\n");
unlockLua.append(" return 1\n");
unlockLua.append("else\n");
// 判断传入的value值是否和获取到的value相等,如果相等则代表是当前线程删除锁,执行删除对应key逻辑,然后返回1,否则返回0
unlockLua.append(" if redis.call('GET',KEYS[1]) == ARGV[1] then\n");
unlockLua.append(" return redis.call('DEL',KEYS[1])\n");
unlockLua.append(" else\n");
unlockLua.append(" return 0\n");
unlockLua.append(" end\n");
unlockLua.append("end");
return unlockLua.toString();
}
/**
* 封装redisTemplate执行lua脚本返回boolean类型执行器
* @param scriptText lua脚本
* @param key 传入数组keys的第一个元素这里就是我们锁key
* @param args 传入数组args的第一个元素这里就是我们传入的value
*/
private boolean executeLua(String scriptText, String key, Object... args) {
// 通过 DefaultRedisScript 来执行 lua脚本
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
// Boolean 对应 lua脚本返回的 0 1
redisScript.setResultType(Boolean.class);
// 指定需要执行的 lua脚本
redisScript.setScriptText(scriptText);
// 注意 需要提供 List keys, Object... args 代表 keys 和 ARGV
return redisTemplate.execute(redisScript, Collections.singletonList(key), args);
}
/**
* 锁续期
* @Param key
* @Param value 锁的value一般使用线程ID,在解锁时需要使用
* @Param expireTime 过期时间 单位秒,
*/
private void resetExpire(String key, String value, long expireTime) {
// 如果key+value在集合中不存在,则不再进行续期操作
if (!activeLockKeySet.contains(key + value)) {
return;
}
//设置过期时间,推荐设置成过期时间的1/3时间续期一次,比如30s过期,10s续期一次
long delay = expireTime <= 3 ? 1 : expireTime / 3;
executorService.schedule(() -> {
System.out.println("自动续期 key="+key+ " value="+value);
// 执行续期操作,如果续期成功则再次添加续期任务,如果不成功则将不在进行任务续期,并且将活跃锁key+value从集合中删除
if (executeLua(getResetExpireLua(), key, value, String.valueOf(expireTime))) {
System.out.println("自动续期成功开启下一轮自动续期");
resetExpire(key, value, expireTime);
} else {
System.out.println("自动续期失败锁key已经删除或不是指定value持有的锁,取消自动续期");
activeLockKeySet.remove(key + value);
}
}, delay, TimeUnit.SECONDS);
}
/**
* 获取锁续期lua脚本
*/
private String getResetExpireLua() {
// 封装续期lua脚本 PS: 这个lua脚本应该是要定义成全局的,我这里为了演示定义成局部组装方便介绍每一步流程
// lua脚本参数介绍 KEYS[1]:传入的key ARGV[1]:传入的value ARGV[2]:传入的过期时间
// 在使用redisTemplate执行lua脚本时会传入key数组和参数数组,List keys, Object... args,在lua脚本中取key值和参数值时使用KEYS和ARGV,数组下标从1开始
StringBuilder resetExpireLua = new StringBuilder();
// 判断传入的锁key是否存在且获取到的value值是否和传入的value值相等,如果相等则重置过期时间,然后返回1,否则返回0
resetExpireLua.append("if redis.call('EXISTS',KEYS[1]) == 1 and redis.call('GET',KEYS[1]) == ARGV[1] then\n");
resetExpireLua.append(" redis.call('EXPIRE',KEYS[1],tonumber(ARGV[2]))\n");
resetExpireLua.append(" return 1\n");
resetExpireLua.append("else\n");
resetExpireLua.append(" return 0\n");
resetExpireLua.append("end");
return resetExpireLua.toString();
}
}
这个测试类中模拟1000个用户抢10个商品,测试是否会出现超卖情况,提供了两个方法,一个使用分布式锁,一个不使用分布式锁,当使用分布式锁是不会出现超卖,当没有使用分布式时肯定会出现超卖。
import com.redisscene.utils.RedisLockUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisSceneApplication.class)
public class RedisLockTest {
@Autowired
private RedisLockUtils redisLockUtils;
// 产品库存lock前缀
private final String productLockKeyPrefix = "PRODUCT_LOCK_KEY:";
// 模拟产品库存信息
private static Map<String, String> productMap = new HashMap<>();
static {
productMap.put("id", "P0001");
productMap.put("title", "分布式锁");
productMap.put("stock", "10");
productMap.put("sold", "0");
}
@Test
public void t1() throws InterruptedException {
// 定义一个线程池,队列根据需要设置
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
CountDownLatch countDownLatch = new CountDownLatch(1000);
// 模拟10000个人抢10个商品
for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
// 加锁扣减库存
deductStockLock();
// 不加锁扣减库存
// deductStockNotLock();
countDownLatch.countDown();
});
}
countDownLatch.await();
executor.shutdown();
System.out.println("productMap=" + productMap.toString());
}
/**
* 扣减库存:使用分布式锁
*/
private void deductStockLock() {
// 通过UUID和线程ID组合成value,标识当前线程,解锁的时候判断是否是当前线程持有的锁
String uuidValue = UUID.randomUUID() + ":" + Thread.currentThread().getId();
// 组装锁key
String lockKey = productLockKeyPrefix + productMap.get("id");
boolean lock = false;
try {
// 获取锁
lock = redisLockUtils.lock(lockKey, uuidValue, 30);
// 测试锁续期效果 这里模拟业务处理时间40s超过
Thread.sleep(40000);
// 再次获取锁,测试可重入效果
lock = redisLockUtils.lock(lockKey, uuidValue, 30);
// 如果没有获取到锁则直接返回
if (!lock) {
// 这里直接响应失败,也可以进行重试
return;
}
System.out.println("获取锁成功 uuidValue=" + uuidValue);
// 获取到锁执行业务逻辑,处理库存信息,假设每个线程每次购买1个商品
Integer stock = Integer.valueOf(productMap.get("stock"));
if (stock <= 0) {
// System.out.println("库存不足");
return;
}
// 库存 - 1
productMap.put("stock", String.valueOf(Integer.valueOf(productMap.get("stock")) - 1));
// 已售 + 1
productMap.put("sold", String.valueOf(Integer.valueOf(productMap.get("sold")) + 1));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock) {
// 解锁
redisLockUtils.unlock(lockKey, uuidValue);
}
}
}
/**
* 扣减库存:不使用分布式锁
*/
private void deductStockNotLock() {
// 判断库存是否足够,假设每个线程每次购买1个商品
Integer stock = Integer.valueOf(productMap.get("stock"));
if (stock <= 0) {
// System.out.println("库存不足");
return;
}
// 暂停10毫秒方便呈现不加锁超卖效果
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 库存 - 1
productMap.put("stock", String.valueOf(Integer.valueOf(productMap.get("stock")) - 1));
// 已售 + 1
productMap.put("sold", String.valueOf(Integer.valueOf(productMap.get("sold")) + 1));
}
}
在章节四中的分布式锁实现工具类中已经将下面的问题都解决,并且注释的也比较详细。
使用分布式锁最害怕的问题就是出现死锁,自己业务上写出的死锁这里不做说明,这里只介绍异常情况出现死锁应该如何解决,在业务异常没有处理好或者应用服务宕机没有解锁就会出现死锁问题,锁key一直存储在Redis中不会被释放,后续业务恢复去获取锁时因为锁已经存在一直都无法获取到锁,这就是死锁问题,但是死锁问题很好解决,只要给锁key加上一个过期时间即可。
// setIfAbsent方法等同于setnx,当这个key不存在时插入成功返回true,当key存在时不做处理返回false
boolean lock = redisTemplate.opsForValue().setIfAbsent(key1, value1);
// 加锁成功设置一个30s过期时间
if(lock){
redisTemplate.expire(key,30,TimeUnit.SECONDS);
}
lua
脚本来解决原子性问题。 在5.1中给锁key设置超时时间解决了死锁问题,但是因为是分为两个步骤操作,需要分别调用Redis不具备原子性,要保证原子性只要保证将两个步骤合并成一个Redis调用即可,核心思想是通过lua
脚本来实现,可以直接通过RedisTemplate操作Redis执行lua
脚本,Redis执行lua
脚本也是单线程的所以可以保证原子性。
lua
脚本实现加锁并且添加过期时间-- KEYS[1]:传入的key ARGV[1]:传入的value ARGV[2]:传入的过期时间
-- 使用setnx插入key,如果成功给key设置一个过期时间然后返回1,如果失败直接返回0
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
else
return 0
end
RedisTemplate
的 setIfAbsent
方法,setIfAbsent
方法等同于setnx
,而且这个方法还实现了原子性给key添加过期时间操作,具体实现和我们上面lua
脚本类似// 当这个key不存在时插入成功并且设置一个超时时间然后返回true,当key存在时不做处理返回false
boolean lock = redisTemplate.opsForValue().setIfAbsent(key1, value1, 30, TimeUnit.SECONDS);
可重入锁又名递归锁:是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了,就很麻烦。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁,分布式锁实现可重入也比较简单只要在保证原子性的lua脚本中在判断value值即可。
lua
脚本实现-- KEYS[1]:传入的key ARGV[1]:传入的value ARGV[2]:传入的过期时间
-- 通过SETNX命令设置锁,如果设置成功则添加一个过期时间并且返回1,否则判断是否为重入锁
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
else
-- 当锁已经存在时,判断传入的value是否相等,如果相等代表为重入锁返回1并且重置过期时间,否则返回0
if redis.call('GET', KEYS[1]) == ARGV[1] then
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
else
return 0
end
end
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删,要想解决误删只需要判断一下这把锁是否属于自己,只能删除属于自己的锁,这里会将一个UUID + 线程ID 作为锁的 value,在删除锁的时候判断value值是否相同即可。
lua
脚本实现-- KEYS[1]:传入的key ARGV[1]:传入的value
-- 判断传入的锁key是否存在,如果不存在则直接返回1,如果存在则判断传入的value值是否和获取到的value相等
if redis.call('EXISTS',KEYS[1]) == 0 then
return 1
else
-- 判断传入的value值是否和获取到的value相等,如果相等则代表是当前线程删除锁,执行删除对应key逻辑,然后返回1,否则返回0
if redis.call('GET',KEYS[1]) == ARGV[1] then
return redis.call('DEL',KEYS[1])
else
return 0
end
end
假设我们给某个业务分布式锁设置了30s的过期时间,但是这个业务要执行40s,在30s后锁过期其它线程也可以获取到这把锁,但是前一个线程还没有执行完毕,这样显然是有问题的,要对还没有执行完的业务进行锁的自动续期操作。
我这里采用的是定时线程池ScheduledExecutorService
来实现,在加锁时同步给定时线程池添加一个定时任务,定时时间一般设置为过期时间的1/3,其中还有考虑重入问题会使用一个set集合存储key+value的组合值,每个key+value只能被添加一次,续期方法在 4.1、分布式锁实现工具类的resetExpire
方法中有详细描述。