• Redis 分布式锁


    Redis 分布式锁

    实现原理

    共享资源互斥,实现资源串行化,在单体应用中常用的有:Synchronized、ReentrantLock。分布式锁是控制分布式系统之间同步访问共享资源的一种方式,利用 Redis 的单线程特性对共享资源进行串行化。

    实现方式

    可以使用 Jedis 实现分布式锁的获取和释放,推荐使用 jedis 的 set 方法,其中设置 NX 保证互斥性,添加 EX 过期时间,也可是使用 setnx 命令实现,但是会存在并发问题。

    释放锁可以使用 del 命令或者 redis + lua 脚本。del 命令也会存在并发问题,例如 A、B 客户端,A 在执行 jedis.del() 之前,A 锁突然过期,同时 B 客户端尝试加锁成功,然后 A 客户端执行 del 命令,解锁了 B 客户端刚加的锁。

    存在问题

    除了上边并发的问题,还存在其他的问题。

    1、单机:无法高可用。搭建集群 AP 模型。

    2、主从复制:由于主从数据的不一致,有可能锁会重复获得。主库的锁没有复制到从库,从库就由于主库宕机而升级为主库,此时的主库没有锁,同一资源就能重复获得锁。该问题可以使用 RedLock (红锁)来解决,保证获取锁的时候需要在满足大于一半的缓存服务器 set 锁成功的情况下才能获取成功。

    3、无法续租:超过 expireTime 后,不能继续使用。使用 Redisson 框架。

    CP 强一致性

    AP 高可用

    Redission 分布式锁的使用

    Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网络。基于 NIO 的 Netty 框架上,生产环境使用分布式锁。

    Maven jar 包
    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redissonartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    配置 Redisson

    application-redis.yml

    spring:
      redis:
        host: localhost
        port: 6379
        password: 123456           #没有密码就保留空
        timeout: 5000  # 连接超时时间
        jedis:
           pool:
             max-active: 1000 # 池在给定时间可以分配的最大连接数。使用负值表示无限制。
             max-idle: 50  #池中“空闲”连接的最大数量。使用负值表示空闲连接的数量不受限制
             min-idle: 10  # 目标是池中要维护的最小空闲连接数。。
             max-wait: -1  # 在池耗尽时引发异常之前,连接分配应阻止的最长时间。使用负值无限期阻塞。
        redisson:
          pool:
            max-active: 1000   # 池在给定时间可以分配的最大连接数。使用负值表示无限制。
            min-idle: 10   # 目标是池中要维护的最小空闲连接数。
          tokenName: Authorization    # 用于分布式锁的唯一标识,一般使用token如果没有找到,就找sessionId
      session:
        store-type: redis   #设置session保存为默认redis的方式 ,可以解决分布式session不一致问题
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    使用文件

    import org.redisson.Redisson;
    import org.redisson.config.Config;
    
    public class RedissonManager {
        private static Config config = new Config();
        
        // 声明 redisson 对象
        private static Redisson redisson = null;
        
        // 实例化 redisson
        static {
            config.useClusterServers()
                    // 集群扫描间隔
                    .setScanInterval(2000)
                    // cluster 方式
    //                .addNodeAddress("redis://127.0.0.1:6379")
    //                .addNodeAddress("redis://127.0.0.1:6379")
                    .addNodeAddress("redis://127.0.0.1:6379");
            redisson = (Redisson) Redisson.create(config);
        }
        
        public static Redisson getRedisson() {
            return redisson;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    锁的获取和释放

    public class DistributedRedisLock {
        //从配置类中获取redisson对象
        private static Redisson redisson = RedissonManager.getRedisson();
        private static final String LOCK_TITLE = "redisLock_";
    
        // 加锁
        public static boolean acquire(String lockName) {
            // 声明 key 对象
            String key = LOCK_TITLE + lockName;
            // 获取锁对象
            RLock myLock = redisson.getLock(key);
            //加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
            myLock.lock(3,  TimeUnit.SECONDS);
            return true;
        }
        
        // 锁的释放
        public static void reelease(String lockName) {
            String key = LOCK_TITLE + lockName;
            RLock myLock = redisson.getLock(key);
            myLock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    业务使用
    public String doSoming() {
        String key = "lock001";
        DistributedRedisLock.acquire(key);
        // do something
        DistributedRedisLock.release(key);
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Redisson 分布式锁的实现原理

    加锁的机制

    Redisson 在加锁前会根据 hash 阶段选择一台机器(Redis 机器),发送 lua 脚本到 redis 服务器上,脚本如下

    //如果不存在加锁的key(serviceKey)就执行加锁的逻辑
    "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hset', KEYS[1], ARGV[2], 1); " +  -- 无锁,加锁,1:加了一回
        //pexpire命令:给指定的Key设置过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        //判断Hash类型的加锁的key对应的的Map结构中,key对应的value是否存在
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        //将key对应的value值进行+1,重入锁
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        //pexpire命令:给指定的Key设置过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
         "end; " +
        //返回Key(serviceKey)的过期时间
    "return redis.call('pttl', KEYS[1]);",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    lua 的作用,保证业务逻辑执行的原子性。也就是解决上边踢桃的 setnx 的并发问题。

    名词解析

    KEYS[1]:加锁的 key

    ARGV[1]:key 的生成时间, expire。

    ARGV[2]:加锁的客户端 Id (UUID.randomUUID + “:” + threadId)

    自动延迟机制

    一旦加锁成功,就会启动一个 watch dog (看门狗),一个后端线程,会每隔 10 秒检查一下,如果还持有锁 key,会不断的延长锁的生存时间。可以通过 lockWatchdogTimeout 进行配置。

    释放锁的机制
    -- 锁对应的hash不存在
    if (redis.call('exists', KEYS[1]) == 0) 
    then 
        -- 通知抢锁。
        redis.call('publish', KEYS[2], ARGV[1]); 
        --结束
        return 1; 
    end;
    -- 如果锁不存在,不处理
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 
    then 
        return nil;
    end; 
    --对其中的元素进行计数 -1 实现可重入
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
    -- 如果此时还有计数
    if (counter > 0) 
    then 
        -- 刷新过期时间
        redis.call('pexpire', KEYS[1], ARGV[2]); 
        return 0; 
    else 
        -- 解锁,通知其他线程争抢锁。
        redis.call('del', KEYS[1]); 
        redis.call('publish', KEYS[2], ARGV[1]); 
        return 1; 
    end; 
    return nil;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • KEYS[1] :需要加锁的key,这里需要是字符串类型。

    • KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName: “redisson_lockchannel{” + getName() + “}”

    • ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合 redis 的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

    • ARGV[2] :锁的超时时间,防止死锁

    • ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId

    总结

    分布式锁的特性
    • 互斥性:只能一个客户端
    • 同一性:只能同一个客户端
    • 可重入:持续加锁
    • 容错性:防止死锁
  • 相关阅读:
    Java 字节输出流FileOutputStream的用法和概述
    Cookie、代理服务器
    Flink 1.10 版本之前如何生成 Watermark
    MYSQL索引查询问题质疑
    Flink之DataStream API开发Flink程序过程与Flink常见数据类型
    工厂方法模式
    【一文清晰】单元测试到底是什么?应该怎么做?
    【JVM】垃圾回收(GC)详解
    设计模式篇---观察者模式
    前端404页面的制作
  • 原文地址:https://blog.csdn.net/qq_34178764/article/details/126694678