• 使用 Redis 实现分布式锁案例


    1.整合redis到工程

    
    <dependency>
      <groupId>org.springframework.bootgroupId>
      <artifactId>spring-boot-starter-data-redisartifactId>
    dependency>
    
    
    <dependency>
      <groupId>org.apache.commonsgroupId>
      <artifactId>commons-pool2artifactId>
      <version>2.6.0version>
    dependency>
    
    
    <dependency>
      <groupId>org.redissongroupId>
      <artifactId>redissonartifactId>
      <version>3.11.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    server:
      port: 8206
    spring:
      redis:
        host: xxxxx
        port: 6379
        database: 0
        timeout: 1800000
        password:
        lettuce:
          pool:
            max-active: 20 #最大连接数
            max-wait: -1    #最大阻塞等待时间(负数表示没限制)
            max-idle: 5    #最大空闲
            min-idle: 0     #最小空闲
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.1添加redis配置类

    /**
     * Redis配置类
     */
    @Configuration
    @EnableCaching
    public class RedisConfig {
    
        /**
         * 使用默认标签做缓存
         * @return
         */
        @Bean
        public KeyGenerator wiselyKeyGenerator() {
            return new KeyGenerator() {
                @Override
                public Object generate(Object target, Method method, Object... params) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(target.getClass().getName());
                    sb.append(method.getName());
                    for (Object obj : params) {
                        sb.append(obj.toString());
                    }
                    return sb.toString();
                }
            };
        }
    
    
        @Bean
        public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // 设置序列化对象,固定写法
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
            //  将Redis 中 string ,hash 数据类型,自动序列化!
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
            redisTemplate.setHashKeySerializer(new StringRedisSerializer());
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
    
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
    
        @Bean
        public CacheManager cacheManager(RedisConnectionFactory factory) {
            RedisSerializer<String> redisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    
            //解决查询缓存转换异常的问题
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            // 配置序列化(解决乱码的问题),过期时间600秒
            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                    .entryTtl(Duration.ofSeconds(600))
                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                    .disableCachingNullValues();
    
            RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                    .cacheDefaults(config)
                    .build();
            return cacheManager;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    二、分布式锁

    2.1 本地锁的局限性

    2.1.1 编写测试代码

    说明:通过reids客户端设置 num = 0

    set num 0
    
    • 1
    @RestController
    @RequestMapping("test")
    public class TestController {
    
        @Autowired
        private TestService testService;
    
        @GetMapping("testLock")
        public Result testLock() {
            testService.testLock();
            return Result.ok();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    public interface TestService {
    
        /**
         * 测试本地锁
         */
        void testLock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    @Service
    public class TestServiceImpl implements TestService {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        /**
         * 在缓存中存储一个num,初始值为0
         * 利用缓存中的StringRedisTemplate,获取到当前的num数据值
         * 如果num不为空,则需要对当前值+1操作
         * 如果num为空,返回即可
         */
        @Override
        public void testLock() {
            // 利用缓存中的StringRedisTemplate,获取到当前的num数据值
            String num = redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(num)) {
                return;
            }
            // 如果num不为空,则需要对当前值+1操作
            int numValue = Integer.parseInt(num);
            // 写回缓存
            redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2.1.2 使用ab工具测试

    使用 ab 测试工具:httpd-tools(yum install -y httpd-tools)

    ab -n(一次发送的请求数) -c(请求的并发数) 访问路径

    测试如下:5000请求,100并发
    ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock

    结果应该为:5000

    查看redis中的值:
    在这里插入图片描述

    2.1.3 使用本地锁

    在这里插入图片描述

    使用ab工具压力测试:5000次请求,并发100。

    查看redis中的结果:
    在这里插入图片描述

    2.1.4 本地锁问题演示锁

    接下来启动 8206 8216 8226 三个运行实例

    运行多个service实例:

    1. server.port=8216
    2. server.port=8226

    通过网关压力测试:
    ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock

    查看redis中的值:
    在这里插入图片描述

    以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性,此时需要分布式锁。

    2.2 分布式锁实现的解决方案

    随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

    分布式锁主流的实现方案:

    1. 基于数据库实现分布式锁
    2. 基于缓存(Redis等)
    3. 基于Zookeeper

    每一种分布式锁解决方案都有各自的优缺点:

    1. 性能:redis最高
    2. 可靠性:zookeeper最高

    2.3 使用redis实现分布式锁

    1. 多个客户端同时获取锁(setnx
    2. 获取成功,执行业务逻辑 {从db获取数据,放入缓存} ,执行完成释放锁(del
    3. 其他客户端等待重试

    在这里插入图片描述

    2.3.1 编写代码

    @Override
        public void testLock() {
            // 使用setnx命令
            // setnx lock ok
            Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "OK");
            if (flag) {
                // flag = true:表示获取到锁
                // 执行业务逻辑
                String num = redisTemplate.opsForValue().get("num");
                if (StringUtils.isEmpty(num)) {
                    return;
                }
                int numValue = Integer.parseInt(num);
                redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
                // 释放锁
                redisTemplate.delete("lock");
            } else {
                // 没有获取到锁
                try {
                    Thread.sleep(100);
                    // 每隔1秒钟回调一次,再次尝试获取锁(自旋)
                    testLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    重启,服务集群,通过网关压力测试。

    查看redis中num的值:
    在这里插入图片描述

    问题:setnx 刚好获取到锁,业务逻辑出现异常,导致锁无法释放。

    解决:设置过期时间,自动释放锁。

    2.3.2 优化之设置锁的过期时间

    设置过期时间有两种方式:

    1. 首先想到通过 expire 设置过期时间(缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
    2. 在 set 时指定过期时间(推荐)

    设置过期时间:
    在这里插入图片描述
    问题:可能会释放其他服务器的锁。

    场景:如果业务逻辑的执行时间是7s,执行流程如下:

    1. index1业务逻辑没执行完,3秒后锁被自动释放;
    2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放;
    3. index3获取到锁,执行业务逻辑;
    4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放,最终等于没锁的情况。

    解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。

    2.3.3 优化之UUID防误删

    在这里插入图片描述
    在这里插入图片描述

    问题:删除操作缺乏原子性。

    场景:

    1. index1执行删除时,查询到的 lock 值确实和 uuid 相等;
    2. index1执行删除前,lock 刚好过期时间已到,被 redis 自动释放,在redis中没有了锁;
    3. index2获取了lock,index2线程获取到了cpu的资源,开始执行方法;
    4. index1执行删除,此时会把 index2 的 lock 删除。

    index1 因为已经在方法中了,所以不需要重新上锁,index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的index2的锁。

    2.3.4 优化之LUA脚本保证删除的原子性

    @Override
        public void testLock() {
            // 使用setnx命令
            // setnx lock ok
            String uuid = UUID.randomUUID().toString();
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(
                    "lock", uuid, 3, TimeUnit.SECONDS);
            if (flag) {
                // flag = true:表示获取到锁
                // 执行业务逻辑
                String num = redisTemplate.opsForValue().get("num");
                if (StringUtils.isEmpty(num)) {
                    return;
                }
                int numValue = Integer.parseInt(num);
                redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
                // 定义一个lua脚本
                String secript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                //  准备执行lua 脚本
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                // 设置lua脚本
                redisScript.setScriptText(secript);
                // 设置DefaultRedisScript 这个对象的泛型
                redisScript.setResultType(Long.class);
                //  redis调用lua脚本
                redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
            } else {
                // 没有获取到锁
                try {
                    Thread.sleep(100);
                    // 每隔1秒钟回调一次,再次尝试获取锁(自旋)
                    testLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    2.3.5 总结

    为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

    1. 互斥性。在任意时刻,只有一个客户端能持有锁;
    2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;
    3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;
    4. 加锁和解锁必须具有原子性

    2.4 使用redisson 解决分布式锁

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

    2.4.1 实现代码

    
    <dependency>
       <groupId>org.redissongroupId>
       <artifactId>redissonartifactId>
       <version>3.11.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    配置类

    /**
    * redisson配置信息
    */
    @Data
    @Configuration
    @ConfigurationProperties("spring.redis")
    public class RedissonConfig {
        
        private String host;
        
        private String addresses;
        
        private String password;
        
        private String port;
        
        private int timeout = 3000;
        private int connectionPoolSize = 64;
        private int connectionMinimumIdleSize = 10;
        private int pingConnectionInterval = 60000;
        private static String ADDRESS_PREFIX = "redis://";
        
        /**
        * 自动装配
        */
        @Bean
        RedissonClient redissonSingle() {
            Config config = new Config();
            // 判断地址是否为空
            if (StringUtils.isEmpty(host)) {
                throw new RuntimeException("host is empty");
            }
            SingleServerConfig serverConfig = config.useSingleServer()
                // //redis://127.0.0.1:6379
                .setAddress(ADDRESS_PREFIX + this.host + ":" + port)
                .setTimeout(this.timeout)
                .setPingConnectionInterval(pingConnectionInterval)
                .setConnectionPoolSize(this.connectionPoolSize)
                .setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
            // 是否需要密码
            if (!StringUtils.isEmpty(this.password)) {
                serverConfig.setPassword(this.password);
            }
            // RedissonClient redisson = Redisson.create(config);
            return Redisson.create(config);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    @Service
    public class TestServiceImpl implements TestService {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Autowired
        private RedissonClient redissonClient;
    
        @Override
        public void testLock() {
            RLock lock = redissonClient.getLock("lock");
            // 开始加锁
            lock.lock();
            try {
                String value = redisTemplate.opsForValue().get("num");
                if (StringUtils.isNotEmpty(value)) {
                    return;
                }
                int num = Integer.parseInt(value);
                redisTemplate.opsForValue().set("num", String.valueOf(++num));
            } catch (NumberFormatException e) {
                e.printStackTrace();
            } finally {
                // 解锁:
                lock.unlock();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.4.2 可重入锁(Reentrant Lock)

    基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间,超过这个时间后锁便自动解开了。

    最常见的使用:

    RLock lock = redisson.getLock("anyLock");
    // 最常使用
    lock.lock();
    // 加锁以后10秒钟自动解锁
    // 无需调用unlock方法手动解锁
    lock.lock(10, TimeUnit.SECONDS);
    
    // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
    boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    if (res) {
       try {
         ...
       } finally {
           lock.unlock();
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.4.3 读写锁(ReadWriteLock)

    基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
    分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

    RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
    // 最常见的使用方法
    rwlock.readLock().lock();
    // 或
    rwlock.writeLock().lock();
    
    // 10秒钟以后自动解锁
    // 无需调用unlock方法手动解锁
    rwlock.readLock().lock(10, TimeUnit.SECONDS);
    // 或
    rwlock.writeLock().lock(10, TimeUnit.SECONDS);
    
    // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
    boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
    // 或
    boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
    ...
    lock.unlock();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    代码实现

    @GetMapping("read")
    public Result<String> read(){
        String msg = testService.readLock();
        return Result.ok(msg);
    }
    
    @GetMapping("write")
    public Result<String> write(){
        String msg = testService.writeLock();
        return Result.ok(msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    public interface TestService {
    
        String readLock();
    
        String writeLock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Service
    public class TestServiceImpl implements TestService {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Autowired
        private RedissonClient redissonClient;
    
        @Override
        public String readLock() {
            // 初始化读写锁
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
            // 获取读锁
            RLock rLock = readWriteLock.readLock();
            // 加10s锁
            rLock.lock(10, TimeUnit.SECONDS);
            String msg = this.redisTemplate.opsForValue().get("msg");
            //rLock.unlock(); // 解锁
            return msg;
        }
    
        @Override
        public String writeLock() {
            // 初始化读写锁
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
            // 获取写锁
            RLock rLock = readWriteLock.writeLock();
            // 加10s锁
            rLock.lock(10, TimeUnit.SECONDS);
            this.redisTemplate.opsForValue().set("msg", UUID.randomUUID().toString());
            //rLock.unlock(); // 解锁
            return "成功写入了内容";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    打开两个浏览器窗口测试:
    http://localhost:8206/test/read
    http://localhost:8206/test/write

    • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始;
    • 同时访问读:不用等待;
    • 先写后读:读要等待(约10s)写完成;
    • 先读后写:写要等待(约10s)读完成;
  • 相关阅读:
    Zookeeper的介绍与集群搭建
    七,vi和vim
    [框架设计之道(二)]设备、任务设置及业务流程
    探究多态的原理与实现:虚函数表、动态绑定与抽象类
    Spring IOC 配置元信息-8
    HTML网页设计——轮滑运动体育类人物介绍主题12页面毕业设计网页
    【Linux】:文件系统
    itext生成pdf
    IO流知识点学习
    html静态网站基于HTML+CSS+JavaScript上海美食介绍网站网页设计与实现共计5个页面
  • 原文地址:https://blog.csdn.net/weixin_44129618/article/details/126215735