• 【Redis实战】分布式锁


    分布式锁

    synchronized只能保证单个JVM内部的线程互斥,不能保证集群模式下的多个JVM的线程互斥。

    分布式锁原理

    每个JVM内部都有自己的锁监视器,但是跨JVM,就会有多个锁监视器,就会有多个线程获取到锁,不能实现多JVM进程之间的互斥。
    我们不能使用JVM内部的锁监视器,我们必须让多个JVM去使用同一个锁监视器,所以肯定是一个独立于JVM内部的,多个JVM都可以看到的监视器。
    image.png
    过程
    image.png

    特性

    image.png

    多进程可见

    多个JVM都可以看到,比如Redis,MySQL等。JVM外部的基本都可以实现。

    互斥

    只能有一个人拿到锁

    高可用

    大多数情况下,获取锁都是成功的,而不是频繁失败

    高并发/高性能

    加锁本身就会影响性能,会变成串行执行,如果加锁本身也很慢,就不行了。

    安全性

    异常情况下,比如,获取锁完毕之后,锁无法释放,服务宕机了。
    死锁问题等等。

    功能性特性

    比如是否可重入,阻塞还是非阻塞的,公平还是非公平锁

    不同的分布式锁区别

    image.png

    MySQL

    • 互斥:通过事务的互斥锁来实现,事务提交锁释放,异常事务回滚
    • 高可用:依赖MySQL本身的高可用
    • 高性能:受限于MySQL的性能
    • 安全性:通过事务获取锁,断开链接的时候,锁会自动释放

    Redis

    • 互斥:通过setnx互斥命令来实现互斥
    • 高可用:Redis本身可以实现主从和集群模式,可用性高
    • 高性能:较高
    • 安全性:服务出现故障,锁无法释放,死锁,可以利用key的过期机制来实现

    Zookeeper

    • 互斥:利用内部节点的唯一性和有序性来实现,每个节点的id都是自增的,删除节点,另外一个节点就说最小的了
    • 高可用:支持集群
    • 高性能:保证强一致性,主从之间数据同步会消耗一定时间
    • 安全性:创建的是临时节点,服务宕机,锁会自动释放

    Redis实现分布式锁

    分布式锁需要实现两个最基本的方法

    获取锁

    互斥

    确保只能有一个线程执行成功。通过redis的setnx命令来实现,同时执行时,只有1个能执行成功,实现互斥。

    #获取锁
    setnx key value
    
    • 1
    • 2

    image.png

    • 添加锁的过期时间,避免服务宕机引起死锁。过期时间需要注意,业务还没处理完但是锁过期的问题
    #设置过期时间
    expire key 10
    
    • 1
    • 2

    image.png
    为了避免出现,setnx后,expire之前,服务宕机的问题,我们将两条命令合并为一条,保证原子性

    #添加锁 nx是互斥,ex是过期时间
    set key value ex 10 nx
    #或者
    set key value nx ex 10
    
    • 1
    • 2
    • 3
    • 4

    image.png

    非/阻塞式获取锁

    获取锁成功返回ok,失败返回nil,如果失败了,有两种解决方案,jdk中,有两种方案:一直阻塞式等待,另一种,获取锁失败即刻返回。
    非阻塞式获取锁,尝试一次,成功返回true,失败返回false!

    释放锁

    手动释放

    手动删除即可

    #释放锁
    del key
    
    • 1
    • 2

    image.png

    超时释放

    获取锁时,添加一个超时时间,避免出现服务宕机,锁无法被释放

    流程

    image.png

    分布式锁初级版

    执行流程

    image.png

    分布式锁代码

    接口

    /**
     * 分布式锁
     *
     * @author zhangzengxiu
     * @date 2023/10/9
     */
    public interface ILock {
    
        /**
         * 尝试去获取锁
         *
         * @param timeoutSc 过期时间,过期锁自动释放
         * @return 获取成功返回true,失败返回false
         */
        boolean tryLock(long timeoutSc);
    
        /**
         * 释放锁
         */
        void unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    实现

    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author zhangzengxiu
     * @date 2023/10/9
     */
    public class SimpleRedisLock implements ILock {
    
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 锁统一前缀
         */
        public static final String KEY_PRE = "lock:";
    
        /**
         * 业务名称
         */
        private String name;
    
        public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
            this.stringRedisTemplate = stringRedisTemplate;
            this.name = name;
        }
    
        @Override
        public boolean tryLock(long timeoutSc) {
            //获取线程标识
            long threadId = Thread.currentThread().getId();
            String key = KEY_PRE + name;
            String value = String.valueOf(threadId);
            Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSc, TimeUnit.SECONDS);
            return Boolean.TRUE.equals(res);
        }
    
        @Override
        public void unlock() {
            String key = KEY_PRE + name;
            stringRedisTemplate.delete(key);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    业务代码

    image.png

    异常情况

    线程1尝试去获取锁,获取到锁之后,

    • 正常情况:业务执行完毕后,正常释放锁

    image.png

    • 异常情况:业务执行时间超过了锁的超时时间,锁被超时释放;

    image.png

    • 误删锁

    image.png

    • 线程1的锁由于业务阻塞被超时释放了,此时锁被线程2获取到了,此时线程1醒了,继续执行并释放了锁,此时被释放的锁是线程2的锁。
    • 这时线程3也获取到了被释放的锁,此时相当于多个线程在并行执行,线程并发安全问题依然存在。

    解决方案
    image.png
    释放锁的时候判断是不是自己的锁,是自己的锁才能释放,否则无法释放锁。
    image.png

    改进分布式锁(解决锁误删问题)

    image.png
    线程id是JVM内部递增的,集群模式下,每个JVM内部都会有自增的线程id,会出现线程id冲突的情况。
    如果只是使用线程id作为区分是不行的,还要区分JVM,我们可以使用UUID或者线程id拼接UUID的形式来实现。通过UUID来区分不同的JVM,再通过线程id来区分不同的线程。

    业务流程

    image.png

    分布式锁代码实现

    import cn.hutool.core.lang.UUID;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author zhangzengxiu
     * @date 2023/10/9
     */
    public class SimpleRedisLock implements ILock {
    
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 锁统一前缀
         */
        public static final String KEY_PRE = "lock:";
    
        /**
         * 锁的值的前缀
         */
        public static final String ID_PRE = UUID.randomUUID().toString(true) + "—";
    
        /**
         * 业务名称
         */
        private String name;
    
        public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
            this.stringRedisTemplate = stringRedisTemplate;
            this.name = name;
        }
    
        @Override
        public boolean tryLock(long timeoutSc) {
            //获取线程标识
            String value = ID_PRE + Thread.currentThread().getId();
            String key = KEY_PRE + name;
            Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSc, TimeUnit.SECONDS);
            return Boolean.TRUE.equals(res);
        }
    
        @Override
        public void unlock() {
            //获取线程标识
            String value = ID_PRE + Thread.currentThread().getId();
            String key = KEY_PRE + name;
            //获取锁中的标识
            String val = stringRedisTemplate.opsForValue().get(key);
            if (StringUtils.equals(value, val)) {
                //释放锁
                stringRedisTemplate.delete(key);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    image.png

    异常情况

    当前代码依然存在异常情况,比如:

    • 线程1操作结束,释放锁的时候,先判断是否是自己的锁,然后准备释放的时候,被阻塞了,可能是因为JVM的垃圾回收机制FullGC导致了阻塞,导致了线程1 的锁由于超时自动释放
    • 此时线程2获取到了锁,在执行业务代码的过程中,线程1结束了阻塞,此时直接去释放了锁,但是此时释放的锁却是线程2的锁;
    • 现在属于无锁状态,此时线程3获取到了锁,线程2和3就属于并行执行,线程安全问题再次出现。

    问题:判断锁和释放锁是两个操作,并不具有原子性!!!
    image.png

    Lua脚本解决原子性问题

    判断锁+释放锁在特殊情况下依然存在原子性问题,也可以通过Redis的事务+乐观锁机制来实现。
    image.png

    Lua

    Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
    借鉴网站:Lua 基本语法 | 菜鸟教程
    我们可以使用Redis提供的函数进行调用,

    redis.call('命令名称','key','其他参数',...);
    
    • 1

    示例代码:

    redis.call('set','key','value');
    
    • 1

    执行脚本

    EVAL "return redis.call('set','key','value')" 0
    
    • 1

    说明:
    其中双引号中的内容是脚本内容
    0:表示key类型参数的数量,我们可以将value设置为可传入的参数,不写死

    示例:
    不带参数的Lua脚本:
    image.png
    因为有些redis命令是可以一次性设置多个key value的,比如 mset
    如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放到KEYS数组中,其他参数会放到ARGV数组中,在脚本中可以从KEYS和ARGV数组中获取参数:

    lua语言中,数组的角标是从1开始而不是0

    image.png
    执行脚本:
    image.png

    分布式锁的释放锁的Lua脚本

    释放锁业务流程

    1、获取锁中的线程标识
    2、判断是否与指定的标识(当前线程标识)一致
    3、判断如果一致则释放锁(删除)
    4、如果不一致啥也不做

    Lua脚本
    -- 获取锁中线程标识(key传参)
    local key = KEYS[1]
    -- 获取当前线程的标识(其他参数传参)
    local threadId = ARGV[1]
    -- 获取锁中的线程标识
    local id = redis.call('get',key)
    -- 比较线程中标识和锁中的标识是否一致
    if (threadId == id) then
    	-- 释放锁
    	return redis.call('del',key)
    end
    return 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    简化写法:

    -- 比较线程中标识和锁中的标识是否一致
    if (ARGV[1] == redis.call('get',KEYS[1])) then
    	-- 释放锁
    	return redis.call('del',KEYS[1])
    end
    return 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Java语言调用Lua脚本

    image.png
    修改代码:
    修改前
    image.png
    修改后
    image.png

        @Override
        public void unlock() {
            //传入Lua脚本的KEYS数组
            List<String> keys = new ArrayList<>(Arrays.asList(KEY_PRE + name));
            //传入Lua的其他参数
            String arg = ID_PRE + Thread.currentThread().getId();
            //调用Lua脚本
            stringRedisTemplate.execute(UNLOCK_SCRIPT, keys, arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    总结

    image.png

    基于Redis的分布式锁优化

    当前的分布式锁仍然存在一些问题

    存在的问题

    image.png

    不可重入

    同一个线程无法多次获取同一把锁。
    当线程1拿到锁,A方法调用B方法时,A方法需要锁,B方法也需要锁,但是,A在调B时,锁还没有释放,还在A手里,B就迟迟拿不到锁,A也无法释放锁,此时就会出现死锁

    不可重试

    获取锁,只重试一次,只要没获取到,立即返回false,没有进行重试

    超时释放

    如果锁超时时间过短,业务还没执行完,锁就被释放了,也会有问题。
    如果锁超时时间过长,一但出了问题,需要很长一段时间才能自动释放锁。

    主从一致性

    主节点和从节点之间存在延迟,极端情况下,如果锁通过set写入到主节点,但是主节点还没来得及同步到从节点,这个时候主节点就宕机了,从节点里是没有这个锁的标识的。
    此时,重新选举的主节点,是没有锁的,这个时候其他线程就会获取到锁。

    如果你是用的单节点,其实也不用去理会这个问题。

    以上这些问题,要实现起来其实很麻烦,我们可以通过现有的工具来进行实现。

    Redisson

    image.png

    快速入门

    引入依赖
            
            <dependency>
                <groupId>org.redissongroupId>
                <artifactId>redissonartifactId>
                <version>3.13.6version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    配置Redisson客户端

    官方有提供来Redisson的SpringBoot的stater,但是会替代Spring官方提供的配置和实现,不建议使用,建议自己去配置。

    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.redisson.config.SingleServerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author zhangzengxiu
     * @date 2023/10/10
     */
    @Configuration
    public class RedisConfig {
    
        /**
         * redis的主机
         */
        @Value("${spring.redis.host}")
        private String redisHost;
    
        /**
         * redis的端口
         */
        @Value("${spring.redis.port}")
        private String redisPort;
    
        /**
         * redis的密码
         */
        @Value("${spring.redis.password}")
        private String redisPassword;
    
        /**
         * redis协议
         */
        public static final String REDIS_PRE = "redis://";
    
        @Bean
        public RedissonClient getRedissonClient() {
            //配置类
            Config config = new Config();
            //配置单节点的Redis
            SingleServerConfig ssc = config.useSingleServer();
            //配置集群 需要配置多个Redis地址
            //SingleServerConfig ssc = config.useClusterServers();
            ssc.setAddress(REDIS_PRE + redisHost + ":" + redisPort);
            ssc.setPassword(redisPassword);
            //创建客户端
            return Redisson.create(config);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    使用分布式锁
    	@Autowired
        private RedissonClient redissonClient;
    
    	@Test
        public void testRedissonLock() throws InterruptedException {
            //获取锁(可重入)
            RLock lock = redissonClient.getLock("orderLock");
            /**
             * 尝试获取锁
             * 无参:失败直接返回
             * 有参:
             * 1:获取锁的最大等待时间,在此期间,获取锁失败了就会等待一段时间再去重试,超过这个最大等待时间才会返回false
             * 10:自动释放的时间,服务出现宕机的情况下,自动释放的时间
             * TimeUnit.SECONDS:时间单位
             */
            boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
            if (isLock) {
                try {
                    System.out.println("");
                } finally {
                    lock.unlock();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    修改之前的业务代码
    image.png

    Redisson可重入锁原理

    可重入
    流程

    image.png
    不可重入原因
    image.png
    当method1调用method2执行时,需要再次执行setnx,但是setnx是互斥的,所以无法再次获取这把锁。
    我们可以参考JDK提供的ReentrantLock�来实现锁的可重入,在获取锁的同时去判断是否是当前线程,每次获取锁就进行+1操作,释放锁就-1。所以使用redis的string类型就不满足要求了。
    我们可以通过hash结构来实现:
    string类型可以通过set nx ex这样的命令来实现,但是hash并没有这样的组合命令,只能将命令拆开来实现。
    image.png

    获取锁Lua脚本

    image.png

    释放锁Lua脚本

    image.png
    查看Redisson获取锁的源码:
    Lua脚本是通过字符串的形式来直接写死的。
    image.png
    释放锁
    image.png

    可重试

    源码
    image.png

    image.png

    image.pngtime就是:设置的超时时间-前面第一次获取锁消耗的时间所得到的剩余时间
    重试等待:利用了信号量+消息订阅机制
    不是while(true)无休止的等待,是等每次订阅到之后才进行重试。
    image.png
    至此,重试问题已经解决了。

    超时释放

    获取锁成功了,但是业务还没执行完,锁到期了,锁被释放了???

    timeout超时任务进行自动续约,每过一段时间就重置时间,一直执行

    image.png
    新的任务没有更新有效期的任务,所以需要调用renewExpiration方法,旧的任务已经有了这个刷新有效期的任务,就不需要再调用一次了。
    image.png
    image.png
    image.png
    锁释放的时间?是在unlock的时候才释放锁
    image.png

    总结

    image.png
    image.png

    主从一致性问题

    获取到锁之后,主节点宕机
    image.png
    重新选举出来的新的主节点,出现数据丢失,锁失效
    image.png

    解决方案

    联合节点(最少3个节点
    简单粗暴,那就不要主从节点,每个节点都获取锁成功,才算成功!
    image.png
    如果后期其中一个节点宕机了,他自己的从节点数据丢失,那么此时并不是所有的节点都持有这把锁。
    因为只有每一个节点都拿到锁,才算获取锁成功。
    只要有1个节点是存活的状态,那么就不会有其他线程拿到锁,就不会有锁失效的问题。
    image.png
    我们可以单独使用几个节点,但是不建立主从关系就可以。
    3个独立节点配置方式:
    image.png
    image.png
    源码:
    image.png

    image.png

    最终总结

    image.png

  • 相关阅读:
    vue滚动到页首
    【错误:No package snapd available.】在 CentOS 上启用 snap 并安装 snapd
    MongoDB索引与查询优化
    Rust所有权(非常重要)
    变量的一般命名原则
    【java期末复习题】第5章 继承与多态
    音频分词逻辑完善
    添加过的PDF注释可以修改吗?怎么修改PDF注释?
    微信小程序代驾系统源码(含未编译前端,二开无忧) v2.5
    【滤波跟踪】基于Huber函数和最大相关熵的抗差滤波算法实现GNSS导航定位粗差处理附matlab代码
  • 原文地址:https://blog.csdn.net/zhangzengxiu/article/details/133781803