• Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)


    Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

    1 概念

    应用场景

    Golang自带的Lock锁单机版OK(存储在程序的内存中),分布式不行
    分布式锁:

    • 简单版:redis setnx=》加锁设置过期时间需要保证原子性=》lua脚本
    • 完整版:redis Lua脚本+实现可重入+自动续期=》hset结构

    应用场景:

    1. 防止用户重复下单,锁住用户id
    2. 防止商品超卖问题
    3. 锁住账户,防止并发操作

    例如:我本地启两个端口跑两个相同服务,然后通过Nginx反向代理分别将请求均衡打到两个服务(模拟分布式微服务),最后通过Jmeter模拟高并发场景。同时我在代码里添加上lock锁。

    • 可以看到还是有消费到相同数据,出现超卖现象,这是因为lock锁是在go程序的内存,只能锁住当前程序。如果是分布式的话,就需要涉及分布式锁。
      在这里插入图片描述

    注意📢:本地通过Mac+Jmeter+Iris+Nginx模拟分布式场景详情可见:https://blog.csdn.net/weixin_45565886/article/details/136635997

    package main
    
    import (
    	"context"
    	"github.com/go-redis/redis/v8"
    	"github.com/kataras/iris/v12"
    	context2 "github.com/kataras/iris/v12/context"
    	"myTest/demo_home/redis_demo/distributed_lock/constant"
    	service2 "myTest/demo_home/redis_demo/distributed_lock/other_svc/service"
    	"sync"
    )
    
    func main() {
    	constant.RedisCli = redis.NewClient(&redis.Options{
    		Addr: "localhost:6379",
    		DB:   0,
    	})
    	_, err := constant.RedisCli.Set(context.TODO(), constant.AppleKey, 500, -1).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	app := iris.New()
    	xLock2 := new(sync.Mutex)
    	app.Get("/consume", func(c *context2.Context) {
    		xLock2.Lock()
    		defer xLock2.Unlock()
    		service2.GoodsService2.Consume()
    		c.JSON("ok port:9999")
    	})
    	app.Listen(":9999", nil)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    分布式锁必备特性

    分布式锁需要具备的特性:

    1. 独占性(排他性):任何时刻有且仅有一个线程持有
    2. 高可用:redis集群情况下,不能因为某个节点挂了而出现获取锁失败和释放锁失败的情况
    3. 防死锁:杜绝死锁,必须有超时控制机制或撤销操作 Expire key
    4. 不乱抢:防止乱抢。(自己只能unlock自己的锁)lua脚本保证原子性,且只删除自己的锁
    5. 重入性:同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁
      • setnx只能解决有无分布式锁
      • hset 解决可重入问题,记录加锁次数: hset zyRedisLock uuid:threadID 3

    2 思路分析

    宕机与过期

    如果加锁成功之后,某个Redis节点宕机,该锁一直得不到释放,就会导致其他Redis节点加锁失败。

    • 加锁时需要设置过期时间
    //通过lua脚本保证加锁与设置过期时间的原子性
    
    func (r *RedisLock) TryLock() bool {
    	//通过lua脚本加锁[hincrby如果key不存在,则会主动创建,如果存在则会给count数加1,表示又重入一次]
    	lockCmd := "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
    		"then " +
    		"   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
    		"   redis.call('expire', KEYS[1], ARGV[2]) " +
    		"   return 1 " +
    		"else " +
    		"   return 0 " +
    		"end"
    	result, err := r.redisCli.Eval(context.TODO(), lockCmd, []string{r.key}, r.Id, r.expire).Result()
    	if err != nil {
    		log.Errorf("tryLock %s %v", r.key, err)
    		return false
    	}
    	i := result.(int64)
    	if i == 1 {
    		//获取锁成功&自动续期
    		go r.reNewExpire()
    		return true
    	}
    	return false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    防止误删key

    锁过期时间设置30s,业务逻辑假如要跑40s。30s后锁自动过期释放了,其他线程加锁了。再过10s后业务逻辑走完了,去释放锁,就会出现把其他人的锁删除。【张冠李戴】

    • 设置key时,可带上线程id和uuid(我这里以uuid演示)。删除key之前,要判断是否是自己的锁。如果是则unlock释放,不是就return走。
    func (r *RedisLock) Unlock() {
    	//通过lua脚本删除锁
    	//1. 查看锁是否存在,如果不存在,直接返回
    	//2. 如果存在,对锁进行hincrby -1操作,当减到0时,表明已经unlock完成,可以删除key
    	delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
    		"then " +
    		"   return nil " +
    		"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
    		"then " +
    		"   return redis.call('del', KEYS[1]) " +
    		"else " +
    		"   return 0 " +
    		"end"
    	resp, err := r.redisCli.Eval(context.TODO(), delCmd, []string{r.key}, r.Id).Result()
    	if err != nil && err != redis.Nil {
    		log.Errorf("unlock %s %v", r.key, err)
    	}
    	if resp == nil {
    		fmt.Println("delKey=", resp)
    		return
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    Lua保证原子性

    加锁与设置过期时间需要保证原子性。否则如果加锁成功后,还没来得及设置过期时间,Redis节点挂掉了,就又会出现其他节点一直获取不到锁的问题。

    • Lua脚本保证原子性
    //lock 加锁&设置过期时间
    "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
    		"then " +
    		"   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
    		"   redis.call('expire', KEYS[1], ARGV[2]) " +
    		"   return 1 " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    //unlock解锁
    	delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
    		"then " +
    		"   return nil " +
    		"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
    		"then " +
    		"   return redis.call('del', KEYS[1]) " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    //自动续期
    renewCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
    		"then " +
    		"   return redis.call('expire', KEYS[1], ARGV[2]) " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    可重入锁

    存在一部分业务,方法里还需要继续加锁。需要实现锁的可重入,记录加锁的次数。Lock几次,就unLock几次。

    • map[string]map[string]int =>可通过Redis hset结构实现
    # yiRedisLock :redis的key
    # fas421424safsfa:1 :uuid+线程号
    # 5 :加锁次数(重入次数)
    hset yiRedisLock fas421424safsfa:1 5
    
    • 1
    • 2
    • 3
    • 4
    //通过hset&hincrby 保证可重入(记录加锁次数)
    lockCmd := "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
    		"then " +
    		"   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
    		"   redis.call('expire', KEYS[1], ARGV[2]) " +
    		"   return 1 " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
    		"then " +
    		"   return nil " +
    		"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
    		"then " +
    		"   return redis.call('del', KEYS[1]) " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    自动续期

    相同业务耗时可能因为网络等问题而有所变化。例如:我们设置分布式锁超时时间为20s,但是业务因为网络问题某次耗时达到了30s,这时锁就会被超时释放,其他线程就能获取到锁。存在业务风险。

    • 加锁成功之后设置自动续期,启一个timer定时任务,比如每10s检测一下锁有没有被释放,如果没有,就自动续期。
    // 判断锁是否存在,如果存在(表明业务还未完成),重新设置过期时间(自动续期)
    renewCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
    		"then " +
    		"   return redis.call('expire', KEYS[1], ARGV[2]) " +
    		"else " +
    		"   return 0 " +
    		"end"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3 代码

    3.1 项目结构解析

    在这里插入图片描述

    • constant模块:定义分布式锁名称、业务Key(用于模拟扣减数据库)
    • lock模块:核心模块,实现分布式锁
      • Lock
      • TryLock
      • UnLock
      • NewRedisLock
    • other_svc:在其他端口启另外一个服务,用于本地模拟分布式
    • service:业务类,扣减商品数量(其中的扣减操作涉及分布式锁)
    • main:提供iris web服务

    3.2 全部代码

    注::other_svc这里不提供,与分布式锁实现无太大关系。同时为了快速演示效果,部分项目结构与代码不规范。

    感兴趣的朋友,可以上Github查看全部代码。

    • Github:https://github.com/ziyifast/ziyifast-code_instruction/tree/main/redis_demo/distributed_lock
    • 现象:
      在这里插入图片描述
    constant/const.go
    package constant
    
    import "github.com/go-redis/redis/v8"
    
    var (
    	BizKey   = "XXOO"
    	AppleKey = "apple"
    	RedisCli *redis.Client
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    lock/redis_lock.go
    package service
    
    import (
    	"context"
    	"github.com/go-redis/redis/v8"
    	"github.com/ziyifast/log"
    	"myTest/demo_home/redis_demo/distributed_lock/constant"
    	"myTest/demo_home/redis_demo/distributed_lock/lock"
    	"strconv"
    )
    
    type goodsService struct {
    }
    
    var GoodsService = new(goodsService)
    
    func (g *goodsService) Consume() {
    	redisLock := lock.NewRedisLock(constant.RedisCli, constant.BizKey)
    	redisLock.Lock()
    	defer redisLock.Unlock()
    	//consume goods
    	result, err := constant.RedisCli.Get(context.TODO(), constant.AppleKey).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	i, err := strconv.ParseInt(result, 10, 64)
    	if err != nil {
    		panic(err)
    	}
    	if i < 0 {
    		log.Infof("no more apple...")
    		return
    	}
    	_, err = constant.RedisCli.Set(context.TODO(), constant.AppleKey, i-1, -1).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	log.Infof("consume success...appleID:%d", i)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    service/goods_service.go
    package service
    
    import (
    	"context"
    	"github.com/go-redis/redis/v8"
    	"github.com/ziyifast/log"
    	"myTest/demo_home/redis_demo/distributed_lock/constant"
    	"myTest/demo_home/redis_demo/distributed_lock/lock"
    	"strconv"
    )
    
    type goodsService struct {
    }
    
    var GoodsService = new(goodsService)
    
    func (g *goodsService) Consume() {
    	redisLock := lock.NewRedisLock(constant.RedisCli, constant.BizKey)
    	redisLock.Lock()
    	defer redisLock.Unlock()
    	//consume goods
    	result, err := constant.RedisCli.Get(context.TODO(), constant.AppleKey).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	i, err := strconv.ParseInt(result, 10, 64)
    	if err != nil {
    		panic(err)
    	}
    	if i < 0 {
    		log.Infof("no more apple...")
    		return
    	}
    	_, err = constant.RedisCli.Set(context.TODO(), constant.AppleKey, i-1, -1).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	log.Infof("consume success...appleID:%d", i)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    main.go
    package main
    
    import (
    	"context"
    	"github.com/go-redis/redis/v8"
    	"github.com/kataras/iris/v12"
    	context2 "github.com/kataras/iris/v12/context"
    	"myTest/demo_home/redis_demo/distributed_lock/constant"
    	"myTest/demo_home/redis_demo/distributed_lock/service"
    )
    
    func main() {
    	constant.RedisCli = redis.NewClient(&redis.Options{
    		Addr: "localhost:6379",
    		DB:   0,
    	})
    	_, err := constant.RedisCli.Set(context.TODO(), constant.AppleKey, 500, -1).Result()
    	if err != nil && err != redis.Nil {
    		panic(err)
    	}
    	app := iris.New()
    	//xLock := new(sync.Mutex)
    	app.Get("/consume", func(c *context2.Context) {
    		//xLock.Lock()
    		//defer xLock.Unlock()
    		service.GoodsService.Consume()
    
    		c.JSON("ok port:8888")
    	})
    	app.Listen(":8888", nil)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    分库分表利器:Sharding-JDBC、TDDL、Mycat选择与应用
    载羟基喜树碱-聚乳酸纳米粒|平均粒径为85nm的葫芦素BE聚乳酸纳米微粒(CuBE- PLA-NP)技术资料
    E-Prime心理学实验设计软件丨产品简介
    CMake篇1: Windows上用CMake编译生成可执行程序
    Redis实现消息队列的4种方案
    DSI及DPHY的学习知识点
    腾讯云的ubuntu系统,远程登录不上,解决方案
    C++变量与基本类型
    http和https区别与上网过程
    css:button实现el-radio效果
  • 原文地址:https://blog.csdn.net/weixin_45565886/article/details/136755738