
一、基于MySQL
在数据库中创建一个表,表中包含方法名等字段,并在方法名name字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入一条记录,成功插入则获取锁,删除对应的行就是锁释放。
//锁记录表
CREATE TABLE `lock_info` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(64) NOT NULL COMMENT '方法名',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_name` (`method_name`)
) ENGINE=InnoD
缺点:
二、基于Etcd
Etcd是一个Go语言实现的非常可靠的kv存储系统,常在分布式系统中存储着关键的数据,通常应用在配置中心、服务发现与注册、分布式锁等场景。
为什么这些特性就可以让Etcd实现分布式锁呢?因为Etcd这些特性可以满足实现分布式锁的以下要求:

func main() {
config := clientv3.Config{
Endpoints: []string{"xxx.xxx.xxx.xxx:2379"},
DialTimeout: 5 * time.Second,
}
// 获取客户端连接
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 1. 上锁(创建租约,自动续租,拿着租约去抢占一个key )
// 用于申请租约
lease := clientv3.NewLease(client)
// 申请一个10s的租约
leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
if err != nil {
fmt.Println(err)
return
}
// 拿到租约的id
leaseID := leaseGrantResp.ID
// 准备一个用于取消续租的context
ctx, cancelFunc := context.WithCancel(context.TODO())
// 确保函数退出后,自动续租会停止
defer cancelFunc()
// 确保函数退出后,租约会失效
defer lease.Revoke(context.TODO(), leaseID)
// 自动续租
keepRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
fmt.Println(err)
return
}
// 处理续租应答的协程
go func() {
select {
case keepResp := <-keepRespChan:
if keepRespChan == nil {
fmt.Println("lease has expired")
goto END
} else {
// 每秒会续租一次
fmt.Println("收到自动续租应答", keepResp.ID)
}
}
END:
}()
// if key 不存在,then设置它,else抢锁失败
kv := clientv3.NewKV(client)
// 创建事务
txn := kv.Txn(context.TODO())
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job7"), "=", 0)).
Then(clientv3.OpPut("/cron/jobs/job7", "", clientv3.WithLease(leaseID))).
Else(clientv3.OpGet("/cron/jobs/job7")) //如果key存在
// 提交事务
txnResp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 处理业务(锁内,很安全)
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3. 释放锁(取消自动续租,释放租约)
// defer会取消续租,释放锁
}
clientv3提供的concurrency包也实现了分布式锁,我们可以更便捷的实现分布式锁,不过内部实现逻辑差不多:
三、基于zookeeper
ZooKeeper 的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做 Znode
加锁/释放锁的过程是这样的

ZooKeeper不需要考虑过期时间,而是用【临时节点】,Client拿到锁之后,只要连接不断,就会一直持有锁。即使Client崩溃,相应临时节点Znode也会自动删除,保证了锁释放。
Zookeeper 是怎么检测这个客户端是否崩溃的呢?
每个客户端都与 ZooKeeper 维护着一个 Session,这个 Session 依赖定期的心跳(heartbeat)来维持。
如果 Zookeeper 长时间收不到客户端的心跳,就认为这个 Session 过期了,也会把这个临时节点删除。
当然这也并不是完美的解决方案
以下场景中Client1和Client2在窗口时间内可能同时获得锁:
优点:
不需要考虑锁的过期时间,使用起来比较方便
watch 机制,加锁失败,可以 watch 等待锁释放,实现乐观锁
缺点:
性能不如 Redis
部署和运维成本高
客户端与 Zookeeper 的长时间失联,锁被释放问题
四、基于Redis
Redis 可以通过 setnx(set if not exists)命令实现分布式锁
通过执行结果是否为 1 可以判断是否成功获取到锁。设置成功就返回1,否则返回0

如何加锁?

当把key为lock的值设置为"Java"后,再设置成别的值就会失败,看上去很简单,也好像独占了锁,但有个致命的问题,就是key没有过期时间,这样一来,除非手动删除key或者获取锁后设置过期时间,不然其他线程永远拿不到锁。
给key加个过期时间,让线程获取锁的时候执行两步操作
SETNX Key 1
EXPIRE Key Seconds
这个方案也有问题,因为获取锁和设置过期时间分成两步了,不是原子性操作,有可能获取锁成功但设置时间失败,那样不就白干了吗。
Redis官方考虑到,用法SETEX key seconds value【将值 value 关联到 key ,并将 key 的生存时间设为 seconds (以秒为单位)。如果 key 已经存在,SETEX 命令将覆写旧值】

PSETEX ,用法PSETEX key milliseconds value
这个命令和SETEX命令相似,但它以毫秒为单位设置 key 的生存时间,而不是像SETEX命令那样,以秒为单位。
从Redis 2.6.12 版本开始,SET命令可以通过参数来实现和SETNX、SETEX、PSETEX 三个命令相同的效果‘
SET key value NX EX seconds
加上NX、EX参数后,效果就相当于SETEX,这也是Redis获取锁写法里面最常见的。
如何释放锁?
释放锁的命令就简单了,直接删除key就行,但我们前面说了,因为分布式锁必须由锁的持有者自己释放,所以我们必须先确保当前释放锁的线程是持有者,没问题了再删除,这样一来,就变成两个步骤了,似乎又违背了原子性了,怎么办呢?
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
KEYS[1]是当前key的名称,ARGV[1]可以是当前线程的ID(或者其他不固定的值,能识别所属线程即可),这样就可以防止持有过期锁的线程,或者其他线程误删现有锁的情况出现。

代码实现
public class RedisLockUtil{
private String LOCK_KEY = "reids_lock";
//key的持有时间5ms
private long EXPIRE_TIME = 5;
//等待超时时间,1s
private long TIME_OUT = 1000;
//redis命令参数,相当于nx和px的命令合集
private SetParams params = SetParams.setParams().nx().px(EXPIRE_TIME);
//连接本地redis客户端
JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
/**
加锁
*/
public boolean lock(String id){
long start = System.currentTimeMillis();
Jedis jedis = jedisPool.getResource();
try{
for(;;){
//set命令返回OK,证明获取锁成功
String lock = jedis.set(LOCK_KEY,id,params);
if("ok".equals(lock)){
return true;
}
//否则循环等待,在TIME_OUT时间内仍未获取锁,则获取失败
long l = System.currentTimeMillis() - start;
if(l >= TIME_OUT){
return false;
}
try{
//休眠一会,不然反复执行循环会一直失败
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}finally{
jedis.close();
}
}
/**
解锁
*/
public boolean unlock(String id){
Jedis jedis = jedisPool.getResource();
//删除key的lua脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else"
+ " return 0 " + "end";
try {
String result =
jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
return "1".equals(result);
} finally {
jedis.close();
}
}
}
测试
public class RedisLockTest {
private static RedisLockUtil demo = new RedisLockUtil();
private static Integer NUM = 101;
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(() -> {
String id = Thread.currentThread().getId() + "";
boolean isLock = demo.lock(id);
try {
// 拿到锁的话,就对共享参数减一
if (isLock) {
NUM--;
System.out.println(NUM);
}
} finally {
// 释放锁一定要注意放在finally
demo.unlock(id);
}
}).start();
}
}
}
我们创建100个线程来模拟并发的情况,执行后的结果是这样的:

分布式锁存在的问题:
解决死锁问题:
MySQL 中解决死锁问题是通过设置超时时间,Redis 也是如此
官方在 Redis 2.6.12 版本之后,新增了一个功能,我们可以使用一条命令既执行加锁操作,又设置超时时间:setnx 和 expire
第一条命令成功加锁,并设置 30 s 过期时间
第二条命令跟在第一条命令后,还没有超过 30s,所以获取失败

解决锁误删问题:
通过添加锁标识来解决,前面我们使用 set 命令的时候,只使用到了 key,那么可以给 value 设置一个标识,表示当前锁归属于那个线程,例如 value=thread1,value=thread2…
但是这样解决依然存在问题,因为新增锁标识之后,线程在释放锁的时候,需要执行两步操作了:
解决方案:
使用 lua 脚本来解决 (Redis 本身就能保证 lua 脚本里面所有命令都是原子性操作)
使用 Redisson 框架来解决(主流)

一、添加依赖
<dependency>
<groupId>org.redissongroupId>
<artifactId>redisson-spring-boot-starterartifactId>
<version>3.23.2version>
dependency>
二、创建RedissonClient对象
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 如果有密码需要设置密码
return Redisson.create(config);
}
}
三、调用分布式锁
@RestController
public class LockController{
@Resource
private RedissonClient redissonClient;
@RequetMapping("/lock")
public String lockResource() throws InterruptedException{
String lockKey = "myLock";
//获取锁
RLock lock = redissonClient.getLock(lockKey);
try{
boolean isLocked = lock.tryLock(20,TimeUnit.SECONDS);
if(isLocked){
try{
TimeUnit.SECONDS.sleep(5);
return "成功获取到锁,并执行业务代码";
}catch(InterruptedException e){
e.printStackTrace();
}finally{
//释放锁
lock.unLock();
}
}else{
//获取锁失败
return "获取锁失败";
}
}catch(InterruptedException e){
e.printStackTrace();
}
return "获取锁成功";
}
}
启动项目,使用 8080 端口访问接口:

//加锁
public Boolean tryLock(String key,String value,long timeout,TimeUnit unit){
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
//解锁,防止删除别人的锁,以uuid为value校验是否自己的锁
public void unlock(String lockName,String uuid){
if(uuid.equals(redisTemplate.opsForValue().get(lockName))){
redisTemplate.opsForValue().del(lockName);
}
}
// 结构
if(tryLock){
// todo
}finally{
unlock;
}
get和del操作非原子性,并发一旦大了,无法保证进程安全。
建议用Lua脚本
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。
lockDel.lua
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
//解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathReource("lockDDel.lua")));
//执行lua脚本解锁
redisTemplate.execute(unlockScript,Collections.singletonList(keyName),value);
加锁lock.lua
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- localname不存在
if(redis.call('exists',key) == 0) then
redis.call('hset',key,threadId,'1');
return 1;
end;
-- 当前线程id存在
if(redis.call('hexists',key,thread) == 1) then
redis.call('hincrby',key,threadId,'1');
redis.call('expire',key,releaseTime);
return 1;
end;
return 0;
解锁unlock.lua
local key = KEYS[1];
local threadId = ARGV[1];
-- lockname、threadId不存在
if(redis.call('hexists',key,threadId) == 0) then
return nil;
end;
-- 计数器-1
local count = redis.call('hincrby',key,threadId,-1);;
-- 删除lock
if(count == 0) then
redis.call('del',key);
return nil;
end;
代码进行解释.lua文件
@Getter
@Setter
public class RedisLock{
private RedisTemplate redisTemplate;
private DefaultRedisScript<Long> lockScript;
private DefaultRedisScript<Object> unlockScript;
public RedisLock(RedisTemplate redisTemplate){
this.redisTemplate = redisTemplate;
//加载加锁脚本
lockScript = new DefaultRedisScript<>();
this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
this.lockScript.setResultType(Long.class);
//加载释放锁的脚本
unlockScript = new DefaultRedisScript<>();
this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
}
/**
获取锁
*/
public String tryLock(String lockName,long releaseTime){
//存入线程信息的前缀
String key = UUID.randomUUID().toString();
//执行脚本
Long result = (Long) redisTemplate.execute(lockScript,
Collections.singletonList(lockName),
key+Thread.currentThread().getId(),
releaseTime
);
if(result != null && result.intValue() == 1){
return key;
}else{
return null;
}
}
/**
解锁
*/
public void unlock(String lockName,String key){
redisTemplate.execute(unlockScript,Collections.singletonList(lockName),key+Thread.currentThread().getId());
}
}
至此已经完成了一把分布式锁,符合互斥、可重入、防死锁的基本特点。
比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题
而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态 。
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.13.6version>
dependency>
<dependency>
<groupId>org.redissongroupId>
<artifactId>redisson-spring-boot-starterartifactId>
<version>3.13.6version>
dependency>
配置类
@Confituration
public class RedissionConfig{
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonCient getRedisson(){
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisHost + ":" + port)
.setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
启用分布式锁
@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try{
boolean isLocked = rLock.tryLock(expireTime,TimeUnit.MILLISECONDS);
if(isLocked){
}
}catch(Exception e){
rLock.unlock();
}
RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口
RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。
/**
RLock如何加锁?
从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法
*/
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime,long leaseTime,TimeUnit unit,long threadId){
if(leaseTime != -1L){
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}else{
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。
有过期时间tryLockInnerAsync 部分,evalWriteAsync是eval命令执行lua的入口
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
eval命令执行Lua脚本
-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 线程重入次数++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);

一、非原子操作(setnx + expire)
先用setnx来抢锁,如果抢到之后,再用expire给锁设置一个过期时间
if(jedis.setnx(lock_key,lock_value) == 1){ //加锁
jedis.expire(lock_key,timeout);//设置过期事件
doBusiness //业务处理
}
因为setnx和expire两个命令是分开写的,并不是原子操作!如果刚要执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,别的线程永远获取不到锁啦。
二、被别的客户端请求覆盖(setnx + value 为过期时间)
为了解决:发生异常时,锁得不到释放的问题。
可以把过期时间放到setnx的value里面。如果加锁失败,再拿出value值和当前系统时间校验一下是否过期即可。

long expireTime = System.currentTimeMillis() + timeout;//系统时间+设置超时时间
String expireTimeStr = String.valueOf(expireTime);//转为String类型
//如果当前锁不存在,返回加锁成功
if(jedis.setnx(lock_key,expireTimeStr) == 1){
return true;
}
//如果锁存在,获取锁的过期时间
String oldExpireTimreStr = jedis.get(lock_key);
//如果获取到的老的预期过期时间,小于系统当前时间,表示已经过期了
if(oldExpireTimreStr != null && Long.parseLong(oldExpireTimreStr) < System.currentTimeMillis()){
//锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
String oldValueStr = jedis.getSet(lock_key,expireTimeStr);
if(oldValueStr != null && oldValueStr.equals(oldExpireTimReStr)){
//考虑多线程并发,只有一个线程的设置值和当前值相同,才可以加锁
return true;
}
}
//其他情况均返回加锁失败
return false;
这种实现的方案,也是有坑的:如果锁过期的时候,并发多个客户端同时请求过来,都执行jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖。
三、忘记设置过期时间
try{
if(jedis.setnx(lock_key,lock_value) == 1){//加锁
doBusiness //业务逻辑处理
return true;//加锁成功,处理完业务逻辑返回
}
return false;//加锁失败
}finally{
unlock(lockKey);//释放锁
}
忘记设置过期时间了。如果程序在运行期间,机器突然挂了,代码层面没有走到finally代码块,即在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁,所以这里需要给lockKey加一个过期时间。注意哈,使用分布式锁,一定要设置过期时间哈。
四、业务处理完,忘记释放锁
set 指令扩展参数 :SET key value[EX seconds][PX milliseconds][NX][XX]
- NX :表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,
而其他客户端请求只能等其释放锁,才能获取。
- EX seconds :设定key的过期时间,时间单位是秒
- PX milliseconds: 设定key的过期时间,单位为毫秒
- XX: 仅当key存在时设置值
if(jedis.set(lockKey,requestId,"NX","PX",expireTime) == 1){ //加锁
doBusiness //业务逻辑处理
return true;//加锁成功,处理完业务逻辑返回
}
return false;//加载失败
因为忘记释放锁了!如果每次加锁成功,都要等到超时时间才释放锁,是会有问题的。这样程序不高效,应当每次处理完业务逻辑,都要释放锁。
正例如下:
try{
if(jedis.set(lockKey,requestId,"NX","PX",expireTime) == 1){//加锁
doBusiness
return true;//加锁成功,处理完业务逻辑返回
}
return false;//加锁失败
}finally{
unlock(lockKey);//释放锁
}
五、B的锁被A给释放了
try{
if(jedis.set(lockKey, requestId, "NX", "PX",expireTime)==1){//加锁
doBusiness //业务逻辑处理
return true; //加锁成功,处理完业务逻辑返回
}
return false; //加锁失败
} finally {
unlock(lockKey); //释放锁
}
假设在这样的并发场景下:A、B两个线程来尝试给Redis的keylockKey加锁,A线程先拿到锁(假如锁超时时间是3秒后过期)。如果线程A执行的业务逻辑很耗时,超过了3秒还是没有执行完。这时候,Redis会自动释放lockKey锁。刚好这时,线程B过来了,它就能抢到锁了,开始执行它的业务逻辑,恰好这时,线程A执行完逻辑,去释放锁的时候,它就把B的锁给释放掉了。
正确的方式应该是,在用set扩展参数加锁时,放多一个这个线程请求的唯一标记,比如requestId,然后释放锁的时候,判断一下是不是刚刚的请求。
try{
if(jedis.set(lockKey, requestId, "NX", "PX",expireTime)==1){//加锁
doBusiness //业务逻辑处理
return true; //加锁成功,处理完业务逻辑返回
}
return false; //加锁失败
} finally {
if (requestId.equals(jedis.get(lockKey))) { //判断一下是不是自己的requestId
unlock(lockKey);//释放锁
}
}
六、释放锁时,不是原子性
if (requestId.equals(jedis.get(lockKey))) { //判断一下是不是自己的requestId
unlock(lockKey);//释放锁
}
因为判断是不是当前线程加的锁和释放锁不是一个原子操作。如果调用unlock(lockKey)释放锁的时候,锁已经过期,所以这把锁已经可能已经不属于当前客户端,会解除他人加的锁。
因此,这个坑就是:判断和删除是两个操作,不是原子的,有一致性问题。释放锁必须保证原子性,可以使用Redis+Lua脚本来完成,类似Lua脚本如下:
if rdis.call('get',KEYS[1]) ==ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end;
七、锁过期释放,业务没执行完
加锁后,如果超时了,Redis会自动释放清除锁,这样有可能业务还没处理完,锁就提前释放了。怎么办呢?
有些小伙伴认为,稍微把锁过期时间设置长一些就可以啦。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。
当前开源框架Redisson解决了这个问题。我们一起来看下Redisson底层原理图吧:

只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程一还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用Redisson解决了锁过期释放,业务没执行完问题。
八、Redis分布式锁和@transactional一起使用失效
@Transactional
public void updateDB(int lockKey) {
boolean lockFlag = redisLock.lock(lockKey);
if (!lockFlag) {
throw new RuntimeException(“请稍后再试”);
}
doBusiness //业务逻辑处理
redisLock.unlock(lockKey);
}
在事务中,使用了Redis分布式锁.这个方法一旦执行,事务生效,接着就Redis分布式锁生效,代码执行完后,先释放Redis分布式锁,然后再提交事务数据,最后事务结束。在这个过程中,事务没有提交之前,分布式锁已经被释放,导致分布式锁失效spring的Aop,会在updateDB方法之前开启事务,之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁住的代码块执行是在事务之内执行的,可以推断在代码块执行完时,事务还未提交,锁已经被释放,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。
正确的实现方法,可以在updateDB方法之前就上锁,即还没有开事务之前就加锁,那么就可以保证线程的安全性.
九、锁可重入
【所谓的不可重入,就是当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,会阻塞,不可以再次获得锁。同一个人拿一个锁 ,只能拿一次不能同时拿2次。】
不可重入的分布式锁的话,是可以满足绝大多数的业务场景。但是有时候一些业务场景,我们还是需要可重入的分布式锁,大家实现分布式锁的过程中,需要注意一下,你当前的业务场景是否需要可重入的分布式锁。
Redis只要解决这两个问题,就能实现重入锁了:
实现一个可重入的分布式锁,我们可以参考JDK的ReentrantLock的设计思想。实际上,可以直接使用Redisson框架,它是支持可重入锁的。
十、Redis主从复制导致的坑
实现Redis分布式锁的话,要注意Redis主从复制的坑。因为Redis一般都是集群部署的:

如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。
为了解决这个问题,Redis作者 antirez提出一种高级的分布式锁算法:Redlock。Redlock核心思想是这样的:
【搞多个Redis master部署,以保证它们不会同时宕掉。并且这些master节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。】
我们假设当前有5个Redis master节点,在5台服务器上面运行这些Redis实例。

RedLock的实现步骤如下:
简化下步骤就是: