• springboot引入redisson分布式锁及原理


    1.引入依赖

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.13.6</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.配置类创建bean

    /**
     * @author qujingye
     * @Classname RedissonConfig
     * @Description TODO
     * @Date 2023/11/16 16:27
     */
    @Configuration
    public class RedissonConfig {
    
        @Value("${spring.redis.host}")
        private String host;
    
        @Value("${spring.redis.port}")
        private String port;
    
        @Value("${spring.redis.password}")
        private String redisPassword;
    
    
    
        @Bean
        public RedissonClient redissonClient(){
            Config config = new Config();
            //单机模式  依次设置redis地址和密码
            config.useSingleServer().
                    setAddress("redis://" + host + ":" + port).
                    setPassword(redisPassword);
            return Redisson.create(config);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3.yml配置

    redis:
      database: 0
      host: localhost
      lettuce:
          pool:
            max-active: 8   #最大连接数据库连接数,设 0 为没有限制
            max-idle: 8     #最大等待连接中的数量,设 0 为没有限制
            max-wait: -1ms  #最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
            min-idle: 0     #最小等待连接中的数量,设 0 为没有限制
          shutdown-timeout: 100ms
        password: 123456
        port: 6379
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.使用

    @Resource
    private RedissonClient redissonClient;
    
    // 创建锁对象
    RLock redisLock = redissonClient.getLock("lock:xxxxx");
    // 尝试获取锁
    boolean isLock = redisLock.tryLock();
    // 判断
    if (!isLock) {
        // 获取锁失败,直接返回失败
        throw new CommonException(-1, "监测到文件" + originalFilename + ",正在导入请稍后在试!");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    @Resource
    private RedissonClient redissonClient;
    @Test
    void testRedisson() throws InterruptedException {
        // 获取锁(可重入),指定锁的名称
        RLock lock = redissonClient.getLock("anyLock"); 
        // 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
        // 判断释放获取成功
        if(isLock){
            try {
                System.out.println("执行业务");
            }finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    5.api方法简介

     //1. 普通的可重入锁
        RLock lock = redissonClient.getLock("generalLock");
    
        // 拿锁失败时会不停的重试
        // 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
        lock.lock();
    
        // 尝试拿锁10s后停止重试,返回false
        // 具有Watch Dog 自动延期机制 默认续30s
        boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);
    
        // 拿锁失败时会不停的重试
        // 没有Watch Dog ,10s后自动释放
        lock.lock(10, TimeUnit.SECONDS);
    
        // 尝试拿锁100s后停止重试,返回false
        // 没有Watch Dog ,10s后自动释放
        boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);
    
        //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
        RLock fairLock = redissonClient.getFairLock("fairLock");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    6.redisson原理

    首先分布式锁要考虑
    分布式锁需要考虑
    互斥性 setnx
    防死锁(过期,锁续命)
    可重复性
    高性能
    而Redisson满足 实现了 锁续命 锁错删 可重入
    [图片]

    "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    客户端线程在底层是如何实现加锁的:
    第一步,redisson.getLock(“mylock”);可以获得一个 Redisson 的分布式锁对象,可以使用该对象进行加锁、释放锁、判断锁状态等操作。
    第二步加锁,加锁的底层逻辑是通过lua脚本实现的,如果客户端线程第一次去加锁的话,
    执行 ‘hincrby’ 会在key对应的hash数据结构当中,添加线程标识。HashKey 是UUID:TreadID value

    1. 来指定该线程当前对这个key已经加锁一次了,并且设置锁的过期时间为30秒
    2. 客户端线程是如何维持加锁的,当加锁成功后,此时会对加锁的结果设置一个监听器,如果监听到加锁成功了,也就是返回的结果时null,此时就会在后台通过watchdog 看门狗机制,
      启动一个后台的定时任务,每隔10秒执行一次,检查当前key依旧存在,就会重置key的存活时间为30秒,维持加锁,底层就是通过后台这样一个,线程定时刷新存活时间维系的(renewExpiration自己掉自己)
    3. 相同的客户端线程是如何实现可重入加锁的
      第一次加锁时 会往key对应的hash数据结构中 设置 UUID:ThreadID 1 表示当前线程对key 加锁一次
      如果相同线程再来对这个key加锁,只需要将UUID:ThreadID 持有锁的次数加1 即可 就为
      UUID:ThreadID 2 了,redisson底层就是通过这样的数据结构 来表示重入锁的
    4. 其他线程加锁失败时,底层是如何实现阻塞的
      通过key对应的hash结构当中的UUID:ThreadID 判断是否为当前线程id 如果不是则线程加锁失败
      如果没有获取锁的超时时间 此时就会进入一个 while 的死循环中 一直尝试加锁 直到加锁成功才会返回
    5. 客户端宕机了 锁如何释放的
      客户端宕机后 相应的watchdog 后台定时任务 当然已经没了 此时就无法对key 进行定时续期 那么当指定存活时间过后 key就会自动失效 锁就当然自动释放了
    6. 客户端如何主动释放持有的锁
    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
            "else " +
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return nil;",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    客户端主动释放底层,也是通过执行lua脚本的方式来实现的
    如果判断当前释放的key存在,并且在key的hash结构当中,存在当前线程的加锁信息,那么此时就会减扣当前线程,对这个key的重入锁次数,减扣线程的重入锁次数之后,如果当前线程在这个key的重入次数为0,此时就会直接释放锁,如果当前线程,在这个key中重入锁次数依然大于0。此时就直接重置一下,key的续期时间为30秒
    7. 客户端尝试获取锁超时时间的机制底层是如何实现的

    boolean isLock2 = redisLock.tryLock(1,TimeUnit.MINUTES);
    
    • 1

    如果在加锁时就指定 尝试获取锁的超时时间 如果获取锁失败 此时就不会永无止境的在while循环里面一直等待 ,而是根据你指定的锁超时时间,在这段时间范围获取不到锁,那么就会标记为获取锁失败,直接返回false
    8. 客户端锁超时自动释放锁机制底层是怎么实现的

    redisLock.lock(1, TimeUnit.MINUTES);
    
    • 1

    如果在加锁的时候指定了锁的超时时间,那么就算你获取锁成功了,也不会开启watch dog的定时任务,此时就将当前持有的这把锁的过期时间,设置为你指定的超时时间,那么你指定的时间到了之后,key失效被删除了,key对应的锁相应的也就自动释放了

  • 相关阅读:
    Python实操案例五
    学习6大步
    ScrollView 源码注解
    UDP的可靠性传输
    抖音账号矩阵系统开发源码
    【超万卡GPU集群关键技术深度分析 2024】_构建10万卡gpu集群的技术挑战
    deepstream·在python中安装pyds包
    【JavaEE】网络编程---TCP数据报套接字编程
    hive和hbase的使用问题
    shiro授权
  • 原文地址:https://blog.csdn.net/QJY1437758743/article/details/134466337