• Redis分布式锁


    1、利用Watch实现Redis乐观锁

            乐观锁基于CAS(Compare And Swap)思想(比较并替换),是不具有互斥性,不会产生锁等待而消耗资源,但是需要反复的重试,但也是因为重试的机制,能比较快的响应。因此我们可以利用redis来实现乐观锁。具体思路如下:

    1. 利用redis的watch功能,监控这个redisKey的状态值
    2. 获取redisKey的值
    3. 创建redis事务
    4. 给这个key的值+1
    5. 然后去执行这个事务,如果key的值被修改过则回滚,key不加1

    Redis乐观锁实现秒杀

    1. public class Second {
    2. public static void main(String[] arg) {
    3. String redisKey = "lock";
    4. ExecutorService executorService = Executors.newFixedThreadPool(20);
    5. try {
    6. Jedis jedis = new Jedis("127.0.0.1", 6378);
    7. // 初始值
    8. jedis.set(redisKey, "0");
    9. jedis.close();
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. for (int i = 0; i < 1000; i++) {
    14. executorService.execute(() -> {
    15. Jedis jedis1 = new Jedis("127.0.0.1", 6378);
    16. try {
    17. jedis1.watch(redisKey);
    18. String redisValue = jedis1.get(redisKey);
    19. int valInteger = Integer.valueOf(redisValue);
    20. String userInfo = UUID.randomUUID().toString();
    21. // 没有秒完
    22. if (valInteger < 20) {
    23. Transaction tx = jedis1.multi();
    24. tx.incr(redisKey);
    25. List list = tx.exec();
    26. // 秒成功 失败返回空list而不是空
    27. if (list != null && list.size() > 0) {
    28. System.out.println("用户:" + userInfo + ",秒杀成功!当前成功人数:" + (valInteger + 1));
    29. }
    30. // 版本变化,被别人抢了。
    31. else {
    32. System.out.println("用户:" + userInfo + ",秒杀失败");
    33. }
    34. }
    35. // 秒完了
    36. else {
    37. System.out.println("已经有20人秒杀成功,秒杀结束");
    38. }
    39. } catch (Exception e) {
    40. e.printStackTrace();
    41. } finally {
    42. jedis1.close();
    43. }
    44. });
    45. }
    46. executorService.shutdown();
    47. }
    48. }

    2、setnx实现悲观锁

    实现原理

    • 共享资源互斥
    • 共享资源串行化

    单应用中使用锁:(单进程多线程):synchronized、ReentrantLock

    分布式应用中使用锁:(多进程多线程):

    • 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
    • 利用Redis的单线程特性对共享资源进行串行化处理

    2.2、实现方式

    获取锁:

    方式1(使用set命令实现)--推荐

    1. /**
    2. * 使用redis的set命令实现获取分布式锁
    3. *
    4. * @param lockKey 可以就是锁
    5. * @param requestId 请求ID,保证同一性 uuid+threadID
    6. * @param expireTime 过期时间,避免死锁
    7. * @return
    8. */
    9. public boolean getLock(String lockKey,String requestId,int expireTime){
    10. //NX:保证互斥性
    11. // hset 原子性操作 只要lockKey有效 则说明有进程在使用分布式锁
    12. String result=jedis.set(lockKey,requestId,"NX","EX",expireTime);
    13. if("OK".equals(result)){
    14. return true;
    15. }
    16. return false;
    17. }

    方式2(使用setnx命令实现) -- 并发会产生问题

    1. public boolean getLock(String lockKey,String requestId,int expireTime){
    2. Long result=jedis.setnx(lockKey,requestId);
    3. if(result==1){
    4. //成功设置 进程down 永久有效 别的进程就无法获得锁
    5. jedis.expire(lockKey,expireTime);
    6. return true;
    7. }
    8. return false;
    9. }

    释放锁

    方式1(del命令实现) -- 并发

    1. /**
    2. * 释放分布式锁
    3. * @param lockKey
    4. * @param requestId
    5. */
    6. public static void releaseLock(String lockKey,String requestId) {
    7. if (requestId.equals(jedis.get(lockKey))) {
    8. jedis.del(lockKey);
    9. }
    10. }

            问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。

    方式2(redis+lua脚本实现)--推荐

    1. public static boolean releaseLock(String lockKey, String requestId) {
    2. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    3. Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    4. if (result.equals(1L)) {
    5. return true;
    6. }
    7. return false;
    8. }

    存在问题

    • 单机:无法保证高可用
    • 主--从:无法保证数据的强一致性,在主机宕机时会造成锁的重复获得。

    • 无法续租:超过expireTime后,不能继续使用 

    本质分析:

    CAP模型分析:

    • 在分布式环境下不可能满足三者共存,只能满足其中的两者共存,在分布式下P不能舍弃(舍弃P就是单机了)。所以只能是CP(强一致性模型)和AP(高可用模型)。
    • 分布式锁是CP模型,Redis集群是AP模型。 (base)
    • Redis集群不能保证数据的随时一致性,只能保证数据的最终一致性。

    为什么还可以用Redis实现分布式锁?

    • 当业务不需要数据强一致性时,比如:社交场景,就可以使用Redis实现分布式锁
    • 当业务必须要数据的强一致性,即不允许重复获得锁,比如金融场景(重复下单,重复转账)就不要使用,可以使用CP模型实现,比如:zookeeper和etcd

    3、Redisson分布式锁的使用

    • Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。
    • Redisson在基于NIO的Netty框架上,生产环境使用分布式锁。

    3.1、加入jar包的依赖

    1. <dependency>
    2. <groupId>org.redissongroupId>
    3. <artifactId>redissonartifactId>
    4. <version>2.7.0version>
    5. dependency>

    3.2、配置Redisson

    1. public class RedissonManager {
    2. private static Config config = new Config();
    3. //声明redisso对象
    4. private static Redisson redisson = null;
    5. //实例化redisson
    6. static {
    7. config.useClusterServers()
    8. // 集群状态扫描间隔时间,单位是毫秒
    9. .setScanInterval(2000)
    10. //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
    11. .addNodeAddress("redis://127.0.0.1:6379")
    12. .addNodeAddress("redis://127.0.0.1:6380")
    13. .addNodeAddress("redis://127.0.0.1:6381")
    14. .addNodeAddress("redis://127.0.0.1:6382")
    15. .addNodeAddress("redis://127.0.0.1:6383")
    16. .addNodeAddress("redis://127.0.0.1:6384");
    17. //得到redisson对象
    18. redisson = (Redisson) Redisson.create(config);
    19. }
    20. //获取redisson对象的方法
    21. public static Redisson getRedisson() {
    22. return redisson;
    23. }
    24. }

    3.3、锁的获取和释放

    1. public class DistributedRedisLock {
    2. //从配置类中获取redisson对象
    3. private static Redisson redisson = RedissonManager.getRedisson();
    4. private static final String LOCK_TITLE = "redisLock_";
    5. //加锁
    6. public static boolean acquire(String lockName) {
    7. //声明key对象
    8. String key = LOCK_TITLE + lockName;
    9. //获取锁对象
    10. RLock mylock = redisson.getLock(key);
    11. //加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
    12. mylock.lock(2, 3, TimeUtil.SECOND);
    13. //加锁成功
    14. return true;
    15. }
    16. //锁的释放
    17. public static void release(String lockName) {
    18. //必须是和加锁时的同一个key
    19. String key = LOCK_TITLE + lockName;
    20. //获取所对象
    21. RLock mylock = redisson.getLock(key);
    22. //释放锁(解锁)
    23. mylock.unlock();
    24. }
    25. }

    3.4、业务逻辑中使用分布式锁

    1. public String discount() throws IOException{
    2. String key = "lock001";
    3. //加锁
    4. DistributedRedisLock.acquire(key);
    5. //执行具体业务逻辑
    6. dosoming
    7. //释放锁
    8. DistributedRedisLock.release(key);
    9. //返回结果
    10. return soming;
    11. }

    3.5、Redisson分布式锁的实现原理

    加锁机制 

    • 如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。
    • 发送lua脚本到redis服务器上,脚本如下:
      1. "if (redis.call('exists',KEYS[1])==0) then "+ --看有没有锁
      2. "redis.call('hset',KEYS[1],ARGV[2],1) ; "+ --无锁 加锁
      3. "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
      4. "return nil; end ;" +
      5. "if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ --我加的锁
      6. "redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ --重入锁
      7. "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
      8. "return nil; end ;" +
      9. "return redis.call('pttl',KEYS[1]) ;" --不能加锁,返回锁的时间

    lua的作用:保证这段复杂业务逻辑执行的原子性。

    lua的解释:

    • KEYS[1]) : 加锁的key
    • ARGV[1] : key的生存时间,默认为30秒
    • ARGV[2] : 加锁的客户端ID (UUID.randomUUID()) + “:” + threadId)

         第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。如何加锁呢?很简单,用下面的命令:

    hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

      通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

    myLock :{"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1 }

      上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。

    接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

    锁互斥机制

    那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

    • 很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。
    • 接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
    • 所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。
    • 此时客户端2会进入一个while循环,不停的尝试加锁。

    自动延时机制

        只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

    可重入锁机制

    • 第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。
    • 第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
    • 此时就会执行可重入加锁的逻辑,他会用:

      incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1

    • 通过这个命令,对客户端1的加锁次数,累加1。数据结构会变成:

      myLock :{"8743c9c0-0795-4907-87fd-6c719a6b4586:1":2 }

    释放锁机制

    执行lua脚本如下:

    1. #如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息
    2. "if (redis.call('exists', KEYS[1]) == 0) then " +
    3. "redis.call('publish', KEYS[2], ARGV[1]); " +
    4. "return 1; " +
    5. "end;" +
    6. # key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 不是我加的锁 不能解锁
    7. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
    8. "return nil;" +
    9. "end; " +
    10. # 将value减1
    11. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    12. # 如果counter>0说明锁在重入,不能删除key
    13. "if (counter > 0) then " +
    14. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
    15. "return 0; " +
    16. # 删除key并且publish 解锁消息
    17. "else " +
    18. "redis.call('del', KEYS[1]); " + #删除锁
    19. "redis.call('publish', KEYS[2], ARGV[1]); " +
    20. "return 1; "+
    21. "end; " +
    22. "return nil;",
    • – KEYS[1] :需要加锁的key,这里需要是字符串类型。
    • – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lockchannel{” + getName() + “}”
    • – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
    • – ARGV[2] :锁的超时时间,防止死锁
    • – ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
    • 如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
    • 其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
    • 如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
    • “del myLock”命令,从redis里删除这个key。
    • 然后呢,另外的客户端2就可以尝试完成加锁了。

    4、分布式锁特性

    • 互斥性:任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
    • 同一性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。
    • 可重入性:持有某个锁的客户端可继续对该锁加锁,实现锁的续租
    • 容错性:锁失效后(超过生命周期)自动释放锁(key失效),其他客户端可以继续获得该锁,防止死锁

    5、分布式锁的实际应用

    数据并发竞争:利用分布式锁可以将处理串行化,前面已经讲过了。

    防止库存超卖

    • 订单1下单前会先查看库存,库存为10,所以下单5本可以成功;
    • 订单2下单前会先查看库存,库存为10,所以下单8本可以成功;
    • 订单1和订单2 同时操作,共下单13本,但库存只有10本,显然库存不够了,这种情况称为库存超卖。 

    可以采用分布式锁解决这个问题。

         订单1和订单2都从Redis中获得分布式锁(setnx),谁能获得锁谁进行下单操作,这样就把订单系统下单的顺序串行化了,就不会出现超卖的情况了。伪码如下: 

    1. //加锁并设置有效期
    2. if(redis.lock("RDL",200)){
    3. //判断库存
    4. if (orderNum
    5. //加锁成功 ,可以下单
    6. order(5);
    7. //释放锁
    8. redis,unlock("RDL");
    9. }
    10. }

    注意此种方法会降低处理效率,这样不适合秒杀的场景,秒杀可以使用CAS和Redis队列的方式。

    6、Zookeeper分布式锁的对比

    • 基于Redis的set实现分布式锁
    • 基于zookeeper临时节点的分布式锁

    • 基于etcd实现 

    三者的对比,如下表

    Rediszookeeperetcd
    一致性算法paxos(ZAB)raft
    CAPAPCPCP
    高可用主从集群n+1 (n至少为2)n+1
    接口类型客户端客户端http/grpc
    实现setNXcreateEphemeralrestful API
  • 相关阅读:
    AirTag追踪汽车
    Windows 11 如何同步文件到OneDrive ?
    ArduinoUNO实战-第二十二章-红外遥控实验
    智慧园区内涝积水解决方案
    父子组件通信的属性验证 validator
    【LINUX】 LINUX | 提取U盘映像
    设备搭建(waf、蜜罐、ids和ips)
    网络编程/计算机网络
    机器学习基础篇(3)之图像运算和变换
    c\c++ windows自动打开cmd并进入mysql
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/127729398