• redis分布式锁、介绍、具体实现,调用、原理、使用场景


    一、作用

    redis分布式锁:可以分为两点:1.分布式 2.加锁

    主要作用是,在多副本部署服务的情况下(或者高并发时),相同时间点内,对业务代码进行加锁,业务代码只能被一个线程执行
    用了分布式锁,相当于强制将多副本(或者单副本高并发时)并行改成串行执行,其他副本直接返回或者阻塞等待(排队执行)

    由于是多副本部署服务, JVM锁某些情况下不能用,诸如synchronized或ReentrantLock只能是锁定当前副本, 分布式锁就能解决锁定全部副本服务

    缺点:并行改成串行后,对高并发不友好,处理能力降低


    二、使用场景

    1.DB操作扣减/增加商品库存数量,DB操作扣减/增加财务金额;按顺序记录变动前,变动后,变动值
    使用mysql时,如果想知道扣减/增加商品库存数量,mysql不能通过一句sql知道数量变动前, 变动后,变动值,所以可使用加锁后,先查询,在更新的方式。
    点赞Sql Server可以使用update处理

    2.接口防重; 创建订单、支付订单,可以使用用户id进行加锁,防止重复提交; 同一时刻用户只能创建一笔订单或支付一次

    3.防止机器高频刷接口;可以使用颁发给前端的token进行加锁

    三、redis分布式锁的实现原理

    1.单线程执行命令
    不考虑redis集群时, 单体redis服务是单线程执行命令的(get、set、delete等等命令), 命令会排队执行,并不存在多个命令同时执行.
    redis服务其实也是多线程,但在执行命令时候是单线程的,所以我们经常说它是单线程。
    redis在6.0的版本中引入了多线程, 多线程处理了网络I/O,用来提高性能, 但是执行命令还是保留单线程,这个经常是面试重点.

    redis提供底层setnx命令;setnx是一个原子性操作;进行加锁
    若key不存在时,才会set值且填充过期时间,返回 1 。
    若key已存在时,不做任何动作,返回 0。

    redis提供底层del命令;进行释放锁
    执行成功返回 1 ; 否则返回 0。


    四、使用StringRedisTemplate操作redis

    如何使用stringRedisTemplate参考:
    https://preparedata.blog.csdn.net/article/details/126249210

    引入org.springframework.data.redis.core
    使用redis的字符串对象opsForValue进行封装加锁、释放锁

    实现加锁,对应redis的setnx命令, 具体实现参考下文
    stringRedisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit)

    为保证释放的锁是自己加的锁,使用lua脚本,保证原子性的, 具体实现参考下文
    stringRedisTemplate.execute();

    借助lua脚本释放锁时,只有获取到的value是客户端自己的value, 才会去删除锁
    所以设置锁的value时,尽可能需要全局唯一的value
    KEYS[1] 表示redis中的key, ARGV[1] 表示redis中的value; 具体实现参考下文


    五、分布式锁的过期时间

    过期时间主要是为了防止释放锁异常,导致死锁;设置了过期时间redis可以自动删除锁。保证后续可以加锁。

    过期时间具体值是多长时间,可有开发人员来衡量。下文中代码没默认了10分钟;

    如果加锁后,10分种内程序都没运行完,由于又有过期时间,所以会被自动释放,可能导致分布式锁没有唯一性。
    可以在加锁之后开启一个子线程进行异步周期性地续时。当释放锁时,再中断结束这个续时线程。这个过程下文代码中并未实现。

    过期时间默认10分钟,业务代码还没执行完,这时候就应该优化业务代码,而不是分布锁。什么业务代码可以一次性执行10分钟都没完成??


    六、锁的重试机制 retryLock

    1.等待时间;设置5秒就可,由开发人员来衡量设置

    如果时间设置的太长,用户就会等待太久才能得到响应结果
    如果时间设置的太短,太短程序退出,就没有了重试的意义

    2.重试间隔时间;

    如果时间设置的太长或太短,都会造成重试成功概率减小;
    主要依据业务代码的执行时间,如果被锁的业务代码大概500毫秒能执行完,重试间隔时间就可以设置或小于500毫秒,比如400毫秒


    七、java代码实现

    默认java项目已经安装和配置redis服务,也已经引用StringRedisTemplate

    1.定义接口

    package com.xxx.redis;
    
    public interface RedisLockService {
    
        /**
         * 重试获取锁。
         *
         * 第一次获取锁失败后,在重试时间retryTimeout时间内,会挂起线程睡眠一定时间,不断重试,
         * 如果重试成功,则直接返回成功;
         * 如果重试失败,直到超时时间结束,返回失败
         *
         * @param key   锁的key
         * @param value  锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
         * @param retryTimeout   重试超时时间,时间内,不断重试
         * @return 锁获取成功,返回true;否则,返回false
         */
        boolean retryLock(String key, String value, int retryTimeout);
    
        /**
         * 获取锁。
         *
         * @param key   锁的key
         * @param value 锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
         * @return 锁获取成功,返回true;否则,返回false
         */
        boolean lock(String key, String value);
    
        /**
         * 释放锁
         *
         * @param key   锁的key
         * @param value 锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
         */
        boolean unlock(String key, String value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2.实现接口

    package com.xxx.redis;
    
    import org.apache.commons.lang.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.ReturnType;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.TimeUnit;
    
    @Service
    public class RedisLockServiceImpl implements RedisLockService {
    
        /**
         * 600000毫秒 = 10分钟; 单位毫秒
         *
         * redis过期时间,过期后自动被删除; 此参数也可使用方法参数来传递进来
         */
        final int expireTime = 1000 * 60 * 10;
    
        /**
         * 200毫秒;单位毫秒
         *
         * 重试锁,重试间隔时间; 此参数也可使用方法参数来传递进来
         */
        final int retryIntervalTime = 200;
    
        /**
         * lua脚本,释放锁, lua脚本命令执行具有原子性
         *
         * 保证只会释放客户端自己的锁
         * 说明
         * 1. if redis.call('get', KEYS[1]) == ARGV[1]  获取到的value是客户端自己的value, 才会去删除锁;
         * 2. 基于上面逻辑,设置锁的value时,尽可能需要全局唯一的value
         * 3. KEYS[1] 表示redis中的key
         * 4. ARGV[1] 表示redis中的value
         *
         */
        final String SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                "then " +
                "       return redis.call('del', KEYS[1]) " +
                "else " +
                "       return 0 " +
                "end";
    
        @Autowired
        private  StringRedisTemplate redisTemplate;
    
        @Override
        public boolean retryLock(String key, String requestId, int retryTimeout) {
            if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
                return false;
            }
    
            long endTime = System.currentTimeMillis() + retryTimeout;
            while (endTime >= System.currentTimeMillis()) {
                boolean lock = this.lock(key, requestId);
                if (lock) {
                    return true;
                }
                try {
                    //重试锁,睡眠间隔后,再次获取锁,直到成功或超时失败
                    Thread.sleep(retryIntervalTime);
                } catch (InterruptedException e) {
                    return false;
                }
            }
    
            return false;
        }
    
        @Override
        public boolean lock(String key, String requestId) {
            if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
                return false;
            }
            // 若key不存在时,才会set值且填充过期时间,返回 1 。
            // 若key已存在时,不做任何动作,返回 0。
            return redisTemplate.opsForValue().setIfAbsent(this.getLockKey(key), requestId, expireTime, TimeUnit.MILLISECONDS);
        }
    
        @Override
        public boolean unlock(String key, String requestId) {
            if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
                return false;
            }
    
            String finalLockKey = this.getLockKey(key);
            Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
                    connection.eval(SCRIPT.getBytes(), ReturnType.INTEGER, 1, finalLockKey.getBytes(), requestId.getBytes()));
            if(result.equals(1)){
                return true;
            }
    
            return false;
        }
    
        /**
         * 获取加锁Key
         *
         * 自行定义锁的前置key
         *
         * @param key
         * @return
         */
        private String getLockKey(String key){
            String buffer = "LOCK_EKY:" + key;
            return buffer.toUpperCase();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    3.调用

    使用try finally主要想保证在异常时,finally也会释放锁unlock

    package com.xxx.controller;
    
    import cn.hutool.core.util.IdUtil;
    import com.xxx.redis.RedisLockService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/v1/test")
    public class TestController {
    
        @Autowired
        private RedisLockService redisLockService;
    
        /**
         * 下单接口; 简单分布锁
         * @return
         */
        @PostMapping("/createOrder")
        public String createOrder() {
            // redis的key; 示例:用户id = 1214648798765413; 只要保证每次加锁的key唯一就行,可以动态生成key
            String key = "CREATE_ORDER:1214648798765413";
            // redis的value; UUID即可; 也可以使用 用户id = 1214648798765413, 只要保证每次加锁的value唯一就行
            String requestId = IdUtil.simpleUUID();
            // 加锁成功或失败
            boolean lock = false;
            try {
                //redis分布式加锁
                lock = redisLockService.lock(key, requestId);
                if (lock) {
                    //TODO 填充加锁的业务代码
    
                    //举例业务代码,模拟业务代码执行时间
                    Thread.sleep(4000L);
    
    
                    return "业务处理完成";
                } else {
                    //获取锁失败,表示别的线程已经占用了锁,正在执行上面业务代码
                    //TODO 处理资源已经被占用
                    return "业务被锁,请稍后重试";
                }
            } catch (Exception e) {
                //TODO 处理异常
                return "业务报错了";
            } finally {
                //只要获取锁成功,业务代码如出现异常,finally中强制释放锁
                if(lock) redisLockService.unlock(key, requestId);
            }
        }
    
        /**
         * 下单接口; 重试分布锁
         * @return
         */
        @PostMapping("/createOrder2")
        public String createOrder2() {
            // redis的key; 示例:用户id = 1214648798765413; 只要保证每次加锁的key唯一就行
            String key = "CREATE_ORDER:1214648798765413";
            // redis的value; UUID即可; 也可以使用 用户id = 1214648798765413, 只要保证每次加锁的value唯一就行
            String requestId = IdUtil.simpleUUID();
            // 重试等待时间:6秒; 6秒内不断尝试获取锁, 直至获取成功或超时
            int retryTimeout = 6 * 1000;
            // 加锁成功或失败
            boolean lock = false;
            try {
                //redis分布式加锁, 重试锁
                lock = redisLockService.retryLock(key, requestId, retryTimeout);
                if (lock) {
                    //TODO 填充加锁的业务代码
    
                    //举例业务代码,模拟业务代码执行时间
                    Thread.sleep(4000L);
    
                    return "业务处理完成";
                } else {
                    //获取锁失败,表示别的线程已经占用了锁,正在执行上面业务代码
                    //TODO 处理资源已经被占用
                    return "业务被锁,请稍后重试";
                }
            } catch (Exception e) {
                //TODO 处理异常
                return "业务报错了";
            } finally {
                //只要获取锁成功,业务代码如出现异常,finally中强制释放锁
                if(lock) redisLockService.unlock(key, requestId);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
  • 相关阅读:
    Linux 文件特殊权限和ACL权限设置
    【Linux】NTP时间服务器Chrony配置详解
    键盘出口欧洲地区需要做哪些检测认证?
    Mac电池管理软件 Batteries for Mac v2.2.9直装版
    Mutisim仿真软件使用
    项目第六天
    原型设计模式
    Linux下socket客户端连接服务器端发送数据
    Springboot旅游网站管理系统毕业设计、Springboot旅游线路和景点网站设计与实现 毕设作品参考
    Redis分布式锁最牛逼的实现(Java 版,最牛逼的实现方式)
  • 原文地址:https://blog.csdn.net/sinat_16998945/article/details/126349529