• 【Redis】3.详解分布式锁


    1. 什么是分布式锁

    分布式锁是指分布式系统或者不同系统之间共同访问共享资源的一种锁实现,其是互斥的,多个线程均可见。

    分布式锁的核心是大家都用同一个锁,不同服务之间锁是一样的,那么就能锁得住进程,不让进程进入锁住的代码块中。

    在这里插入图片描述

    为什么会使用分布式锁呢?使用ReentrantLock或synchronized不行吗?

    ReentrantLock和synchronized在分布式情况下,每个服务的锁对象都不一样,因为每个服务的锁对象都是不一样的,所以无法锁住不同服务的线程。


    2. 分布式锁的特点

    那么分布式锁有什么特点呢?

    1. 可见性:指所有线程都可见,无论是同一个服务还是不同服务,都可以感知到锁的变化
    2. 互斥:可以使得城西串行化
    3. 高可用:不容易崩溃,时时可用
    4. 高性能:由于加锁本身就让性能降低,所以对于分布式锁本身需要他就较高的加锁性能和释放锁性能
    5. 安全性:安全也是程序中必不可少的一环

    在这里插入图片描述


    3. 常见的分布式锁

    常见的分布式锁有以下三种:

    1. Mysql:mysql本身就有锁机制,但是由于mysql性能一般,因此很少使用mysql作为分布式锁
    2. Redis:redis作为分布式锁是企业里面很常用的方式。主要是使用sentnx方法,如果插入key成功,也就代表获得了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
    3. Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,具体可看作者【单曲循环的寂寞】——基于zookeeper实现分布式锁
    MYSQLredisZookeeper
    互斥利用mysql本身的互斥锁机制利用sentnx这样的互斥命令利用节点的唯一性和有序性实现互斥
    高可用
    高性能一般一般
    安全性断开连接,自动释放锁利用锁的超时时间,到期释放临时节点,断开连接自动释放

    4. 实现分布式锁

    分布式锁实现的有以下两个很重要的步骤:

    • 获取锁:

      • 互斥:只能有一个线程获取锁
      • 非阻塞:尝试一次,成功返回true,失败返回false
    • 释放锁:

      • 手动释放
      • 超时释放:获取锁时添加一个超时时间

    核心思路

    • 利用redis的setNx方法,当很多线程进入抢夺锁的时候,线程1进入redis插入key,返回1,如果结果是1,证明获取锁成功,它就可以执行锁内的业务。当其他线程尝试获取锁的时候,无法获取成功,便会一定时间后重试。只有当线程1执行完业务,删掉该key之后,其他线程才能获取锁。

    在这里插入图片描述

    具体实现分布式锁的步骤如下:

    • 定义一个锁接口,接口里面有两个重要的方法

      • tryLock( long timeoutSec ):尝试获取锁,参数为锁持有的过期时间,过期后自动释放

      • unlock( ):释放锁

      • public interface ILock {
        
            /**
             * 尝试获取锁
             * @param timeoutSec 锁持有的超时时间,过期后自动释放
             * @return true代表获取锁成功; false代表获取锁失败
             */
            boolean tryLock(long timeoutSec);
        
            /**
             * 释放锁
             */
            void unlock();
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
    • 定义一个类,实现该接口

      • public class SimpleRedisLock implements ILock {
        
            private String name;
            private StringRedisTemplate stringRedisTemplate;
        
            public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
                this.name = name;
                this.stringRedisTemplate = stringRedisTemplate;
            }
        
            private static final String KEY_PREFIX = "lock:";
            //ture可以去掉uuid的横线
            private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
        
            @Override
            public boolean tryLock(long timeoutSec) {
                // 获取线程标示
                String threadId = ID_PREFIX + Thread.currentThread().getId();
                // 获取锁
                Boolean success = stringRedisTemplate.opsForValue()
                        .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
                //这里这样返回是因为防止拆箱的时候success为null,导致空指针异常
                return Boolean.TRUE.equals(success);
            }
        
        
            @Override
            public void unlock() {
                // 获取线程标示
                String threadId = ID_PREFIX + Thread.currentThread().getId();
                // 获取锁中的标示
                String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
                // 判断标示是否一致
                if(threadId.equals(id)) {
                    // 释放锁
                    stringRedisTemplate.delete(KEY_PREFIX + name);
                }
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39

    这里的分布式锁,是为了解决一人一单问题,也就是一个人只能下一单,因此key就是key前置+用户id。

    而value,则是uuid+线程id。为什么这样做呢?

    是因为如果单单用线程id作为value的话,在分布式情况下,用户发送多个请求过来,会出现分布式锁误删情况。当有一个线程1获取到锁(这把锁后面称为锁A),由于某种情况,线程1获取锁A之后出现了阻塞,导致锁A超时被释放。这时候线程2进来获取锁(这把锁后面称为锁B),这时候线程1反应过来,进行释放锁的操作,因为锁A已经超时释放了,这时候线程1释放的锁将会是线程2获取的锁B,因此出现了误删的情况。

    在这里插入图片描述

    因此,需要在每个线程释放锁之前,判断这把锁是否属于自己,如果属于自己则释放,如果不属于自己那么就不释放,防止误删。因为程序可能位于分布式的系统中,那么多个服务之间线程ID可能会出现一样,因此value不能单单使用线程ID,应该用uuid拼上线程ID,这样保证了分布式情况下value的唯一性。

    总而言之,解决分布式锁误删问题的解决方案如下:

    1. value使用UUID+线程ID。
    2. 在释放锁的时候,先判断该锁是否是自己的,也就是判断value。
      1. 如果该key对应的value是自己存进去的那个value,则释放锁。
      2. 否则不释放锁,防止锁误删。

    在这里插入图片描述

    一人一单的业务逻辑:

    • 首先判断秒杀是否开始

      • @Override
        public Result seckillVoucher(Long voucherId) {
            // 1.查询优惠券
            SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
            // 2.判断秒杀是否开始
            if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
                // 尚未开始
                return Result.fail("秒杀尚未开始!");
            }
            // 3.判断秒杀是否已经结束
            if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
                // 尚未开始
                return Result.fail("秒杀已经结束!");
            }
            // 4.判断库存是否充足
            if (voucher.getStock() < 1) {
                // 库存不足
                return Result.fail("库存不足!");
            }
        
            return createVoucherOrder(voucherId);
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
    • 秒杀开始之后创建订单

      • @Transactional
        public Result createVoucherOrder(Long voucherId) {
            // 5.一人一单
            Long userId = UserHolder.getUser().getId();
        
            // 创建锁对象
            SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
            // 尝试获取锁
            boolean isLock = redisLock.tryLock(1200);
            // 判断
            if (!isLock) {
                // 获取锁失败,直接返回失败或者重试
                return Result.fail("不允许重复下单!");
            }
        
            try {
                // 5.1.查询订单
                int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
                // 5.2.判断是否存在
                if (count > 0) {
                    // 用户已经购买过了
                    return Result.fail("用户已经购买过一次!");
                }
        
                // 6.扣减库存
                boolean success = seckillVoucherService.update()
                        .setSql("stock = stock - 1") // set stock = stock - 1
                        .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                        .update();
                if (!success) {
                    // 扣减失败
                    return Result.fail("库存不足!");
                }
        
                // 7.创建订单
                VoucherOrder voucherOrder = new VoucherOrder();
                // 7.1.订单id
                long orderId = redisIdWorker.nextId("order");
                voucherOrder.setId(orderId);
                // 7.2.用户id
                voucherOrder.setUserId(userId);
                // 7.3.代金券id
                voucherOrder.setVoucherId(voucherId);
                save(voucherOrder);
        
                // 7.返回订单id
                return Result.ok(orderId);
            } finally {
                // 释放锁
                redisLock.unlock();
            }
        
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53

    这里分布式锁的作用,主要是保证程序在创建订单,将订单数据插入数据库这一过程中,只有一个线程在代码块中执行,防止超买超卖的情况,有效解决一人一单。


    5.解决分布式锁中的原子性问题

    虽然上面解决的分布式锁误删的情况,但是会出现分布式锁的原子性问题。

    线程1尝试获取锁,获取锁成功(这把锁下面称为锁A),线程1执行完业务逻辑,准备释放锁。这时候其他线程进不来,所以这把锁肯定是线程1自己的,当线程1判断完标识为一致后,准备执行释放锁操作,这时候由于某种情况,线程1阻塞了,由于阻塞时间太长,锁A超时释放了。这时候线程2获取锁成功(这把锁后面称为锁B),这时候线程1不再阻塞,执行释放锁操作,因为锁A超时释放,因此线程1执行的释放锁操作释放的是线程2获取的锁B,因此依然出现了锁误删情况**,出现这一情况的原因是因为判断锁是否是自己的和释放锁这是两个操作**,不存在原子性。

    在这里插入图片描述

    解决方法也很简单,只需要保证判断锁和释放锁这两个操作是原子的就可以了。


    5.1 Lua脚本

    Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。

    Redis提供的调用函数,语法如下:

    redis.call('命令名称', 'key', '其它参数', ...)
    
    • 1

    比如执行set name jack,则脚本如下:

    # 执行 set name jack
    redis.call('set', 'name', 'jack')
    
    • 1
    • 2

    使用redis命令执行脚本

    在这里插入图片描述

    上面是写死的情况,如果不想写死,可以使用参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数(注意数组是从1开始):

    在这里插入图片描述

    因此,上面释放锁的操作写成Lua脚本就可以这样写

    -- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
    -- 获取锁中的标示,判断是否与当前线程标示一致
    if (redis.call('GET', KEYS[1]) == ARGV[1]) then
      -- 一致,则删除锁
      return redis.call('DEL', KEYS[1])
    end
    -- 不一致,则直接返回
    return 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.2 使用Java代码调用Lua脚本实现原子性

    在RedisTemplate已经为我们封装好了一个execute方法用来执行Lua脚本

    public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
        return this.scriptExecutor.execute(script, keys, args);
    }
    
    • 1
    • 2
    • 3
    1. script:Lua脚本
    2. keys:KEYS数组
    3. args:ARGV数组

    在这里插入图片描述

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    
    public void unlock() {
        // 调用lua脚本
        stringRedisTemplate.execute(
            UNLOCK_SCRIPT,
            Collections.singletonList(KEY_PREFIX + name),
            ID_PREFIX + Thread.currentThread().getId());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里可以将Lua脚本写在一个Lua文件中,从而实现解耦,可以使用Spring的ClassPathResource读取Class类下的文件。


    参考:黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目


  • 相关阅读:
    Python软件编程等级考试一级——20220915
    高数_证明_方向导数计算公式
    【无标题】
    【从测试小白到管理】这几点让我终身受益,没有人永远18岁,但永远有人18岁......
    java ssm创意设计分享系统
    leetCode 1035.不相交的线 动态规划 + 滚动数组 (最长公共子序列)
    【前端验证】被动响应型uvm_model环境搭建——以握手型ram_model为例
    树上倍增法
    30天Python入门(第六天:深入了解Python中的元组)
    张成方案——Span Programs
  • 原文地址:https://blog.csdn.net/weixin_51146329/article/details/127588196