• 如何用Redis实现分布式锁?


    简介

    我相信很多人学分布式锁最大的动力并不是他自己的系统需要,而是面试官需要。。。当然,这也侧面说明分布锁很重要,经常作为考题,在学习之前,我们要先明确几个问题。

    一、锁重要吗?

    当然重要,只要访问临界资源的时候,都会用到锁,要不然就会有线程安全问题。

    二、那我们为什么不用Java自带的锁?比如synchronized和Lock还要自己实现呢?

    这里需要的明确一个问题,这些Java自带的锁都是在同一个JVM中发挥作用的,如果是在分布式的服务中,会有多个JVM虚拟机下的服务进行并发访问,这些锁是发挥不了作用的。分布式环境下,锁需要第三方服务提供。

    三、常用的分布式锁实现方案有哪些?
    • 基于MySQL的分布式锁
    • 基于Redis的分布式锁
    • 基于ZooKeeper的分布式锁

    其实说白了,只要能存储数据的数据库都能实现分布锁,因为我们只需要告诉其它线程,当前资源被占用了就可以了,其实synchronized和Lock也是存储一个标记,告知别的线程,当前资源有没有被占用而已,没什么神秘的。

    三种实现方式

    首先我们需要明白,分布式锁应该达到什么样的要求:

    1. 互斥性。(在分布式集群中,同一个方法在同一时间只能被一台机器上的一个线程获得)。
    2. 可重入性。(递归调用不应该被阻塞、避免死锁)。
    3. 锁的超时。(避免死锁、死循环等意外情况)。
    4. 加锁和解锁必须为同一客户端。(除非锁到期自动收回,否则加锁和解锁需为同一客户端)。

    接下来简单介绍下三种锁实现原理,以及他们的优缺点。

    基于MySQL的分布式锁

    基于MySQL的锁主要是两种实现方式,一种是悲观锁,一种是乐观锁

    1. 悲观锁:主要利用 select … where … for update 对所查行进行锁定和操作,需要注意的是“where name = lock ”,name字段必须要走索引,否则会锁表。
    2. 乐观锁:是基于CAS的思想,不认为锁争抢经常发生,只有update version失败后才能觉察到,锁争抢不多时,是很好的解决方案,但争抢过多,很浪费CPU资源。
    优点:
    • 实现相对简单,都由MySql去解决争抢问题。
    • 架构相对简单,不再需要多余的三方组件,使整个系统更简单。
    缺点:
    • 性能相对较差,并且有锁表的风险。
    • 非阻塞操作失败后,需要轮询,占用cpu资源,和MySQL数据库资源。
    • 长时间不提交或者长时间轮询,会占用过多MySQL连接资源。

    基于Redis的分布式锁

    Redis的一些操作和特性,可能参考这篇文章:Redis学习笔记。这里直接介绍需要用到的三个命令,分别是:SETNXexpiredelete

    1. SETNX key val
      SETNXSET if Not eXists缩写,作用为:若key不存在,则存入键值对,返回1;若key存在,则什么都不做,返回0。
    2. expire key
      为key设置一个超时时间,单位为second,超过这个时间锁会自动删除key,避免因宕机引起死锁。
    3. delete key
      删除指定key。

    实现思想为:

    1. 获取锁的时候,使用setnx加锁,key为锁名,value值为一个随机生成的UUID。
    2. 获取锁成功后,使用expire命令为锁添加一个超时时间,若超过这个时间则放弃获取锁。
    3. 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
    优点:
    • 依赖于Redis高并发的特性,所以性能极佳。
    • 过期时间不好控制,需要考虑续锁问题。
    缺点:
    • 实现相对复杂,需要考虑的因素太多。
    • 非阻塞,操作失败后,需要轮询,占用cpu资源。
    • 主节点挂掉,未成功同步的情况下,可能导致多个节点获取到锁。

    基于ZooKeeper的分布式锁

    ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。Znode 又分为四种类型:持久节点持久顺序节点临时节点临时顺序节点
    ZooKeeper 分布式锁是基于临时顺序节点来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
    而且用临时顺序节点的另外一个用意是,如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。

    优点:
    • 有效的解决单点宕机问题,具备高可用性。
    • 可做可重入锁。
    • 可做阻塞锁。
    缺点:
    • 需要频繁的创建和删除节点,性能上不如Redis。
    • 客户端与 Zookeeper 的长时间失联,锁被释放问题。

    总结

    从实现容易度上比较:MySQL数据库 > Zookeeper > Redis缓存。
    从性能上比较:Redis缓存 > Zookeeper > MySQL数据库。
    从可靠性上比较:Zookeeper > MySQL数据库 > Redis缓存。

    Redis的具体实现代码

    用Redis做代码实现时,我们需要考虑以下几个问题:

    • 因为宕机引起的死锁问题:设置过期时间。
    • 业务时间长于过期时间:开一个守护进程,做锁续期。
    • 锁被别人释放:锁写入唯一标识,释放锁先检查标识,再释放。

    那么这些实现起来太麻烦怎么办,直接用别人封装好的库即可:Redisson。不但更加方便,也更加稳定,代码如下:

    // 1.构造redisson实现分布式锁必要的Config
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password").setDatabase(0);
    
    // 2.构造RedissonClient
    RedissonClient redissonClient = Redisson.create(config);
    
    // 3.获取锁对象实例
    RLock rLock = redissonClient.getLock(lockKey);
    try {
       
        // 4.尝试获取锁
        boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
        if (res) {
            // 获得锁成功,处理业务
        }
    } catch (Exception e) {
        // 继续等待,或执行别的操作
        throw new RuntimeException("aquire lock fail");
    }finally{
        // 无论如何, 最后都要解锁
        rLock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Redisson的实现方法如下:
    tryLock实现方法如下:

    	public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            
            // 尝试获取锁
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
    
            // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
    
            current = System.currentTimeMillis();
    
            /**
             * 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
             * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
             * 当 this.await 返回 true,进入循环尝试获取锁.
             */
            RFuture subscribeFuture = subscribe(threadId);
            // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
            if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(threadId);
                return false;
            }
    
            try {
                // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                  }
    
                /**
                 * 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
                 * 获取锁成功,则立马返回 true,
                 * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
                 */
                while (true) {
                    long currentTime = System.currentTimeMillis();
    
                    // 再次尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
                    // 超过最大等待时间则返回 false 结束循环,获取锁失败
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
    
                    /**
                     * 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
                     */
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        //则就在wait time 时间范围内等待可以通过信号量
                        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    
                    // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
                }
            } finally {
                // 无论是否获得锁,都要取消订阅解锁消息
                unsubscribe(subscribeFuture, threadId);
            }
            return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94

    Redisson看门狗续锁机制

    	private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    		
    		// 如果带有过期时间,则按照普通方式获取锁
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            }
    
            // 先按照30秒的过期时间来执行获取锁的方法
            RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            
            // 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e != null) {
                    return;
                }
    
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            });
            return ttlRemainingFuture;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    续期原理
    就是用lua脚本,将锁的时间重置为30s

    /*
     Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,
     然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 还持有锁 key
     (判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),
     那么就会不断的延长锁 key 的生存时间。如果服务宕机了,Watch Dog 机制线程也就没有了,
     此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
    */
    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
    
    protected RFuture renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    怎样判定一个可执行文件是否是PIE 格式的文件
    .NET 团队公布.NET 9开发目标 并发布.NET9的首个预览版
    大学生WEB前端静态网页——旅游介绍35页 响应式,
    前端面试题---作用域链和原型链
    Linux常见命令
    UbuntuToGo | Ubuntu 22.04.6 VMware UEFI启动 VHD虚拟磁盘
    Git 将某个提交合并到另一个分支
    HTML5 新表单元素详解
    常见算法题分类总结之归并排序(Merge-Sort):从二路到多路
    轻量封装WebGPU渲染系统示例<4>-CubeMap/天空盒(源码)
  • 原文地址:https://blog.csdn.net/drhrht/article/details/126081119