• Redis分布式锁


    先来说一个分布式锁的业务场景吧!在 GuliMall 项目的秒杀服务中,有个扣库存的逻辑。在秒杀服务上线前,会提前从数据库从预留库存,然后保存进 Redis 中,商品id对应着信号量。我们在秒杀成功时会减除相应数量的库存,在单体架构上我们只需要填加一个同步锁即可。但在分布式情况下,有若干个服务,同步锁只能锁住它当前所在的服务。在并发量高的情况下仍然会存在错扣库存的场景,比如说服务A、服务B都拿到库存50,然后去扣库存。

    方案一:setex+ expire

    ​ 不知道大家还记不记得 setex 这个命令,语法setnx key value 。(如果 key 不存在,则创建键值对。如果 可以 存在,则创建键值对是失败)。那么我们是不是可以使用设置一把锁,多台服务器使用 setex 去抢占它,那样在某个时刻只会有一个服务抢占到锁。抢占到锁必然要去释放锁,我们可以在 finally中编写一个锁释放的逻辑。这里是不是什么问题呢?

    ​ 没错,尚若在执行“扣库存”业务代码时服务宕机了,那么永远执行不了释放锁的逻辑。可以使用expire给锁设置一个过期时间,防止锁忘记了释放。不过这里还存在个问题 setnx 和 expire 两个命令分开了,「不是原子操作」。如果执行完setnx加锁,正要执行expire设置过期时间时,进程crash(崩溃)或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」。我们可以使用setex key 时间 value 令抢锁并设置初始值,保证它的原子性。或者使用Lua脚本(包含SETNX + EXPIRE两条指令)去保证原子性,如:

    if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
       redis.call('expire',KEYS[1],ARGV[2])
    else
       return 0
    end;
    

    那么初代的分布式锁就编写完成啦~

    @RestController
    @RequestMapping("seckill")
    @Slf4j
    public class SeckillController {
    
        final String LOCKKEY = "STOCKLOCKKEY_";
        final String STOCK = "STOCK_";
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        /**
         * 减库存逻辑
         * @param orderId   订单ID,这里应该拼接生成,这里简化
         * @param productId 活动商品ID
         * @param num       购买数量
         * @return
         */
        @RequestMapping("/lessInventory")
        public String lessInventory(String orderId, String productId, int num) {
            try {
                /**
                 * 加分布式锁,使用 setIfAbsent 保证是个原子操作。分为两个步骤:setnx命令来抢锁,再用expire给锁设置一个过期时间(防止死锁)
                 *  1. setnx命令的原理是:
                 *    + 如果key不存在,则创建键值对
                 *    + 如果key存在,则不创建键值对
                 *    Boolean seize = stringRedisTemplate.opsForValue().setIfAbsent(LOCKKEY + orderId, "seize");
                 *  2.expire给key设置一个过期时间
                 *    stringRedisTemplate.expire(LOCKKEY + orderId, 10, TimeUnit.SECONDS);
                 */
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCKKEY + orderId, "jiasuo", 10, TimeUnit.SECONDS);
                if (!result) {
                    return "系统繁忙,请重试!";
                }
                // 相关减库存逻辑
                // 1. 获取相关活动商品的库存
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK + productId));
                if (stock > 0) {
                    // 2. 若库存还有,减库存
                    int realStock = stock-num;
                    stringRedisTemplate.opsForValue().set(STOCK + productId, String.valueOf(realStock));
                    log.info("减库存成功!");
                    return "减库存成功";
                } else {
                    log.info("库存不足!");
                    return "库存不足";
                }
            } catch (Exception e) {
                return "系统故障,程序员小哥努力抢修!";
            } finally {
                // 锁释放
              stringRedisTemplate.delete(LOCKKEY+orderId);
            }
        }
    }
    

    动动小脑壳,为什么叫“ 初代 ” 呢?

    这里仍然存在两个问题,

    1、「锁被别的线程误删」假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完呢。

    ​ 我们可以给value值设置一个标记当前线程唯一的随机数(UUID),在finally快中删除锁的时候校验一下,是否是当前锁的唯一数。

    @RestController
    @RequestMapping("seckill")
    @Slf4j
    public class SeckillController {
    
        final String LOCKKEY = "STOCKLOCKKEY_";
        final String STOCK = "STOCK_";
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        /**
         * 减库存逻辑
         * @param orderId   订单ID,这里应该拼接生成,这里简化
         * @param productId 活动商品ID
         * @param num       购买数量
         * @return
         */
        @RequestMapping("/lessInventory")
        public String lessInventory(String orderId, String productId, int num) {
            // 唯一随机值,再删除 防止误删
            String clientId = UUID.randomUUID().toString();
            try {
                /**
                 * 加分布式锁,使用 setIfAbsent 保证是个原子操作。分为两个步骤:setnx命令来抢锁,再用expire给锁设置一个过期时间(防止死锁)
                 *  1. setnx命令的原理是:
                 *    + 如果key不存在,则创建键值对
                 *    + 如果key存在,则不创建键值对
                 *    Boolean seize = stringRedisTemplate.opsForValue().setIfAbsent(LOCKKEY + orderId, "seize");
                 *  2.expire给key设置一个过期时间
                 *    stringRedisTemplate.expire(LOCKKEY + orderId, 10, TimeUnit.SECONDS);
                 */
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCKKEY + orderId, clientId, 10, TimeUnit.SECONDS);
                if (!result) {
                    return "系统繁忙,请重试!";
                }
                // 相关减库存逻辑
                // 1. 获取相关活动商品的库存
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK + productId));
                if (stock > 0) {
                    // 2. 若库存还有,减库存
                    int realStock = stock-num;
                    stringRedisTemplate.opsForValue().set(STOCK + productId, String.valueOf(realStock));
                    log.info("减库存成功!");
                    return "减库存成功";
                } else {
                    log.info("库存不足!");
                    return "库存不足";
                }
            } catch (Exception e) {
                return "系统故障,程序员小哥努力抢修!";
            } finally {
                // 3、校验唯一随机值,再删除 防止误删
                if (clientId.equals(stringRedisTemplate.opsForValue().get(LOCKKEY + orderId))) {
                    stringRedisTemplate.delete(LOCKKEY+orderId);
                }
            }
        }
    }
    

    2、「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的啦。

    ​ 可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。这里逻辑很清晰,但实现起来还是有太多因素要考虑。


    方案二:Redisson

    ​ 一般都是使用 Redisson去实现,只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间。实现如下~

    public String lessInventory(String orderId, String productId, int num) {
        RLock lock = redisson.getLock(LOCKKEY + orderId);
        try {
            // 1、加锁
            lock.lock();
            // 相关减库存逻辑
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK + productId));
            if (stock > 0) {
                int realStock = stock-num;
                stringRedisTemplate.opsForValue().set(STOCK + productId, String.valueOf(realStock));
                log.info("减库存成功!");
                return "减库存成功";
            } else {
                log.info("库存不足!");
                return "库存不足";
            }
        } catch (Exception e) {
            return "系统故障,程序员小哥努力抢修!";
        } finally {
            // 2、释放锁
            lock.unlock();
        }
    }
    

    对于一些分布式锁的优化,我们可以考虑使用 分段锁。让我们大家一起去探索吧~

  • 相关阅读:
    2024 年如何复用 ChatGPT 从头开始​​快速学习 Python
    线扫相机DALSA--常见问题一:软件安装顺序
    《游戏系统设计十五》游戏房间服的设计
    2022软考网络工程师学习笔记——计算机组成与结构(Day2)
    神经网络怎么看训练效果,神经网络结果图如何看
    静态常量如何使用?
    Java成员方法的声明和调用
    ASP.NET第五章 Application、Session和Cookie对象
    如何设计高性能架构
    Linux动态库
  • 原文地址:https://blog.csdn.net/m0_49183244/article/details/126952345