• 分布式锁工具Redisson(Lua脚本)


    分布式锁

    • 数据库:通过创建一条唯一记录来表示一个锁,唯一记录添加成功,锁就创建成功,释放锁的话需要删除记录,但是很容易出现性能瓶颈,因此基本上不会使用数据库作为分布式锁。
    • Redis提供了高效的获取锁和释放锁的操作,而且结合Lua脚本,Redission等,有比较好的异常情况处理方式,因为是基于内存的,读写效率也是非常高。
    • 利用租约(Lease),Watch,Revision机制,提供了一种简单实现的分布式锁方式,集群模式让Etcd能处理大量读写,性能出色,但是配置复杂,一致性问题也存在。
    • 利用ZooKeeper提供的节点同步功能来实现分布式锁,而且不用设置过期时间,可以自动的处理异常情况下的锁释放。

    在这里插入图片描述

    一、基于MySQL

    数据库中创建一个表,表中包含方法名等字段,并在方法名name字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入一条记录,成功插入则获取锁,删除对应的行就是锁释放。

    //锁记录表
    CREATE TABLE `lock_info` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
      `name` varchar(64) NOT NULL COMMENT '方法名',
      PRIMARY KEY (`id`),
      UNIQUE KEY `idx_name` (`method_name`) 
    ) ENGINE=InnoD
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    缺点:

    • 数据库是单点,非常依赖数据库的可用性
    • 需要额外自己维护TTL
    • 在高并发常见下数据库读写是非常缓慢

    二、基于Etcd

    Etcd是一个Go语言实现的非常可靠的kv存储系统,常在分布式系统中存储着关键的数据,通常应用在配置中心、服务发现与注册、分布式锁等场景。

    • Lease机制:即租约机制(TTL,Time To Live),etcd可以为存储的kv对设置租约,当租约到期,kv将失效删除;同时也支持续约,keepalive
    • Revision机制:每个key带有一个Revision属性值,etcd每进行一次事务对应的全局Revision值都会+1,因此每个key对应的Revision属性值都是全局唯一的。通过比较Revision的大小就可以知道进行写操作的顺序
    • 在实现分布式锁时,多个程序同时抢锁,根据Revision值大小依次获得锁,避免“惊群效应”,实现公平锁
    • Prefix机制:也称为目录机制,可以根据前缀获得该目录下所有的key及其对应的属性值
    • Watch机制:watch支持watch某个固定的key或者一个前缀目录,当watch的key发生变化,客户端将收到通知

    为什么这些特性就可以让Etcd实现分布式锁呢?因为Etcd这些特性可以满足实现分布式锁的以下要求:

    • 租约机制(Lease):用于支撑异常情况下的锁自动释放能力
    • 前缀和 Revision 机制:用于支撑公平获取锁和排队等待的能力
    • 监听机制(Watch):用于支撑抢锁能力
    • 集群模式:用于支撑锁服务的高可用
      在这里插入图片描述
    func main() {
        config := clientv3.Config{
            Endpoints:   []string{"xxx.xxx.xxx.xxx:2379"},
            DialTimeout: 5 * time.Second,
        }
     
        // 获取客户端连接
        client, err := clientv3.New(config)
        if err != nil {
            fmt.Println(err)
            return
        }
     
        // 1. 上锁(创建租约,自动续租,拿着租约去抢占一个key )
        // 用于申请租约
        lease := clientv3.NewLease(client)
     
        // 申请一个10s的租约
        leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
        if err != nil {
            fmt.Println(err)
            return
        }
     
        // 拿到租约的id
        leaseID := leaseGrantResp.ID
     
        // 准备一个用于取消续租的context
        ctx, cancelFunc := context.WithCancel(context.TODO())
     
        // 确保函数退出后,自动续租会停止
        defer cancelFunc()
            // 确保函数退出后,租约会失效
        defer lease.Revoke(context.TODO(), leaseID)
     
        // 自动续租
        keepRespChan, err := lease.KeepAlive(ctx, leaseID)
        if err != nil {
            fmt.Println(err)
            return
        }
     
        // 处理续租应答的协程
        go func() {
            select {
            case keepResp := <-keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("lease has expired")
                    goto END
                } else {
                    // 每秒会续租一次
                    fmt.Println("收到自动续租应答", keepResp.ID)
                }
            }
        END:
        }()
     
        // if key 不存在,then设置它,else抢锁失败
        kv := clientv3.NewKV(client)
        // 创建事务
        txn := kv.Txn(context.TODO())
        // 如果key不存在
        txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job7"), "=", 0)).
            Then(clientv3.OpPut("/cron/jobs/job7", "", clientv3.WithLease(leaseID))).
            Else(clientv3.OpGet("/cron/jobs/job7")) //如果key存在
     
        // 提交事务
        txnResp, err := txn.Commit()
        if err != nil {
            fmt.Println(err)
            return
        }
     
        // 判断是否抢到了锁
        if !txnResp.Succeeded {
            fmt.Println("锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
            return
        }
     
        // 2. 处理业务(锁内,很安全)
     
        fmt.Println("处理任务")
        time.Sleep(5 * time.Second)
     
        // 3. 释放锁(取消自动续租,释放租约)
        // defer会取消续租,释放锁
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    clientv3提供的concurrency包也实现了分布式锁,我们可以更便捷的实现分布式锁,不过内部实现逻辑差不多:

    • 首先concurrency.NewSession方法创建Session对象
    • 然后Session对象通过concurrency.NewMutex 创建了一个Mutex对象
    • 加锁和释放锁分别调用Lock和UnLock

    三、基于zookeeper

    ZooKeeper 的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做 Znode
    加锁/释放锁的过程是这样的
    在这里插入图片描述

    1. Client尝试创建一个 znode 节点,比如/lock,比如Client1先到达就创建成功了,相当于拿到了锁
    2. 其它的客户端会创建失败(znode 已存在),获取锁失败。
    3. Client2可以进入一种等待状态,等待当/lock 节点被删除的时候,ZooKeeper 通过 watch 机制通知它
    4. 持有锁的Client1访问共享资源完成后,将 znode 删掉,锁释放掉了
    5. Client2继续完成获取锁操作,直到获取到锁为止

    ZooKeeper不需要考虑过期时间,而是用【临时节点】,Client拿到锁之后,只要连接不断,就会一直持有锁。即使Client崩溃,相应临时节点Znode也会自动删除,保证了锁释放。

    Zookeeper 是怎么检测这个客户端是否崩溃的呢?

    每个客户端都与 ZooKeeper 维护着一个 Session,这个 Session 依赖定期的心跳(heartbeat)来维持。
    如果 Zookeeper 长时间收不到客户端的心跳,就认为这个 Session 过期了,也会把这个临时节点删除。
    当然这也并不是完美的解决方案

    以下场景中Client1和Client2在窗口时间内可能同时获得锁:

    1. Client 1 创建了 znode 节点/lock,获得了锁。
    2. Client 1 进入了长时间的 GC pause。(或者网络出现问题、或者 zk 服务检测心跳线程出现问题等等)
    3. Client 1 连接到 ZooKeeper 的 Session 过期了。znode 节点/lock 被自动删除。
    4. Client 2 创建了 znode 节点/lock,从而获得了锁。
    5. Client 1 从 GC pause 中恢复过来,它仍然认为自己持有锁。

    优点:

    1. 不需要考虑锁的过期时间,使用起来比较方便

    2. watch 机制,加锁失败,可以 watch 等待锁释放,实现乐观锁

    缺点:

    1. 性能不如 Redis

    2. 部署和运维成本高

    3. 客户端与 Zookeeper 的长时间失联,锁被释放问题

    四、基于Redis

    Redis 可以通过 setnx(set if not exists)命令实现分布式锁
    通过执行结果是否为 1 可以判断是否成功获取到锁。设置成功就返回1,否则返回0

    • setnx mylock true 加锁
    • del mylock 释放锁
      在这里插入图片描述
      虽然setnx是原子性的,但是setnx + expire就不是了,也就是说setnx和expire是分两步执行的
      【加锁和超时】两个操作是分开的,如果expire执行失败了,那么锁同样得不到释放。

    如何加锁?
    在这里插入图片描述
    当把key为lock的值设置为"Java"后,再设置成别的值就会失败,看上去很简单,也好像独占了锁,但有个致命的问题,就是key没有过期时间,这样一来,除非手动删除key或者获取锁后设置过期时间,不然其他线程永远拿不到锁。

    给key加个过期时间,让线程获取锁的时候执行两步操作

    SETNX Key 1
    EXPIRE Key Seconds
    
    • 1
    • 2

    这个方案也有问题,因为获取锁和设置过期时间分成两步了,不是原子性操作,有可能获取锁成功但设置时间失败,那样不就白干了吗。

    Redis官方考虑到,用法SETEX key seconds value【将值 value 关联到 key ,并将 key 的生存时间设为 seconds (以秒为单位)。如果 key 已经存在,SETEX 命令将覆写旧值】
    在这里插入图片描述

    PSETEX ,用法PSETEX key milliseconds value

    这个命令和SETEX命令相似,但它以毫秒为单位设置 key 的生存时间,而不是像SETEX命令那样,以秒为单位。
    从Redis 2.6.12 版本开始,SET命令可以通过参数来实现和SETNX、SETEX、PSETEX 三个命令相同的效果‘

    SET key value NX EX seconds
    
    • 1

    加上NX、EX参数后,效果就相当于SETEX,这也是Redis获取锁写法里面最常见的。

    如何释放锁?

    释放锁的命令就简单了,直接删除key就行,但我们前面说了,因为分布式锁必须由锁的持有者自己释放,所以我们必须先确保当前释放锁的线程是持有者,没问题了再删除,这样一来,就变成两个步骤了,似乎又违背了原子性了,怎么办呢?

    if redis.call("get",KEYS[1]) == ARGV[1]
    then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    KEYS[1]是当前key的名称,ARGV[1]可以是当前线程的ID(或者其他不固定的值,能识别所属线程即可),这样就可以防止持有过期锁的线程,或者其他线程误删现有锁的情况出现。
    在这里插入图片描述
    代码实现

    public class RedisLockUtil{
    	
    	private String LOCK_KEY = "reids_lock";
    
    	//key的持有时间5ms
    	private long EXPIRE_TIME = 5;
    
    	//等待超时时间,1s
    	private long TIME_OUT = 1000;
    
    	//redis命令参数,相当于nx和px的命令合集
    	private SetParams params = SetParams.setParams().nx().px(EXPIRE_TIME);
    
    	//连接本地redis客户端
    	JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
    
    	/**
    		加锁
    	*/
    	public boolean lock(String id){
    	
    		long start = System.currentTimeMillis();
    		Jedis jedis = jedisPool.getResource();
    		try{
    			for(;;){
    			//set命令返回OK,证明获取锁成功
    			String lock = jedis.set(LOCK_KEY,id,params);
    			if("ok".equals(lock)){
    				return true;
    			}
    			//否则循环等待,在TIME_OUT时间内仍未获取锁,则获取失败
    			long l = System.currentTimeMillis() - start;
    			if(l >= TIME_OUT){
    				return false;
    			}
    			try{
    				//休眠一会,不然反复执行循环会一直失败
    				Thread.sleep(100);
    			}catch(InterruptedException e){
    				e.printStackTrace();
    			}
    		}
    		}finally{
    			jedis.close();
    		}
    	}
    
    	/**
    		解锁
    	*/
    	public boolean unlock(String id){
    		
    		Jedis jedis = jedisPool.getResource();
    		//删除key的lua脚本
    		String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + "   return redis.call('del',KEYS[1]) " + "else"
                + "   return 0 " + "end";
            try {
                String result =
                    jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
                return "1".equals(result);
            } finally {
                jedis.close();
            }
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    测试

    public class RedisLockTest {
        private static RedisLockUtil demo = new RedisLockUtil();
        private static Integer NUM = 101;
    
        public static void main(String[] args) {
            for (int i = 0; i < 100; i++) {
                new Thread(() -> {
                    String id = Thread.currentThread().getId() + "";
                    boolean isLock = demo.lock(id);
                    try {
                     // 拿到锁的话,就对共享参数减一
                        if (isLock) {
                            NUM--;
                            System.out.println(NUM);
                        }
                    } finally {
                     // 释放锁一定要注意放在finally
                        demo.unlock(id);
                    }
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    我们创建100个线程来模拟并发的情况,执行后的结果是这样的:
    在这里插入图片描述

    分布式锁存在的问题:

    1. 死锁问题,未设置过期时间,锁忘记释放,加锁后还没来得及释放锁就宕机了,都会导致死锁问题
    2. 锁误删问题,设置了超时时间,但是线程执行超时时间后误删问题

    解决死锁问题:
    MySQL 中解决死锁问题是通过设置超时时间,Redis 也是如此
    官方在 Redis 2.6.12 版本之后,新增了一个功能,我们可以使用一条命令既执行加锁操作,又设置超时时间:setnx 和 expire

    第一条命令成功加锁,并设置 30 s 过期时间
    第二条命令跟在第一条命令后,还没有超过 30s,所以获取失败
    在这里插入图片描述
    解决锁误删问题:

    通过添加锁标识来解决,前面我们使用 set 命令的时候,只使用到了 key,那么可以给 value 设置一个标识,表示当前锁归属于那个线程,例如 value=thread1,value=thread2…
    但是这样解决依然存在问题,因为新增锁标识之后,线程在释放锁的时候,需要执行两步操作了:

    • 判断锁是否属于自己
    • 如果是,就删除锁
      这样就不能保证原子性了,那该怎么办?

    解决方案:
    使用 lua 脚本来解决 (Redis 本身就能保证 lua 脚本里面所有命令都是原子性操作)
    使用 Redisson 框架来解决(主流)

    在这里插入图片描述

    如何使用Redisson锁

    一、添加依赖

    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redisson-spring-boot-starterartifactId>
        <version>3.23.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、创建RedissonClient对象

    @Configuration
    public class RedissonConfig {
        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379");
            // 如果有密码需要设置密码
            return Redisson.create(config);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三、调用分布式锁

    @RestController
    public class LockController{
    	
    	@Resource
    	private RedissonClient redissonClient;
    
    	@RequetMapping("/lock")
    	public String lockResource() throws InterruptedException{
    		
    		String lockKey = "myLock";
    
    		//获取锁
    		RLock lock = redissonClient.getLock(lockKey);
    		try{
    			boolean isLocked = lock.tryLock(20,TimeUnit.SECONDS);
    			if(isLocked){
    				try{
    					TimeUnit.SECONDS.sleep(5);
    					return "成功获取到锁,并执行业务代码";
    				}catch(InterruptedException e){
    					e.printStackTrace();
    				}finally{
    					//释放锁
    					lock.unLock();
    				}
    			}else{
    				//获取锁失败
    				return "获取锁失败";
    			}
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}
    		return "获取锁成功";
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    启动项目,使用 8080 端口访问接口:
    在这里插入图片描述

    分布式锁

    //加锁
    public Boolean tryLock(String key,String value,long timeout,TimeUnit unit){
    	
    	return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
    }
    
    //解锁,防止删除别人的锁,以uuid为value校验是否自己的锁
    public void unlock(String lockName,String uuid){
    	
    	if(uuid.equals(redisTemplate.opsForValue().get(lockName))){
    		
    		redisTemplate.opsForValue().del(lockName);
    	}
    }
    
    // 结构
    if(tryLock){
        // todo
    }finally{
        unlock;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    get和del操作非原子性,并发一旦大了,无法保证进程安全。
    建议用Lua脚本
    Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。

    lockDel.lua

    if redis.call('get', KEYS[1]) == ARGV[1] 
        then 
     -- 执行删除操作
            return redis.call('del', KEYS[1]) 
        else 
     -- 不成功,返回0
            return 0 
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    //解锁脚本
    DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
    unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathReource("lockDDel.lua")));
    
    //执行lua脚本解锁
    redisTemplate.execute(unlockScript,Collections.singletonList(keyName),value);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    加锁lock.lua

    local key = KEYS[1];
    local threadId = ARGV[1];
    local releaseTime = ARGV[2];
    
    -- localname不存在
    if(redis.call('exists',key) == 0) then
    
    	redis.call('hset',key,threadId,'1');
    	return 1;
    end;
    
    -- 当前线程id存在
    if(redis.call('hexists',key,thread) == 1) then
    	redis.call('hincrby',key,threadId,'1');
    	redis.call('expire',key,releaseTime);
    	return 1;
    end;
    
    return 0;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    解锁unlock.lua

    local key = KEYS[1];
    local threadId = ARGV[1];
    
    -- lockname、threadId不存在
    if(redis.call('hexists',key,threadId) == 0) then
    	return nil;
    end;
    
    -- 计数器-1
    local count = redis.call('hincrby',key,threadId,-1);;
    
    -- 删除lock
    if(count == 0) then
    	redis.call('del',key);
    	return nil;
    end;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    代码进行解释.lua文件

    @Getter
    @Setter
    public class RedisLock{
    	
    	private RedisTemplate redisTemplate;
    	private DefaultRedisScript<Long> lockScript;
    	private DefaultRedisScript<Object> unlockScript;
    
    	public RedisLock(RedisTemplate redisTemplate){
    		
    		this.redisTemplate = redisTemplate;
    
    		//加载加锁脚本
    		lockScript = new DefaultRedisScript<>();
    		this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
    		this.lockScript.setResultType(Long.class);
    
    		//加载释放锁的脚本
    		unlockScript = new DefaultRedisScript<>();
    		this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
    	}
    
    	/**
    		获取锁
    	*/
    	public String tryLock(String lockName,long releaseTime){
    		
    		//存入线程信息的前缀
    		String key = UUID.randomUUID().toString();
    	
    		//执行脚本
    		Long result = (Long) redisTemplate.execute(lockScript,
    											Collections.singletonList(lockName),
    											key+Thread.currentThread().getId(),
    											releaseTime	
    											);
    		if(result != null && result.intValue() == 1){
    			return key;
    		}else{
    			return null;
    		}
    	}
    
    	/**
    		解锁
    	*/
    	public void unlock(String lockName,String key){
    		redisTemplate.execute(unlockScript,Collections.singletonList(lockName),key+Thread.currentThread().getId());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    至此已经完成了一把分布式锁,符合互斥、可重入、防死锁的基本特点。
    比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题
    而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态 。

    Redisson分布式锁

    
    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redissonartifactId>
        <version>3.13.6version>
    dependency>
    
    
    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redisson-spring-boot-starterartifactId>
        <version>3.13.6version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    配置类

    @Confituration
    public class RedissionConfig{
    	
    	@Value("${spring.redis.host}")
    	private String redisHost;
    
    	@Value("${spring.redis.password}")
    	private String password;
    
    	private int port = 6379;
    
    	@Bean
    	public RedissonCient getRedisson(){
    		Config config = new Config();
    		config.useSingleServer()
    			.setAddress("redis://" + redisHost + ":" + port)
    			.setPassword(password);
    
    		config.setCodec(new JsonJacksonCodec());
    		return Redisson.create(config);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    启用分布式锁

    @Resource
    private RedissonClient redissonClient;
    
    RLock rLock = redissonClient.getLock(lockName);
    try{
    	boolean isLocked = rLock.tryLock(expireTime,TimeUnit.MILLISECONDS);
    	if(isLocked){
    	}
    }catch(Exception e){
    	rLock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    RLock

    RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口

    RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

    /**
    	RLock如何加锁?
    	从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法
    	
    */
    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime,long leaseTime,TimeUnit unit,long threadId){
    	
    	if(leaseTime != -1L){
    		return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    	}else{
    		RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                    if (e == null) {
                        if (ttlRemaining) {
                            this.scheduleExpirationRenewal(threadId);
                        }
    
                    }
                });
                return ttlRemainingFuture;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。

    有过期时间tryLockInnerAsync 部分,evalWriteAsync是eval命令执行lua的入口

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            this.internalLockLeaseTime = unit.toMillis(leaseTime);
            return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
        }
    
    • 1
    • 2
    • 3
    • 4

    eval命令执行Lua脚本

    -- 不存在该key时
    if (redis.call('exists', KEYS[1]) == 0) then 
      -- 新增该锁并且hash中该线程id对应的count置1
      redis.call('hincrby', KEYS[1], ARGV[2], 1); 
      -- 设置过期时间
      redis.call('pexpire', KEYS[1], ARGV[1]); 
      return nil; 
    end; 
    
    -- 存在该key 并且 hash中线程id的key也存在
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
      -- 线程重入次数++
      redis.call('hincrby', KEYS[1], ARGV[2], 1); 
      redis.call('pexpire', KEYS[1], ARGV[1]); 
      return nil; 
    end; 
    return redis.call('pttl', KEYS[1]);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    分布式锁的10个坑

    在这里插入图片描述

    一、非原子操作(setnx + expire)

    先用setnx来抢锁,如果抢到之后,再用expire给锁设置一个过期时间

    if(jedis.setnx(lock_key,lock_value) == 1){ //加锁
    
    	jedis.expire(lock_key,timeout);//设置过期事件
    	doBusiness //业务处理
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    因为setnx和expire两个命令是分开写的,并不是原子操作!如果刚要执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,别的线程永远获取不到锁啦。

    二、被别的客户端请求覆盖(setnx + value 为过期时间)

    为了解决:发生异常时,锁得不到释放的问题。
    可以把过期时间放到setnx的value里面。如果加锁失败,再拿出value值和当前系统时间校验一下是否过期即可。
    在这里插入图片描述

    long expireTime = System.currentTimeMillis() + timeout;//系统时间+设置超时时间
    String expireTimeStr = String.valueOf(expireTime);//转为String类型
    
    //如果当前锁不存在,返回加锁成功
    if(jedis.setnx(lock_key,expireTimeStr) == 1){
    	return true;
    }
    
    //如果锁存在,获取锁的过期时间
    String oldExpireTimreStr = jedis.get(lock_key);
    
    //如果获取到的老的预期过期时间,小于系统当前时间,表示已经过期了
    if(oldExpireTimreStr != null && Long.parseLong(oldExpireTimreStr) < System.currentTimeMillis()){
    	
    	//锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
    	String oldValueStr = jedis.getSet(lock_key,expireTimeStr);
    
    	if(oldValueStr != null && oldValueStr.equals(oldExpireTimReStr)){
    		//考虑多线程并发,只有一个线程的设置值和当前值相同,才可以加锁
    		return true;
    	}
    }
    
    //其他情况均返回加锁失败
    return false;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这种实现的方案,也是有坑的:如果锁过期的时候,并发多个客户端同时请求过来,都执行jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖。

    三、忘记设置过期时间

    try{
    	if(jedis.setnx(lock_key,lock_value) == 1){//加锁
    		doBusiness //业务逻辑处理
    		return true;//加锁成功,处理完业务逻辑返回
    	}
    	return false;//加锁失败
    }finally{
    	unlock(lockKey);//释放锁
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    忘记设置过期时间了。如果程序在运行期间,机器突然挂了,代码层面没有走到finally代码块,即在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁,所以这里需要给lockKey加一个过期时间。注意哈,使用分布式锁,一定要设置过期时间哈。

    四、业务处理完,忘记释放锁

    set 指令扩展参数 :SET key value[EX seconds][PX milliseconds][NX][XX]
    
     - NX :表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,
      而其他客户端请求只能等其释放锁,才能获取。
     -  EX seconds :设定key的过期时间,时间单位是秒
     - PX milliseconds: 设定key的过期时间,单位为毫秒
     - XX: 仅当key存在时设置值
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    if(jedis.set(lockKey,requestId,"NX","PX",expireTime) == 1){ //加锁
    	doBusiness //业务逻辑处理
    	return true;//加锁成功,处理完业务逻辑返回
    }
    return false;//加载失败
    
    • 1
    • 2
    • 3
    • 4
    • 5

    因为忘记释放锁了!如果每次加锁成功,都要等到超时时间才释放锁,是会有问题的。这样程序不高效,应当每次处理完业务逻辑,都要释放锁。

    正例如下:

    try{
    	if(jedis.set(lockKey,requestId,"NX","PX",expireTime) == 1){//加锁
    		doBusiness
    		return true;//加锁成功,处理完业务逻辑返回
    	}
    	return false;//加锁失败
    }finally{
    	unlock(lockKey);//释放锁
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    五、B的锁被A给释放了

    try{
      if(jedis.set(lockKey, requestId, "NX", "PX",expireTime)==1){//加锁
         doBusiness //业务逻辑处理
         return true; //加锁成功,处理完业务逻辑返回
      }
      return false; //加锁失败
    } finally {
        unlock(lockKey); //释放锁
    }  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    假设在这样的并发场景下:A、B两个线程来尝试给Redis的keylockKey加锁,A线程先拿到锁(假如锁超时时间是3秒后过期)。如果线程A执行的业务逻辑很耗时,超过了3秒还是没有执行完。这时候,Redis会自动释放lockKey锁。刚好这时,线程B过来了,它就能抢到锁了,开始执行它的业务逻辑,恰好这时,线程A执行完逻辑,去释放锁的时候,它就把B的锁给释放掉了。

    正确的方式应该是,在用set扩展参数加锁时,放多一个这个线程请求的唯一标记,比如requestId,然后释放锁的时候,判断一下是不是刚刚的请求。

    try{
      if(jedis.set(lockKey, requestId, "NX", "PX",expireTime)==1){//加锁
         doBusiness //业务逻辑处理
         return true; //加锁成功,处理完业务逻辑返回
      }
      return false; //加锁失败
    } finally {
        if (requestId.equals(jedis.get(lockKey))) { //判断一下是不是自己的requestId
          unlock(lockKey);//释放锁
        }   
    }  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    六、释放锁时,不是原子性

    if (requestId.equals(jedis.get(lockKey))) { //判断一下是不是自己的requestId
          unlock(lockKey);//释放锁
        }   
    
    • 1
    • 2
    • 3

    因为判断是不是当前线程加的锁和释放锁不是一个原子操作。如果调用unlock(lockKey)释放锁的时候,锁已经过期,所以这把锁已经可能已经不属于当前客户端,会解除他人加的锁。

    因此,这个坑就是:判断和删除是两个操作,不是原子的,有一致性问题。释放锁必须保证原子性,可以使用Redis+Lua脚本来完成,类似Lua脚本如下:

    if rdis.call('get',KEYS[1]) ==ARGV[1] then
    	return redis.call('del',KEYS[1])
    else
    	return 0
    end;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    七、锁过期释放,业务没执行完

    加锁后,如果超时了,Redis会自动释放清除锁,这样有可能业务还没处理完,锁就提前释放了。怎么办呢?

    有些小伙伴认为,稍微把锁过期时间设置长一些就可以啦。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。

    当前开源框架Redisson解决了这个问题。我们一起来看下Redisson底层原理图吧:
    在这里插入图片描述
    只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程一还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用Redisson解决了锁过期释放,业务没执行完问题。

    八、Redis分布式锁和@transactional一起使用失效

    @Transactional
    public void updateDB(int lockKey) {
      boolean lockFlag = redisLock.lock(lockKey);
      if (!lockFlag) {
        throw new RuntimeException(“请稍后再试”);
      }
       doBusiness //业务逻辑处理
       redisLock.unlock(lockKey);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在事务中,使用了Redis分布式锁.这个方法一旦执行,事务生效,接着就Redis分布式锁生效,代码执行完后,先释放Redis分布式锁,然后再提交事务数据,最后事务结束。在这个过程中,事务没有提交之前,分布式锁已经被释放,导致分布式锁失效spring的Aop,会在updateDB方法之前开启事务,之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁住的代码块执行是在事务之内执行的,可以推断在代码块执行完时,事务还未提交,锁已经被释放,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。

    正确的实现方法,可以在updateDB方法之前就上锁,即还没有开事务之前就加锁,那么就可以保证线程的安全性.

    九、锁可重入

    【所谓的不可重入,就是当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,会阻塞,不可以再次获得锁。同一个人拿一个锁 ,只能拿一次不能同时拿2次。】
    不可重入的分布式锁的话,是可以满足绝大多数的业务场景。但是有时候一些业务场景,我们还是需要可重入的分布式锁,大家实现分布式锁的过程中,需要注意一下,你当前的业务场景是否需要可重入的分布式锁。

    Redis只要解决这两个问题,就能实现重入锁了:

    • 怎么保存当前持有的线程
    • 怎么维护加锁次数(即重入了多少次)

    实现一个可重入的分布式锁,我们可以参考JDK的ReentrantLock的设计思想。实际上,可以直接使用Redisson框架,它是支持可重入锁的。

    十、Redis主从复制导致的坑

    实现Redis分布式锁的话,要注意Redis主从复制的坑。因为Redis一般都是集群部署的:
    在这里插入图片描述
    如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。

    为了解决这个问题,Redis作者 antirez提出一种高级的分布式锁算法:Redlock。Redlock核心思想是这样的:
    【搞多个Redis master部署,以保证它们不会同时宕掉。并且这些master节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。】

    我们假设当前有5个Redis master节点,在5台服务器上面运行这些Redis实例。
    在这里插入图片描述

    RedLock的实现步骤如下:

    • 获取当前时间,以毫秒为单位。
    • 按顺序向5个master节点请求加锁。客户端设置网络连接和响应超时时间,并且超时时间要小于锁的失效时间。(假设锁自动失效时间为10秒,则超时时间一般在5-50毫秒之间,我们就假设超时时间是50ms吧)。如果超时,跳过该master节点,尽快去尝试下一个master节点。
    • 客户端使用当前时间减去开始获取锁时间(即步骤1记录的时间),得到获取锁使用的时间。当且仅当超过一半(N/2+1,这里是5/2+1=3个节点)的Redis master节点都获得锁,并且使用的时间小于锁失效时间时,锁才算获取成功。(如上图,10s> 30ms+40ms+50ms+4m0s+50ms)
    • 如果取到了锁,key的真正有效时间就变啦,需要减去获取锁所使用的时间。
    • 如果获取锁失败(没有在至少N/2+1个master实例取到锁,有或者获取锁时间已经超过了有效时间),客户端要在所有的master节点上解锁(即便有些master节点根本就没有加锁成功,也需要解锁,以防止有些漏网之鱼)。

    简化下步骤就是:

    • 按顺序向5个master节点请求加锁
    • 根据设置的超时时间来判断,是不是要跳过该master节点。
    • 如果大于等于3个节点加锁成功,并且使用的时间小于锁的有效期,即可认定加锁成功啦。
    • 如果获取锁失败,解锁!
  • 相关阅读:
    域内令牌窃取伪造
    21级数据结构与算法实验8——排序
    一阶微分形式不变性
    【MySQL】深入理解MySQL索引优化器原理(MySQL专栏启动)
    1.QML Hello world
    [附源码]Python计算机毕业设计Django校园快递柜存取件系统
    json配置文件读入redis - 包含命令行解析示例
    angular 实现模块共享
    【Mac】破解死循环,成功安装 Homebrew、curl、wget,快速配置 zsh
    Android WebView由于重定向造成的goBack()无效的问题解决
  • 原文地址:https://blog.csdn.net/usa_washington/article/details/132925932