• Java基于Redis实现的分布式锁---电商下单案例


    一、案例

    案例是一个简易版的电商项目中的下单功能,先简单分析下下单功能的一些基本步骤:
    1.校验库存 2.保存订单 3.生成快照 4.扣减库存 5.删除购物车

    二、问题

    在这个案例中不难发现,在没有枷锁的情况下,如果两个线程同时执行下单操作,可能会在校验库存后同时执行后面的业务导致同时扣减库存,会引发库存小于0的情况,也就是超卖问题

    在单体项目中,这个问题很容易解决,那就是加锁,使用一个 synchronized锁的可以很容易的解决

    但是在分布式的项目中,你的后端服务器有多个,而 synchronized锁只能保证在同一个JVM中不会有并发问题,所以此时用 synchronized锁就不能解决问题了,此时就需要分布式锁

    三、基于Redis实现分布式锁

    1.实现

    在这里插入图片描述
    如上图,通过Redis的sexnx(key不在,添加;key在,不添加)来实现加锁,key设置为商品id,value暂时可以随便设置。在每次执行下单前,先使用sexnx来对商品加锁,如果对应的商品id在redis中不存在,那么该商品就是无锁的,可以加锁然后执行后序下单业务,如果对应商品id在redis中存在,那么说明该商品已经被加锁了,有其他线程正在操作这个商品,那么此时就需要等待或者直接下单失败了

    代码实现:

     @Transactional
    public String saveOrder(int productId) {
        Boolean value = stringRedisTemplate.boundValueOps(productId + "").setIfAbsent("value");
        if (value) {
            try {
                //检验库存
                int stock = tbProductDao.querystock(productId);
                if (stock > 0) {
                    //保存订单
                    System.out.println("保存订单");
                    //修改库存
                    tbProductDao.redustock(productId);
                    //删除购物车
                    return "订单提交成功";
                } else {
                    return "商品不存在";
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                //解锁
                stringRedisTemplate.delete(productId+"");
            }
        }
        return "订单提交失败";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    2. 优化

    以上代码可以实现一个简单的分布式锁,但是仍然有一些可以优化的地方:
    1.获取锁失败时会直接返回订单提交失败,应该不断尝试获取锁
    2.无法保证锁的获取顺序是按照线程顺序来的

    1)阻塞锁和非阻塞锁

    对于问题1,可以通过阻塞锁或者非阻塞锁实现

    阻塞锁:不断尝试获取锁,直到获取到锁为止

            //阻塞锁实现
            while( b == false){
                b = stringRedisTemplate.boundValueOps(productId + "").setIfAbsent("value");
    
    • 1
    • 2
    • 3

    非阻塞锁:如果获取不到锁就放弃,但是可以支持在一段时间内重试

     //非阻塞锁实现
            int count = 1;
            while( b == false && count < 4){
                b = stringRedisTemplate.boundValueOps(productId + "").setIfAbsent("value");
                count ++;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    使用两种锁都可以实现
    我们通过非阻塞锁来实现

    2)公平锁和非公平锁

    对于问题2 可以使用公平锁来实现
    公平锁:按照线程的先后顺序获取锁
    非公平锁:多个正在等待的线程随机获取锁

    3.问题

    1)线程异常导致无法释放锁

    上述的分布式锁还有一个问题:
    在执行业务的过程中,如果当前线程出现异常(宕机)没有释放锁,那么就会导致死锁

    解决:
    可以对锁设置过期时间,当出现异常没有释放锁,在过期时间结束时也能自动释放锁

    2)T1过期释放T2锁

    对锁设置过期时间能够解决死锁问题,但是另一个问题随之而来:假设有两个线程T1、T2,T1首先获得锁,但是在锁过期之前并没有完成业务的执行,同时T2线程成功加锁,T1执行结束后又释放锁(此时释放的是T2的锁),这就会导致T2在无锁状态下执行

    解决:
    在加锁时为锁设置唯一的value,释放锁时要先获取对应的value

     String value = UUID.randomUUID().toString();
     boolean b = stringRedisTemplate.boundValueOps(productId + "")
                .setIfAbsent(value,3,TimeUnit.MINUTES);
    
    • 1
    • 2
    • 3

    如果获取的指与当前value相同则释放锁

    //查询操作
    String v = stringRedisTemplate.boundValueOps(productId + "").get();
    if(value.equals(v)){
        //删除操作
        stringRedisTemplate.delete(productId+"");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    简单来说就是每个线程只能释放自己加的锁

    但是此时又产生了新的问题,看释放锁的代码,释放锁是分为两步进行的:1.查询value 2.删除
    那么在这一步时也会产生并发问题,如果在查询完成后刚要删除锁,但是此时锁过期了并且其他线程成功加锁,那么也会导致本线程释放其他线程的锁

    解决:
    要解决这个问题本质是就是让查询和删除这两步操作是原子性的,可以使用lua脚本
    lua:

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置Bean加载lua

    @Bean
    public DefaultRedisScript<List> defaultRedisScript(){
        DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
        defaultRedisScript.setResultType(List.class);
        defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
        return  defaultRedisScript;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    执行lua解锁

    @AutoWired
    private DefaultRedisScript defaultRedisScript;
    
    //执行lua脚本
    List<String> keys = new ArrayList<>();
    keys.add(skuId);
    List rs = stringRedisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
    System.out.println(rs.get(0));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    分析以上的问题不难看出,对于过期时间的设置是一个很困难的问题,必须保证在这个时间内业务能够执行完成并且也不能等待太久
    对于这个问题可以使用看门狗机制

    4)看门狗机制

    看门狗线程工作原理:
    监听当前线程锁的过期时间,当锁即将过期时如果有任务没有执行结束,则重置锁的过期时间,保证有任务线程正常执行的过程中,锁不会过期

    4.Redisson

    基于以上的问题,我们可以使用Redisson------一个基于Redis+看门狗机制的分布式锁框架

    依赖:

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.12.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置yml

    redisson:
      addr:
        singleAddr:
          host: redis://localhost:6380
          database: 0
    
    • 1
    • 2
    • 3
    • 4
    • 5

    RedissonConfig:

    @Configuration
    public class RedissonConfig {
    
    
        @Value("${redisson.addr.singleAddr.host}")
        private String host;
    
        @Value("${redisson.addr.singleAddr.database}")
        private int database;
    
    
        @Bean
        public RedissonClient redissonClient(){
            Config config = new Config();
            config
                    .useSingleServer()
                    .setAddress(host)
                    .setDatabase(database);
            return Redisson.create(config);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    使用:

    @Transactional
    public String saveOrder(int productId) {
            //获取公平锁
            RLock lock=redissonClient.getFairLock(productId+"");
            //非阻塞锁
            try {
                boolean b = lock.tryLock(3, TimeUnit.MINUTES);
                if(b){
    
                    System.out.println("业务执行");
                    return "订单提交成功";
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
            return "订单提交失败";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    Fiori VS code 连接配置本地SDK yaml 文件配置
    AUTOSAR规范与ECU软件开发(实践篇)9.7 AUTOSAR安全机制的E2E保护
    弹跳的小球
    Flutter 创建自己的对话框,不使用任何包!
    tcp/ip协议2实现的插图,数据结构2 (9 - 章)
    IO多路转接
    弱网模拟工具
    C++ Qt开发:自定义Dialog对话框组件
    【大数据平台】从Hadoop到Spark安装配置教程
    oracle学习43-oracle导出空表
  • 原文地址:https://blog.csdn.net/qq_56892136/article/details/127716549