• 如何用 Redis 实现分布式锁


    1. 如何用 Redis 实现分布式

    1.1. 序言

    前面的文章都是理论知识, 写多了头有点大, 突然想写点实战方面的内容, 刚好最近公司在做异步任务迁移, 用到了分布式锁和任务分片, 所以打算写 2 篇实战方面的文章, 分别介绍分布式锁和任务分片的实现方式, 这个在实际项目中, 应该会经常用到, 今天这篇文章就先讲解分布式锁的实现方式。

    1.2. 使用场景

    分布式锁的使用场景其实很多, 在小米这边我主要遇到以下场景:

    • 在服务集群中执行定时任务, 我们希望只有一台机器去执行, 就需要用到分布式锁, 只有拿到锁的机器, 才能执行该定时任务;
    • 当外部请求打到集群中时, 比如该请求是对订单进行操作, 为了避免请求重入, 我们需要在入口加上订单维度的分布式锁。

    1.3. Redis 分布式锁

    Redis 分布式锁是面试常面的考题, 很多同学都知道用 SetNx() 去获取锁, 如果面试官问你下面 2 个问题, 你知道怎么回答么?

    • 如果获取锁的机器挂掉, 如何处理?
    • 当锁超时时, A/B 两台机器同时获取锁, 可能会同时获取, 如何解决?

    其实 Redis 分布式锁, 肯定不仅仅是 SetNx() 就能解决的, 什么? 你不知道什么是 SetNx(), 楼哥是暖男嘛, 马上给你解答:

    Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时, 为 key 设置指定的值。(返回值: 设置成功, 返回 1, 设置失败, 返回 0)

    如果调用 SetNx() 返回 1, 表示获取到锁, 如果返回 0, 表示没有获取到锁, 为了避免机器宕机&重启, 导致锁一直没有释放, 所以我们需要记录锁的超时时间, 整体执行流程如下:

    • 先通过 SetNx() 获取锁, 并将 value 设置成超时时间, 如果成功获取锁, 直接返回;
    • 如果未获取到锁, 可能是机器宕机&重启等, 需要通过 GetKey() 获取锁的超时时间 value, 如果锁未超时, 证明机器未宕机&重启, 获取锁失败;
    • 如果锁已经超时, 就可以重新去获取锁, 并设置锁的新的超时时间, 为了避免多台机器机器同时拿到锁, 需要使用 GetSet() 方法, 因为 GetSet() 会返回之前的旧值, 如果此时有两台机器 A/B 同时执行 GetSet() 方法, 假如 A 先执行, B 后执行, 那么 A 调用 GetSet() 返回的值, 其实就等于之前调用 GetKey() 获取的的值 current_time, B 调用 GetKey() 返回的值, 其实就是 A 设置的新值, 肯定不等于 current_time, 所以我们可以通过两个时间是否相等, 来判断是谁先拿到锁。(这里应该是分布式锁最难理解的地方, 我每次重温这个逻辑, 都会在这个地方卡一下。)

    Redis Getset 命令用于设置指定 key 的值, 并返回 key 的旧值。(返回值: 返回给定 key 的旧值。当 key 没有旧值时, 即 key 不存在时, 返回 nil; 当 key 存在但不是字符串类型时, 返回一个错误。)

    可能有同学说, 写了一堆, 看的我头都大了, 来来来, 楼哥给你画了一幅图, 是不是就清晰很多

    1.4. 具体实现

    基本原理讲清楚了, 下面就开始堆代码了哈, 先看看获取锁的逻辑, 里面的注释写的相当详细, 即使不懂编程的同学, 应该都能看懂:

    // 获取分布式锁, 需要考虑以下情况: 
    // 1. 机器 A 获取到锁, 但是在未释放锁之前, 机器挂掉或者重启, 会导致其它机器全部 hang 住, 这时需要根据锁的超时时间, 判断该锁是否需要重置; 
    // 2. 当锁超时时, 需要考虑两台机器同时去获取该锁, 需要通过 GETSET 方法, 让先执行该方法的机器获取锁, 另外一台继续等待。
    func GetDistributeLock(key string, expireTime int64) bool {
     currentTime := time.Now().Unix()
     expires := currentTime + expireTime
     redisAlias := "jointly"
     // 1. 获取锁, 并将 value 值设置为锁的超时时间
     redisRet, err := redis.SetNx(redisAlias, key, expires)
     if nil == err && utils.MustInt64(1) == redisRet {
      // 成功获取到锁
      return true
     }
     // 2. 当获取到锁的机器突然重启&挂掉时, 就需要判断锁的超时时间, 如果锁超时, 新的机器可以重新获取锁
     // 2.1 获取锁的超时时间
     currentLockTime, err := redis.GetKey(redisAlias, key)
     if err != nil {
      return false
     }
     // 2.2 当"锁的超时时间"大于等于"当前时间", 证明锁未超时, 直接返回
     if utils.MustInt64(currentLockTime) >= currentTime {
      return false
     }
     // 2.3 将最新的超时时间, 更新到锁的 value 值, 并返回旧的锁的超时时间
     oldLockTime, err := redis.GetSet(redisAlias, key, expires)
     if err != nil {
      return false
     }
     // 2.4 当锁的两个"旧的超时时间"相等时, 证明之前没有其它机器进行 GetSet 操作, 成功获取锁
     // 说明: 这里存在并发情况, 如果有 A 和 B 同时竞争, A 会先 GetSet, 当 B 再去 GetSet 时, oldLockTime 就等于 A 设置的超时时间
     if utils.MustString(oldLockTime) == currentLockTime {
      return true
     }
     return false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    对于里面的一些函数 utils.MustString()、utils.MustInt64(), 其实就是一些底层封装好的类型转换函数, 应该不会影响大家理解哈, 如果想直接拿去使用, 这里需要简单修改一下。

    再看看删除锁的逻辑:

    // 删除分布式锁
    // @return bool true-删除成功; false-删除失败
    func DelDistributeLock(key string) bool {
     redisAlias := "jointly"
     redisRet := redis.Del(redisAlias, key)
     if redisRet != nil {
      return false
     }
     return true
    }
    然后是业务处理逻辑: 
    func DoProcess(processId int) {
     fmt.Printf("启动第%d 个线程、n", processId)
     redisKey := "redis_lock_key"
     for {
      // 获取分布式锁
      isGetLock := GetDistributeLock(redisKey, 10)
      if isGetLock {
       fmt.Printf("Get Redis Key Success, id:%d\n", processId)
       time.Sleep(time.Second * 3)
       // 删除分布式锁
       DelDistributeLock(redisKey)
      } else {
       // 如果未获取到该锁, 为了避免 redis 负载过高, 先睡一会
       time.Sleep(time.Second * 1)
      }
     }
    }
    最后起个 10 个多线程, 去执行这个 DoProcess(): 
    func main() {
     // 初始化资源
     var group string = "i18n"
     var name string = "jointly_shop"
     var host string
     // 初始化资源
     host = "http://ip:port"
     _, err := xrpc.NewXRpcDefault(group, name, host)
     if err != nil {
      panic(fmt.Sprintf("initRpc when init rpc  failed, err:%v", err))
     }
     redis.SetRedis("jointly", "redis_jointly")
     // 开启 10 个线程, 去抢 Redis 分布式锁
     for i := 0; i <= 9; i ++ {
      go DoProcess(i)
     }
     // 避免子线程退出, 主线程睡一会
     time.Sleep(time.Second * 100)
     return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    程序跑了 100s, 我们可以看到, 每次都只有 1 个线程获取到锁, 分别是 2、1、5、9、3, 执行结果如下:

    启动第 0 个线程
    启动第 6 个线程
    启动第 9 个线程
    启动第 4 个线程
    启动第 5 个线程
    启动第 2 个线程
    启动第 1 个线程
    启动第 8 个线程
    启动第 7 个线程
    启动第 3 个线程
    Get Redis Key Success, id:2
    Get Redis Key Success, id:1
    Get Redis Key Success, id:5
    Get Redis Key Success, id:9
    Get Redis Key Success, id:3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.5. 遇到的坑

    中间出现过一些坑, 我简单说一下:

    • 之前我们做过一次服务迁移, 需要将物理机迁移到 Neo 云, 当把流量从物理机迁移 Neo 云后, 千万不要忘了停掉物理机上的定时任务, 否则物理机会去抢占这个分布式锁, 特别是代码有变更后, 物理机如果抢到锁, 会继续执行旧的代码, 那就是个大坑了。
    • 不要轻易去修改分布式锁的超时时间, 之前为了能快速排查问题, 修改过一次, 然后出现了非常诡异的问题, 当时排查了一天, 具体问题也记不太清了, 大家感兴趣, 可以自己模拟一下。

    1.6. 后记

    这个分布式锁其实是我 2019 年写的, 已经在线上跑了 2 年, 只需要进行简单修改, 就可以放到线上跑, 不用担心里面有坑哈, 因为坑已经被我趟过了。
    上周写了一篇限流的文章, 加上今天这个分布式锁, 其实都是最近项目中使用的, 所以就整理一下, 其实我最想写的, 是任务分片的实现方式, 也是最近在公司做异步任务时 Get 到的新技能, 它支持多机并发执行一个任务, 是不是很神奇, 后面会分享给大家。

  • 相关阅读:
    Linux常用文本编辑器,及文本查看摘选的常用命令
    SLAM从入门到精通(tf的使用)
    Java基于springboot +vue的箱包销售购物网站 多商家
    思腾云计算
    Linux的LVM磁盘扩容
    交换机技术综述(第十一课)
    linux-sed命令使用方法记录
    `英语` 2022/8/22
    001 opencv addWeighted
    WinUI(WASDK)使用MediaPipe检查人体姿态关键点
  • 原文地址:https://blog.csdn.net/wan212000/article/details/126143727