• Redis分布式锁


    Redis分布式锁

    1.问题描述

    ​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

    分布式锁主流的实现方案:

    1. 基于数据库实现分布式锁

    2. 基于缓存(Redis等)

    3. 基于Zookeeper

    每一种分布式锁解决方案都有各自的优缺点:

    1. 性能:redis最高

    2. 可靠性:zookeeper最高

    这里,我们就基于redis实现分布式锁。

    2.安装工具 ab 模拟测试

    1. 说明: 工具 ab 可以模拟并发发出 Http 请求, (模拟并发 http 请求工具还有jemeter, postman)
    2. 安装指令: yum install httpd-tools (提示: 保证当前 linux 是可以联网的)
    3. 如果你不能联网, 可以使用 rpm 安装

    ab并发工具的使用

    ab -n 1000 -c 100 -p ~/postfile -T application/x-www-form-urlencoded
    http://192.168.79.1:8080/seckill/secKillServlet
    
    • 1
    • 2

    指令说明

    (1) ab 是并发工具程序
    (2) -n 1000 表示一共发出 1000 次 http 请求
    (3) -c 100 表示并发时 100 次, 你可以理解 1000 次请求, 会在 10 次发送完毕
    (4) -p ~/postfile 表示发送请求时, 携带的参数从当前目录的 postfile 文件读取 (这个你
    事先要准备好)
    (5) -T application/x-www-form-urlencoded 就是发送数据的编码是 基于表单的 url 编
    码
    (6) ~的含义: https://blog.csdn.net/m0_67401134/article/details/123973115
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.解决方案:redis实现分布式锁

    1.思路分析

    setnx:只有建不存在时,才对建进行设置操作。设置成功返回true设置失败返回false

    image-20220823073830682

    1.当多个客户端同时请求时,都会先去获取锁 setnx

    2.获取成功,则执行业务逻辑,执行完成之后删除锁(del(“lock”))已完成锁的释放。

    3.其他客户端等待重试,比如线程等待几秒后再次尝试获取锁

    2.代码实现

    1.创建SpringBoot工程,引入相关依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0modelVersion>
    
    	<groupId>com.llpgroupId>
    	<artifactId>Springboot-redisartifactId>
    	<version>1.0-SNAPSHOTversion>
    	
    	<parent>
    		<artifactId>spring-boot-starter-parentartifactId>
    		<groupId>org.springframework.bootgroupId>
    		<version>2.5.3version>
    	parent>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-webartifactId>
    		dependency>
    		
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-data-redisartifactId>
    		dependency>
    		
    		<dependency>
    			<groupId>org.apache.commonsgroupId>
    			<artifactId>commons-pool2artifactId>
    		dependency>
    	dependencies>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2.application.yaml

    • redis集群配置,redis集群是不支持lua脚本的
    spring:
      application:
        name: spring-boot-redis
      redis:
      #redis集群配置,这里是在一台主机上模拟
        cluster:
          nodes:
              - 192.168.79.201:6379
              - 192.168.79.201:6380
              - 192.168.79.201:6381
              - 192.168.79.201:6390
              - 192.168.79.201:6391
              - 192.168.79.201:6389
        connect-timeout: 6000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • redis单机配置
    spring:
      redis:
        host: 192.168.79.201
        port: 6379
        #Redis 数据库索引(默认为 0)
        database: 0
        #连接超时时间(毫秒)
        timeout: 1800000
        lettuce:
          pool:
            #连接池最大连接数(使用负值表示没有限制)
            max-active: 20
            #最大阻塞等待时间(负数表示没限制)
            max-wait: -1
            #连接池中的最大空闲连接
            min-idle: 0
            #密码
        #password: foobared
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.redis序列化配置

    @EnableCaching
    @Configuration
    public class RedisConfig {
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            RedisSerializer<String> redisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setConnectionFactory(factory);
            //key序列化方式
            template.setKeySerializer(redisSerializer);
            //value序列化
            template.setValueSerializer(jackson2JsonRedisSerializer);
            //value hashmap序列化
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            return template;
        }
    
        @Bean
        public CacheManager cacheManager(RedisConnectionFactory factory) {
            RedisSerializer<String> redisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            //解决查询缓存转换异常的问题
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            // 配置序列化(解决乱码的问题),过期时间600秒
            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                    .entryTtl(Duration.ofSeconds(600))
                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                    .disableCachingNullValues();
            RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                    .cacheDefaults(config)
                    .build();
            return cacheManager;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    4.创建测试类,实现分布式锁

    @RestController
    public class RedisTestController {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @GetMapping("/testLock")
        public void testLock(){
            //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok");
            //2获取锁成功、查询num的值
            if(lock){
    
                /**********业务逻辑start**********/
                //从缓存中获取num值
                Object value = redisTemplate.opsForValue().get("num");
                //如果值为空则直接返回
                if(ObjectUtils.isEmpty(value)){
                    return;
                }
                //如果有值则转成int
                int num = Integer.parseInt(value+"");
                //把redis的num加1
                redisTemplate.opsForValue().set("num", ++num);
                /**********业务逻辑end**********/
    
                //释放锁
                redisTemplate.delete("lock");
            }else{
                try {
                    //如果获取锁失败,则等待三秒再次尝试获取锁
                    Thread.sleep(3000);
                    testLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    使用ab工具进行测试

    ab -n 1000 -c 100 http://192.168.79.1:8080/redisTest/testLock
    
    • 1

    3.存在的问题

    问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放。

    以上面的代码为例,当redis缓存中不存在num时,value为空那么程序就不会取调用 redisTemplate.delete("lock");删除锁,进而导致锁一直得不到释放。当别的请求打进来 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok");始终返回false,无法正常执行业务逻辑。

         /**********业务逻辑start**********/
                //从缓存中获取num值
                Object value = redisTemplate.opsForValue().get("num");
                //如果值为空则直接返回
                if(ObjectUtils.isEmpty(value)){
                    return;
                }
                //如果有值则转成int
                int num = Integer.parseInt(value+"");
                //把redis的num加1
                redisTemplate.opsForValue().set("num", ++num);
                /**********业务逻辑end**********/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    解决:设置过期时间,自动释放锁。

    4.优化之设置锁的过期时间

    设置过期时间有两种方式:

    1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

    2. 在setnx时指定过期时间(推荐)具备原子性

    image-20220823083657873

    设置过期时间:

    压力测试肯定也没有问题。自行测试

    **问题:**可能会释放其他服务器的锁。

    **场景:**如果业务逻辑的执行时间是7s。执行流程如下

    1. index1业务逻辑没执行完,3秒后锁被自动释放。

    2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

    3. index3获取到锁,执行业务逻辑

    4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

    最终等于没锁的情况。

    **解决:**setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

    @RestController
    public class RedisTestController {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @GetMapping("/testLock")
        public void testLock(){
            //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true 
            //设置过期时间的长短根据业务执行的时间而定
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok",3, TimeUnit.SECONDS);
            //2获取锁成功、查询num的值
            if(lock){
    
                /**********业务逻辑start**********/
                //从缓存中获取num值
                Object value = redisTemplate.opsForValue().get("num");
                //如果值为空则直接返回
                if(ObjectUtils.isEmpty(value)){
                    return;
                }
                //如果有值则转成int
                int num = Integer.parseInt(value+"");
                //把redis的num加1
                redisTemplate.opsForValue().set("num", ++num);
                /**********业务逻辑end**********/
    
                //释放锁
                redisTemplate.delete("lock");
            }else{
                try {
                    //如果获取锁失败,则等待三秒再次尝试获取锁
                    Thread.sleep(3000);
                    testLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    存在的问题

    image-20230204133149690

    • 思路分析
    1. 在获取锁的时候, 给锁设置的值是唯一的 uuid
    2. 在释放锁时,判断释放的锁是不是同一把锁.
    3. 造成这个问题的本质原因, 是因为删除操作缺乏原子性

    5.优化之UUID防误删

    image-20220823084416381

    @RestController
    public class RedisTestController {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @GetMapping("/testLock")
        public void testLock(){
            String uuid = UUID.randomUUID().toString();
            //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
            //设置过期时间的长短根据业务执行的时间而定
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
            //2获取锁成功、查询num的值
            if(lock){
    
                /**********业务逻辑start**********/
                //从缓存中获取num值
                Object value = redisTemplate.opsForValue().get("num");
                //如果值为空则直接返回
                if(ObjectUtils.isEmpty(value)){
                    return;
                }
                //如果有值则转成int
                int num = Integer.parseInt(value+"");
                //把redis的num加1
                redisTemplate.opsForValue().set("num", ++num);
                /**********业务逻辑end**********/
                //释放各自的锁
                if(uuid.equals((String) redisTemplate.opsForValue().get("lock"))){
    				this.redisTemplate.delete("lock")
                }
            }else{
                try {
                    //如果获取锁失败,则等待三秒再次尝试获取锁
                    Thread.sleep(3000);
                    testLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    问题:删除操作缺乏原子性。

    image-20230204134438437

    1. 删除操作缺乏原子性
    2. 使用 Lua 脚本保证删除原子性

    6.优化之LUA脚本保证删除的原子性

        //编写方法,使用Redis分布式锁,完成对 key为num的+1操作
        @GetMapping("/testLock")
        public void testLock() {
    
            //得到一个uuid值,作为锁的值
            String uuid = UUID.randomUUID().toString();
    
            //1. 获取锁/设置锁 key->lock : setnx
            Boolean lock =
                    redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
            if (lock) {//true, 说明获取锁/设置锁成功
                //这个key为num的数据,事先要在Redis初始化
                Object value = redisTemplate.opsForValue().get("num");
                //1.判断返回的value是否有值
                if (value == null || !StringUtils.hasText(value.toString())) {
                    return;
                }
                //2.有值,就将其转成int
                int num = Integer.parseInt(value.toString());
                //3.将num+1,再重新设置回去
                redisTemplate.opsForValue().set("num", ++num);
                //释放锁-lock
    
    
                //为了防止误删除其它用户的锁,先判断当前的锁是不是前面获取到的锁,如果相同,再释放
    
                //=====使用lua脚本, 控制删除原子性========
                // 定义lua 脚本
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                // 使用redis执行lua执行
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(script);
                // 设置一下返回值类型 为Long
                // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
                // 那么返回字符串与0 会有发生错误。
                redisScript.setResultType(Long.class);
                // 第一个是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值
                // Arrays.asList("lock") 会传递给 script 的 KEYS[1] , uuid 会传递给ARGV[1]
                redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
    
    
                //if (uuid.equals((String) redisTemplate.opsForValue().get("lock"))) {
                //    //...
                //    redisTemplate.delete("lock");
                //}
    
                //redisTemplate.delete("lock");
    
            } else { //获取锁失败,休眠100毫秒,再重新获取锁/设置锁
    
                try {
                    Thread.sleep(100);
                    testLock();//重新执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    7.注意事项和细节

    1、定义锁的 key, key 可以根据业务, 分别设置,比如操作某商品, key 应该是为每个 sku 定
    义的,也就是每个 sku 有一把锁
    2、为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件:

    • 互斥性。在任意时刻,只有一个客户端能持有锁。
    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续
      其他客户端能加锁。
    • 加锁和解锁必须是同一个客户端,A 客户端不能把 B 客户端加的锁给解了
    • 加锁和解锁必须具有原子性
  • 相关阅读:
    Java中的多线程
    云渲染的“公”“私”技术!
    Vue-router创建子路由的方法【路由嵌套】
    无损以太网的ROCE革命,队列的缓存空间优化分析
    Linux常用的一些shell脚本操作记录
    【Spring传播机制底层原理】
    技术分享 | 如何写好测试用例?
    12.面试题——Spring Boot
    知识图谱从入门到应用——知识图谱的发展
    selenium打开火狐浏览器
  • 原文地址:https://blog.csdn.net/qq_44981526/article/details/126477562