• 【Redis】Redis 的学习教程(十一)之使用 Redis 实现分布式锁


    1. 分布式锁概念

    多线程环境下,为了保证数据的线程安全,锁保证同一时刻,只有一个可以访问和更新共享数据。在单机系统我们可以使用 synchronized 锁、Lock 锁保证线程安全。

    synchronized 锁是 Java 提供的一种内置锁,在单个 JVM 进程中提供线程之间的锁定机制,控制多线程并发。只适用于单机环境下的并发控制。

    想要在多个节点中提供锁定,在分布式系统并发控制共享资源,确保同一时刻只有一个访问可以调用,避免多个调用者竞争调用和数据不一致问题,保证数据的一致性,就需要分布式锁

    分布式锁:控制分布式系统不同进程访问共享资源的一种锁的机制。不同进程之间调用需要保持互斥性,任意时刻,只有一个客户端能持有锁。

    共享资源包含:

    • 数据库
    • 文件硬盘
    • 共享内存

    分布式锁特性:

    • 互斥性:锁只能被持有的客户端删除,不能被其他客户端删除
    • 锁超时释放:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁
    • 可重入性:一个线程如果获取了锁之后,可以再次对其请求加锁。
    • 高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效
    • 安全性:锁只能被持有的客户端删除,不能被其他客户端删除

    模拟并发环境下单

    ①:添加 Redis 依赖:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-data-redisartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    ②:添加配置:

    spring:
      redis:
        host: localhost
        port: 6379
        password:
        timeout: 2000s
        # 配置文件中添加 lettuce.pool 相关配置,则会使用到lettuce连接池
        lettuce:
          pool:
            max-active: 8  # 连接池最大连接数(使用负值表示没有限制) 默认为8
            max-wait: -1ms # 接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1ms
            max-idle: 8    # 连接池中的最大空闲连接 默认为8
            min-idle: 0    # 连接池中的最小空闲连接 默认为 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    ③:Redis 配置类:

    @Configuration
    public class RedisConfig {
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            // json 序列化配置
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            // String 序列化
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            // 所有的 key 采用 string 的序列化
            template.setKeySerializer(stringRedisSerializer);
            // 所有的 value 采用 jackson 的序列化
            template.setValueSerializer(jackson2JsonRedisSerializer);
            // hash 的 key 采用 string 的序列化
            template.setHashKeySerializer(stringRedisSerializer);
            // hash 的 value 采用 jackson 的序列化
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Redis 工具类:

    @Component
    public class RedisUtil {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        // 普通缓存获取
        public Object get(String key) {
            return key == null ? null : redisTemplate.opsForValue().get(key);
        }
    
        // 普通缓存放入
        public boolean set(String key, Object value) {
            try {
                redisTemplate.opsForValue().set(key, value);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    
        public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
            return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));
        }
    
        public void del(String... key) {
            if (key != null && key.length > 0) {
                if (key.length == 1) {
                    redisTemplate.delete(key[0]);
                } else {
                    //springboot2.4后用法
                    redisTemplate.delete(Arrays.asList(key));
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    ④:添加下单接口

    @Slf4j
    @RestController
    @RequestMapping("/order")
    public class OrderController {
    
        @Autowired
        private RedisUtil redisUtil;
    
        @GetMapping("/initProductStock")
        public void initProductStock() {
            redisUtil.set("stock", 100);
        }
    
        @GetMapping("/create_order")
        public void createOrder() {
            // 获取当前库存
            int stock = (Integer) redisUtil.get("stock");
            if (stock > 0) {
                // 减库存
                int realStock = stock - 1;
                redisUtil.set("stock", realStock);
                // TODO 添加订单记录
                log.info("扣减成功,剩余库存:" + realStock);
                return;
            }
            log.error("扣减失败,库存不足");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    接口说明:

    • /order/initProductStock:先向 Redis 中初始化一个库存
    • /order/create_order:下单接口:先从缓存获取库存,如果库存大于 0,则库存减 1

    ⑤:并发测试

    使用 JMeter 进行并发环境测试,10 个线程,循环 5 次。

    在这里插入图片描述

    ⑥:测试结果,打印日志如下:

    在这里插入图片描述
    使用 JMeter 调用了 50 次接口后,按照正常情况下,库存应该为:50 = 100 - 50。

    但通过日志显示,最终库存为:95。

    这是因为在并发环境下,多个线程下单操作,前面的线程还未更新库存,后面的线程已经请求进来,并获取到了未更新的库存,后续扣减库存都不是扣减最近的库存。线程越多,扣减的库存越少。这就是在高并发场景下发生的超卖问题。

    很明显,上述问题是出现了线程安全的问题,我们首先能想到的肯定是给它加 synchronized 锁。

    是的,没问题,但是我们知道,synchronized 锁是属于JVM 级别的,也就是我们所谓的“单机锁”,如果是多机部署的环境中,还能保证数据的一致性吗?

    答案肯定是不能的。这个时候,就需要用到了我们 Redis 分布式锁

    用 Redis 实现分布式锁的几种方案,都是用 SETNX 命令(设置 key 等于某 value)。只是高阶方案传的参数个数不一样,以及考虑了异常情况

    SETNX 是SET IF NOT EXISTS的简写。命令格式:SETNX key value,如果 key不存在,则 SETNX 成功返回 1,如果这个 key 已经存在了,则返回 0

    setIfAbsent() 是 setnx + expire 的合并命令

    2. Redis分布式锁方案一:SETNX + EXPIRE

    问题:为什么要加过期时间

    如果在释放锁之前 Redis 宕机了,就会造成一直死锁。

    setnx 命令 和 expire 命令一定要是原子操作。

    伪代码如下:

    if(jedis.setnx(key_resource_id,lock_value) == 1{ //加锁
        expire(key_resource_id,100; //设置过期时间
        try {
            do something  //业务请求
        }catch(){
      }
      finally {
           jedis.del(key_resource_id); //释放锁
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    setnxexpire 两个命令分开了,「不是原子操作」。如果执行完 setnx 加锁,正要执行 expire 设置过期时间时,进程 crash 或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」

    @GetMapping("/create_order")
    public void createOrder() {
        String key = "lock_key";
        // 1.加锁
        boolean lock = tryLock(key, 1, 60L, TimeUnit.SECONDS);
        if (lock) {
            try {
                // 获取当前库存
                int stock = (Integer) redisUtil.get("stock");
                if (stock > 0) {
                    // 减库存
                    int realStock = stock - 1;
                    redisUtil.set("stock", realStock);
                    // TODO 添加订单记录
                    log.info("扣减成功,剩余库存:" + realStock);
                    return;
                }
                log.error("扣减失败,库存不足");
            } catch (Exception e) {
                log.error("扣减库存失败");
            } finally {
                // 3.解锁
                unlock(key);
            }
        } else {
            log.info("未获取到锁...");
        }
    }
    
    public boolean tryLock(String key, Object value, long timeout, TimeUnit unit) {
        return redisUtil.setIfAbsent(key, value, timeout, unit);
    }
    
    public void unlock(String key) {
        redisUtil.del(key);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    使用 JMeter 运行后,结果如下:

    在这里插入图片描述

    获取到锁的线程已成功扣除库存,没有获取到锁的线程只打印日志。

    3. Redis分布式锁方案二:SETNX + EXPIRE + 校验唯一随机值

    方案一还是有一定的缺陷的:假设线程 A 获取锁成功,一直在执行业务逻辑,但是 60s 过去了,还没执行完。但是,此时,锁已经过期了。线程 B 又请求过来了,显然,线程 B 也可以获取锁成功,也开始执行业务逻辑代码。那么问题就来了:在线程 B 执行过程中,线程 A 已经执行完了,就会把线程 B 的锁给释放掉!

    既然锁可能被别的线程误删,那我们给 value 值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,就OK了

    @GetMapping("/create_order")
    public void createOrder() {
        String key = "lock_key";
        String value = "ID_PREFIX" + Thread.currentThread().getId();
        // 1.加锁
        boolean lock = tryLock(key, value, 60L, TimeUnit.SECONDS);
        if (lock) {
            // ...
    }
    
    public void unlock(String key, String value) {
        String currentValue = (String)redisUtil.get(key);
        if (StringUtils.hasText(currentValue) && currentValue.equals(value)) {
            redisUtil.del(key);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这里需要注意的是:释放锁时,先 get 再删除,这并不是原子操作,无法保证进程安全。为了更严谨,这里用 lua 脚本代替

    lua 脚本

    Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过 redis 的 eval/evalsha 命令来运行,把操作封装成一个 Lua 脚本,如论如何都是一次执行的原子操作

    lockDel.lua如下:resources/lua/lockDel.lua

    if redis.call('get', KEYS[1]) == ARGV[1]
        then
      -- 执行删除操作
            return redis.call('del', KEYS[1])
        else
      -- 不成功,返回0
            return 0
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    public void unlock(String key, String value) {
        // 解锁脚本
        DefaultRedisScript<Long> unlockScript = new DefaultRedisScript<>();
        unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/lockDel.lua")));
        unlockScript.setResultType(Long.class);
        // 执行lua脚本解锁
        Long execute = redisTemplate.execute(unlockScript, Collections.singletonList(key), value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    或者:

    public void unlock(String key, String value) {
        // 解锁脚本
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Collections.singletonList(key), value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4. Redis分布式锁方案三:Redisson

    方案二还存在问题:「锁过期释放,业务没执行完」。 如果设置的超时时间比较短,而业务执行的时间比较长。比如超时时间设置5s,而业务执行需要10s,此时业务还未执行完,其他请求就会获取到锁,两个请求同时请求业务数据,不满足分布式锁的互斥性,无法保证线程的安全

    4.1 Redisson 概念

    其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。当前开源框架 Redisson 解决了这个问题。Redisson 底层原理图如下:

    在这里插入图片描述

    只要线程一加锁成功,就会启动一个 watch dog 看门狗,它是一个后台线程,会每隔 10 秒检查一下,如果线程 1 还持有锁,那么就会不断的延长锁 key 的生存时间。因此,Redisson 就是使用 watch dog 解决了「锁过期释放,业务没执行完」问题

    Redis 虽然作为分布式锁来说,性能是最好的。但是也是最复杂的。上面总结 Redis 主要有下面几个问题:

    • 未设置过期时间,会死锁
    • 设置过期时间
      • 锁误删
      • 业务还继续执行,导致多个线程并发执行

    线上都是用 Redission 实现分布式锁,Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。Redisson 是基于 netty 通信框架实现的,所以支持非阻塞通信,性能优于 Jedis。

    Redisson 分布式锁四层保护:

    • 防死锁
    • 防误删
    • 可重入(一个线程可以在获取锁之后再次获取同一个锁,而不需要等待锁释放)
    • 自动续期

    Redisson 实现 Redis 分布式锁,支持单机和集群模式

    4.2 Redisson 实现

    Redisson 文档目录:Redisson 文档目录

    使用 Redission 分布式锁,分成三个步骤:

    1. 获取锁 redissonClient.getLock("lock")
    2. 加锁 rLock.lock()
    3. 解锁 rLock.unlock()

    引入依赖:

    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redissonartifactId>
        <version>3.20.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Redisson 配置类:

    @Configuration
    public class RedissionConfig {
    
        @Value("${spring.redis.host}")
        private String redisHost;
    
        @Value("${spring.redis.password}")
        private String password;
    
        private int port = 6379;
    
        @Bean
        public RedissonClient getRedisson() {
            Config config = new Config();
            // 单机版
            config.useSingleServer()
                    .setAddress("redis://" + redisHost + ":" + port);
            //.setPassword(password);
            config.setCodec(new JsonJacksonCodec());
            return Redisson.create(config);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    集群版:

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useClusterServers()
                .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
                //可以用"rediss://"来启用SSL连接
                .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
                .addNodeAddress("redis://127.0.0.1:7002");
        return Redisson.create(config);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    下单接口:

    @Slf4j
    @RestController
    @RequestMapping("/order")
    public class OrderController {
    
        @Autowired
        private RedissonClient redissonClient;
    
        @GetMapping("/create_order")
        public void createOrder() {
            String key = "lock_key";
            RLock rLock = redissonClient.getLock(key);
            // 1.加锁
            rLock.lock();
            try {
                // 获取当前库存
                int stock = (Integer) redisUtil.get("stock");
                if (stock > 0) {
                    // 减库存
                    int realStock = stock - 1;
                    redisUtil.set("stock", realStock);
                    // TODO 添加订单记录
                    log.info("扣减成功,剩余库存:" + realStock);
                    return;
                }
                log.error("扣减失败,库存不足");
            } catch (Exception e) {
                log.error("扣减库存失败");
            } finally {
                // 3.解锁
                rLock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    使用 JMeter 并发运行后:

    在这里插入图片描述

    Redission 实现的分布式锁,直接调用,不需要锁异常、超时并发、锁删除等问题,它把处理上面的问题的代码都封装好了,直接调用即可

    4.3 Redisson 原理

    RLock 是 Redisson 分布式锁的最核心接口,继承了concurrent包的 Lock 接口和自己的 RLockAsync 接口,RLockAsync 的返回值都是 RFuture,是 Redisson 执行异步实现的核心逻辑,也是 Netty 发挥的主要阵地

    4.3.1 RLock 如何加锁?

    rLock.lock(); 方法最后调用重载方法:void lock(long leaseTime, TimeUnit unit, boolean interruptibly);

    在这里插入图片描述

    再调用 tryAcquire() 方法,返回 ttl 过期时间

    在这里插入图片描述

    tryAcquireAsync() 方法:

    在这里插入图片描述

    根据 leaseTime 时间判断的2个分支,实际上就是加锁时是否设置过期时间;未设置过期时间(-1)时则会有 watchDog 的锁续约,一个注册了加锁事件的续约任务

    tryLockInnerAsync() 方法:执行 lua 脚本

    在这里插入图片描述

    lua 脚本内容:

    -- 不存在该 key 时或者存在该 key 并且 hash 中线程id的key也存在
    if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then 
      -- 若不存在key,则是新增该锁并且hash中该线程id对应的count置1
      -- 若存在key,则是线程重入次数++
      redis.call('hincrby', KEYS[1], ARGV[2], 1); 
      -- 设置过期时间
      redis.call('pexpire', KEYS[1], ARGV[1]); 
      return nil; 
    end; 
    -- 返回过期时间
    return redis.call('pttl', KEYS[1]);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    加锁逻辑:

    1. 判断该锁是否已经有对应hash表存在
      • 没有对应的hash表:则 set 该 hash 表中一个 key 为 lock_key,item 为线程 id,value为 1,之后设置该 hash 表失效时间为 leaseTime
      • 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
    2. 最后返回这把锁的 ttl 剩余时间
    4.3.1.1 锁续约 ------ watchDog

    在这里插入图片描述

    这块代码的表面意义:在执行异步加锁的操作后,加锁成功则根据加锁完成返回的 ttl 是否过期来确认是否执行一段定时任务

    这段定时任务的就是 watchDog 的核心。

    void scheduleExpirationRenewal(long threadId) 方法:

    在这里插入图片描述

    renewExpiration() 方法:

    在这里插入图片描述

    这段连续嵌套且冗长的代码实际上做了几步

    1. 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是 renewExpirationAsync()
    2. renewExpirationAsync() 方法重置了锁超时时间,又注册一个监听器,监听回调又执行了 renewExpiration() 方法

    renewExpirationAsync() 方法的 lua 如下:重新设置了超时时间

    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
      redis.call('pexpire', KEYS[1], ARGV[1]); 
      return 1; 
    end; 
    return 0;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Redisson 加这段逻辑的目的是什么?

    目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题

    当一个线程持有了一把锁,由于并未设置超时时间 leaseTime,Redisson 默认配置了 30S,开启 watchDog,每 10S 对该锁进行一次续约,维持 30S 的超时时间,直到任务完成再删除锁。

    这就是 Redisson 的锁续约,也就是 WatchDog 实现的基本思路

    4.3.2 解锁

    unlockInnerAsync() 方法:

    在这里插入图片描述

    lua 脚本:

    -- 不存在key
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
    end; 
    -- 计数器 -1
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
    if (counter > 0) then 
    	-- 过期时间重设
      redis.call('pexpire', KEYS[1], ARGV[2]); 
      return 0; 
    else 
      -- 删除并发布解锁消息
      redis.call('del', KEYS[1]); 
      redis.call('publish', KEYS[2], ARGV[1]); 
      return 1; 
    end; 
    return nil;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    步骤如下:

    1. 如果该锁不存在则返回nil;
    2. 如果该锁存在则将其线程的hash key计数器-1,
    3. 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

    其中 unLock() 的时候使用到了 Redis 发布订阅 PubSub 完成消息通知

    而订阅的步骤就在 RedissonLock 的加锁入口的 lock() 方法里:

    在这里插入图片描述

    当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放,也是为了避免自旋的一种常用效率手段

    4.3.2.1 解锁通知消息

    LockPubSub 类,这里只有一个明显的监听方法 onMessage() ,其订阅和信号量的释放都在父类 PublishSubscribe,我们只关注监听事件的实际操作

    在这里插入图片描述

    发现一个是默认解锁消息,一个是读锁解锁消息,因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息 unlockMessage 分支

    LockPubSub监听最终执行了2件事

    • runnableToExecute.run() 执行监听回调
    • value.getLatch().release(); 释放信号量

    Redisson通过 LockPubSub 监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁

  • 相关阅读:
    集货运输优化:数学建模步骤,Python实现蚁群算法(解决最短路径问题), 蚁群算法解决旅行商问题(最优路径问题),节约里程算法
    分库分表之sharding-proxy
    牛客P15428 约数个数定理,暴力枚举,贪心剪枝
    亚马逊关键词搜索API接口(item_search-按关键字搜索亚马逊商品接口),亚马逊API接口
    物联网中的智能网关
    面试经典150题
    关于 eBPF 安全可观测性,你需要知道的那些事儿
    QSS的应用
    微服务架构中间件安装部署
    计算机毕业设计Java高等数学试卷系统(源码+系统+mysql数据库+lw文档)
  • 原文地址:https://blog.csdn.net/sco5282/article/details/132988178