• 循序渐进 Redis 分布式锁(以及何时不用它)


    场景

    假设我们有个批处理服务,实现逻辑大致是这样的:

    1. 用户在管理后台向批处理服务投递任务;
    2. 批处理服务将该任务写入数据库,立即返回;
    3. 批处理服务有启动单独线程定时从数据库获取一批未处理(或处理失败)的任务,投递到消息队列中;
    4. 批处理服务启动多个消费线程监听队列,从队列中拿到任务并处理;
    5. 消费线程处理完成(成功或者失败)后修改数据库中相应任务的状态;

    流程如图:

    image-20220617163115681

    现在我们单独看看上图中虚线框中的内容(3~6):批处理服务从数据库拉取任务列表投递到消息队列。

    生产环境中,为了高可用,都会部署至少两台批处理服务器,也就是说至少有两个进程在执行虚线框中的流程。

    有什么问题呢?

    假设这两个进程同时去查任务表(这是很有可能的),它俩很可能会得到同一批任务列表,于是这批任务都会入列两次。

    当然,这不是说一个任务入列两次就一定会导致任务被重复执行——我们可以通过多引入一个状态值来解决此问题。

    消费者线程从队列中获取到任务后,再次用如下 SQL 更新任务状态:

    -- status:1-待处理;2-已入列;3-处理中;4-失败待重试;5-彻底失败(不可重试);

    update tasks set status=3 where status=2 and id=$id;

    由于 where 条件有 status=2,即只有原先状态是“已入列”的才能变成“处理中”,如果多个线程同时拿到同一个任务,一定只有一个线程能执行成功上面的语句,进而继续后续流程(其实这就是通过数据库实现的简单的分布式锁——乐观锁)。

    不过,当定时进程多了后,大量的重复数据仍然会带来性能等其他问题,所以有必要解决重复入列的问题。

    有个细节:请注意上图中步骤 5、6,是先改数据库状态为“已入列”,再将消息投递到消息队列中——这和常规逻辑是反过来的。

    能否颠倒 5 和 6 的顺序,先入列,再改数据库状态呢?

    不能。从逻辑上来说确实应该如此,但它会带来问题。消费线程从队列中拿到任务后,会执行如下 SQL 语句:

    update tasks set status=3 where status=2 and id=$id;

    这条 SQL 依赖于前面(第 5 步)产生的状态值,所以它要求在执行该语句的时候,第 5 步的 SQL 语句(将状态改为“已入列”)一定已经执行完了。如果将 5 和 6 颠倒(先入列,再改状态值),就有可能出现下图的执行顺序,导致消费者线程修改状态失败,进而执行不下去:

    image-20220617174423619

    上图中,任务入列后立即被消费线程获取到并去修改数据库,而此时定时线程的 SQL 可能还没执行(可能网络延迟),这就出问题了。

    定时线程先将状态改为“已入列”带来的问题是,如果改状态后(入列前)进程挂了,会导致任务一直处于已入列状态(但实际上未入列),所以还需要搭配其它的超时重试机制。

    上图虚线框中那段逻辑在并发原语中有个专门名称叫“临界区”——我们要做的就是让多个操作者(进程、线程、协程)必须一个一个地(而不能一窝蜂地)去执行临界区内部的逻辑,手段就是加锁:

    var lock = newLock()
    
    // 加锁
    lock.lock()
    
    // 执行临界区的逻辑
    
    // 释放锁
    lock.unlock()
    

    所谓锁,就是多个参与者(进程、线程)争抢同一个共享资源(术语叫“信号量”),谁抢到了就有资格往下走,没抢到的只能乖乖地等(或者放弃)。锁的本质是两点:

    1. 它是一种共享资源,对于多方参与者来说,只有一个,就好比篮球场上只有一个篮球,所有人都抢这一个球;
    2. 对该资源的操作(加锁、解锁)是原子性的。虽然大家一窝蜂都去抢一个球,但最终这个球只会属于某一个人,不可能一半在张三手上,另一半在李四手上。只有抢到球的一方才可以执行后续流程(投篮),另一方只能继续抢;

    在单个进程中,以上两点很容易实现:同一个进程中的线程之间天然是共享进程内存空间的;原子性也直接由 CPU 指令保证。所以单个进程中,我们直接用编程语言提供的锁即可。

    进程之间呢?

    进程之间的内存空间是独立的。两个进程(可能在两台不同的物理机上)创建的锁资源自然也是独立的——这就好比两个篮球场上的两个篮球之间毫不相干。

    那怎样让两个篮球场上的两队人比赛呢?只能让他们去同一个地方抢同一个球——这在编程中叫“分布式锁”。

    有很多实现分布式锁的方案(关系数据库、zookeeper、etcd、Redis 等),本篇单讲用 Redis 来实现分布式锁。


    小试牛刀

    之所以能用 Redis 实现分布式锁,依赖于其两个特性:

    1. Redis 作为独立的存储,其数据天然可以被多进程共享;
    2. Redis 的指令是单线程执行的,所以不会出现多个指令并发地读写同一块数据;
    3. Redis 指令是纯内存操作,速度是微妙级的(不考虑网络时延),性能足够高;

    有些人一想到“单线程-高性能”就条件反射地回答 IO 多路复用,其实 Redis 高性能最主要就是纯内存操作。

    Redis 分布式锁的大体调用框架是这样的:

    image-20220618165057778

    多个进程的多个线程争抢同一把 Redis 锁。

    说到 Redis 分布式锁,大部分人都会想到 setnx 指令:

    // setnx 使用方式
    SETNX key value
    

    意思是:如果 key 不存在(Not eXists),则将 key 设置为 value 并返回 1,否则啥也不做并返回 0——也就是说, key 只会被设置一次,利用这个特性就可以实现锁(如果返回 1 表示加锁成功,0 则说明别人已经加锁了,本次加锁失败)。

    我们写下伪代码:

    // 获取 redis client 单例
    var redis = NewRedisClient(redisConf);
    
    // 通过 SETNX 指令加锁
    func lock(string lockKey) bool {
        result = redis.setnx(lockKey, 1);
        return bool(result);
    }
    
    // 通过 DEL 指令解锁
    func unlock(string lockKey) {
        redis.del(lockKey);
    }
    

    上面的定时任务进程中这样使用:

    var lockKey = "batch:task:list"
    
    // 上锁
    if (!lock(lockKey)) {
        // 获取锁失败,直接返回
        return false;
    }
    
    try {
        // 查询数据库获取待处理任务列表
        // 更新任务状态
        // 入列
    } finally {
        // 解锁
        unlock(lockKey);	
    }
    

    很简单!半小时搞定,上线!


    第一次懵逼

    上线没跑几天就出问题了:任务无缘无故地不执行了,消息队列中很长时间没接收到消息了。

    分析了半天,我们发现 Redis 中一直存在 batch:task:list 这条记录,没人去删除它!

    盯着代码我们突然发现问题所在:这个 key 压根没有过期时间!也就是说,如果程序不 DEL 它就永远存在。

    估计某进程在执行 unlock 之前崩溃了(或者哪个愣头青执行了 kill -9 ?),或者 unlock 时发生了网络问题,或者 Redis 宕机了?总之 DEL 没执行,于是这个锁永远得不到释放!

    好办,加上过期时间呗:

    ...
    
    // 通过 SETNX 指令加锁
    // 加上过期时间,单位毫秒
    func lock(string lockKey, int ttl = 3000) bool {
        // 加锁
        result = redis.setnx(lockKey, 1);
        // 设置过期时间(毫秒)
        redis.pexpire(lockKey, ttl);
    	
        return bool(result);
    }
    
    ...
    

    这段代码有什么问题呢?

    这里通过两次网络请求执行了两条 Redis 指令:setnx 设置 KV,expire 设置超时时间——我们前面说锁操作必须具备原子性,但这两条操作谁也不能保证要么都成功要么都失败啊。假如第一条指令(setnx)执行成功了,但 expire 由于网络原因或者进程崩溃导致执行失败了呢?此时同样会出现上面那个懵逼的问题啊。

    我们可以用 Lua 脚本实现 setnx 和 expire 操作的原子性,不过 Redis 2.6.12 版本后可以用 SET 指令搞定:

    // 2.6.12 后的 SET 指令格式
    // 现在的 SET 指令相当强大也相当复杂,可以替代 SETNX, SETEX, PSETEX, GETSET, 此处只写出跟分布式锁有关的
    // 其中两个可选参数:
    // -- NX 表示 Not eXists,就是 SETNX 的意思;
    // -- PX 是 PEXPIRE 的意思,表示设置 key 的过期时间(毫秒);
    
    SET key value [NX] [PX milliseconds]
    

    改下 Lock 代码:

    // 加锁
    func lock(string lockKey, int ttl = 3000) bool {
        // Set 函数参数对应上面的命令格式
        result = redis.set(lockKey, 1, "NX", "PX", ttl);
    	
        return bool(result);
    }
    

    如此,加了过期时间防止锁无法释放,还保证了加锁操作的原子性,妥了,上线!


    第二次懵逼

    第二次上线没多久又出现了灵异事件:偶尔会出现一批任务重复入列——敢情这锁加了个寂寞?

    各种打日志,终于发现了端倪:有个进程加锁 3.5 秒后才解锁,而且解锁成功了——但我们设置的锁超时时间是 3 秒啊!

    也就是说,这个线程解的是别的线程的锁!

    // 通过 DEL 指令解锁
    // 这里直接调 Redis 的 DEL 指令删除 lockKey,并没有判断该 lockKey 的值是不是本进程设置的
    // 所以在有 TTL 的情况下,删的可能是别的线程加的锁
    func unlock(string lockKey) {
        redis.del(lockKey);
    }
    

    和进程内的本地锁不同的是,Redis 分布式锁加入超时机制后,锁的释放就存在两种情况:

    1. 加锁者主动释放;
    2. 超时被动释放;

    所以解锁(DEL)之前需要判断锁是不是自己加的,方法是在加锁的时候生成一个唯一标识。之前我们 SET key value 时 value 给的是固定值 1,现在我们换成一个随机值:

    // Redis 分布式锁
    // 封装成类
    // 该类实例不具备线程安全性,不应跨线程使用
    class Lock {
        private redis;
        private name;
        private token;
        private ttl;
        private status;
    	
        const ST_UNLOCK = 1;
        const ST_LOCKED = 2;
        const ST_RELEASED = 3;
    	
        public function Lock(Redis redis, string name, int ttl = 3000) {
            this.redis = redis;
    	this.name = name;
    	this.token = randStr(16);// 生成 16 字节随机字符串
    	this.ttl = ttl;
    	this.status = self::ST_UNLOCK;
        }
    	
        // 加锁
        public function lock() bool {
    	if (this.status != self::ST_UNLOCK) {
    	    return false;
    	}
    		
    	// 使用 SET 命令加锁
    	// value 不再传 1,而是设置成构造函数中生成的随机串
    	try {
    	    result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
    	    if (bool(result)) {
    	        this.status = self::ST_LOCKED;
    	        return true;
    	    }
    	} catch (Exception e) {
    	    return false;
    	}
    	
    	return false;
        }
    	
        // 解锁
        public function unlock() {
    	if (this.status != self::ST_LOCKED) {
    	    return;
    	}
    		
    	// 执行 DEL 之前需要用 GET 命令判断 KEY 的值是不是当前的 token
    	// 由于需要执行 GET 和 DEL 两条指令,而锁操作必须保证原子性,需要用 Lua 脚本
    	// 脚本中通过 redis.call() 执行 Redis 命令
    	// 注意 Lua 脚本数组下标从 1 开始
    	// 这段脚本的意思是:
    	// 如果 key 的值是 token,则 DEL key,否则啥也不做
    	var lua = "
    		if (redis.call('get', KEYS[1]) == ARGV[1]) then
                        redis.call('del', KEYS[1]);
                     end
                     return 1;
    	";
    		
    	// 调 Redis 的 EVEL 指令执行 Lua 脚本
    	// EVAL 指令格式:
    	// EVEL script numkeys key1,key2,arg1,arg2... 
    	// -- script: Lua 脚本
    	// -- numkeys: 说明后面的参数中,有几个是 key,这些 key 后面的都是参数
    	// 比如:EVAL "redis.call('set', KEYS[1], ARGV[1])" 1 mykey hello
    	// 等价于命令 SET mykey hello
    	// 参见:https://redis.io/commands/eval/
    	redis.eval(lua, 1, this.name, this.token);
    	this.status = self::ST_RELEASED;
        }
    }
    

    业务调用:

    lock = new Lock(redis, "batch:task:list");
    
    try {
        if (!lock.lock()) {
    	return false;
        }
    	
        // 加锁成功,执行业务
    } finally {
        lock.unlock();
    }
    

    上面这段代码实现了:

    1. 加锁的时候设置了过期时间,防止进程崩溃而导致锁无法释放;
    2. 解锁的时候判断了当前的锁是不是自己加的,防止释放别人的锁;
    3. 加锁和解锁操作都具备原子性;

    这段代码已经是生产可用了,第三次上线。

    不过,还是有些优化需要做的。


    优化一:锁等待

    上面的 lock() 方法中,如果获取锁失败则直接返回 false,结束执行流,这可能不能满足某些业务场景。

    在本地锁场景中,如果获取锁失败,线程会进入阻塞等待状态——我们希望分布式锁也能提供该功能。

    我们在加锁失败时增加重试功能:

    class Lock {
        // 重试间隔:1 秒
        const RETRY_INTERVAL = 1000;
        // ...
        // 重试次数(包括首次)
        private retryNum;
    	
        // retryNum: 默认只执行一次(不重试)
        public function Lock(Redis redis, string name, int ttl = 3000, int retryNum = 1) {
            ...
    	// 做下防御
    	if (retryNum < 0 || retryNum > 20) {
    	    retryNum = 1;
    	}
    	this.retryNum = retryNum;
        }
    	
        // 加锁
        public function lock() bool {
            if (this.status != self::ST_UNLOCK) {
    	    return false;
    	}
    		
            // 使用 SET 命令加锁
    	// 加入重试机制
    	for (i = 0; i < this.retryNum; i++) {
    	    try {
                    result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
    		if (bool(result)) {
    		    // 加锁成功,返回	
    		    this.status = self::ST_LOCKED;
    		    return true;
    		}
    	    } catch (Exception e) {
                }
    			
    	    // 加锁失败了,等待一定的时间后重试
    	    // 当前线程/协程进入休眠
    	    sleep(self::RETRY_INTERVAL);
    	}
    
    	return false;
        }
    }
    

    优化二:锁超时

    我们再回头看看上面的加锁逻辑,其核心代码如下:

    public function lock() bool {
       // ...
        result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
        if (bool(result)) {
            // 加锁成功,返回	
            this.status = self::ST_LOCKED;
    	return true;
        }
        //...
    }
    

    这段代码有没有什么问题呢?

    想象如下的加锁场景:

    // 锁超时时间是 2 秒
    var lock = new Lock(redis, name, 2000);
    
    if (lock.lock()) {
        // 加锁成功,加锁用时 2.5 秒
        try {
    	// 执行业务逻辑
        } finally {
    	// 解锁
    	lock.unlock();
        }
    }
    

    如上,我们创建一个有效期 2 秒的锁,然后调 Redis 命令加锁,该过程花了 2.5 秒(可能网络抖动)。

    对于本线程来说,得到加锁成功的返回值,继续往下执行。

    但此时该 lockKey 在 Redis 那边可能已经过期了,如果此时另一个线程去拿锁,也会成功拿到锁——如此锁的作用便失效了。

    image-20220618150108303

    所以,在 lock() 方法中,调 Redis 上锁成功后,需要判断上锁用时,如果时间超过了锁的有效期,则应视为上锁无效,如果有重试机制,则重试:

    class Lock {
        // 加锁
        public function lock() bool {
            if (this.status != self::ST_UNLOCK) {
    	    return false;
    	}
    		
    	for (i = 0; i < this.retryNum; i++) {
    	    try {
    	        // 上锁之前,保存当前毫秒数
    		var startTime = getMillisecond();
    		// 上锁
    		result = redis.set(this.name, this.token, "NX", "PX", this.ttl);
    		// 上锁后,计算使用的时间
    		var useTime = getMillisecond() - startTime;
    				
    		// 加锁成功条件:Redis 上锁成功,且所用的时间小于锁有效期
    		if (bool(result) && useTime < this.ttl) {
    	            // 加锁成功,返回	
    		    this.status = self::ST_LOCKED;
    		    return true;
    		}
    	    } catch (Exception e) {}
    			
    	    // 加锁失败了,等待一定的时间后重试
    	    // 当前线程/协程进入休眠
    	    sleep(self::RETRY_INTERVAL);
            }
    
    	return false;
        }
    }
    

    如上,在判断条件中增加了加锁用时的判断。

    这段代码还有问题吗?

    有的。

    我们用 Redis 的 SET NX 命令加锁,该命令如果发现 key 已经存在,则直接返回 0,加锁失败。

    在上面的失败重试逻辑中,如果是因为加锁用时超限导致的失败(锁有效期是 2 秒,结果加锁操作用了 2.5 秒),此时我们并不能切确知道在 Redis 那边该 key 是否真的已经失效了,如果没有失效(比如来去网络用时各 1.24 秒,此时该 key 并没有失效),那么下一次的重试会因 SET NX 的机制而失败。

    所以我们不能用 SET NX 加锁,只能用普通的 SET + Lua 脚本来实现:

    class Lock {
        // 加锁
        public function lock() bool {
            if (this.status != self::ST_UNLOCK) {
    	    return false;
    	}
    		
    	// 加锁的 Lua 脚本
    	// 注意 Lua 中的注释不是用 // 或者 /**/,而是用 --
    	// 参数说明:
    	// KEYS[1]: lockKey
    	// ARGV[1]: token
    	// ARGV[2]: ttl 毫秒
    	var lua = "
    		local val = redis.call('get', KEYS[1]);
    		if (not val) then
    		    -- 没有设置,则直接设置
    		    return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]);
    		else
    		    -- 存在,则比较 val 是否等于 token
    		    if (val == ARGV[1] ) then
    		        -- 该 key 就是当前线程设置的
    			-- 延长其 TTL
    			return redis.call('pexpire', KEYS[1], ARGV[2]);
    		    else
    		        -- 其他线程上的锁
    			return 0;
    		    end
    		end
    	";
    		
    	for (i = 0; i < this.retryNum; i++) {
                // 加锁逻辑同上
    	}
    
    	return false;
        }
    }
    

    如此,便解决了加锁超时导致的竞态问题——但只解决了一半。

    设想这样的场景:

    进程 A 加了一个有效期 5 秒的锁,加锁成功后执行业务逻辑,业务逻辑执行耗时 10 秒——就是说,在业务逻辑执行到差不多一半的时候锁就失效了,此时别的进程就可以抢到锁了,这就会导致竞态问题。

    有两种解决方案:

    1. 设置个较长的过期时间。这是最简单的(而且也很有效)。比如我们预估 99% 的处理时间不超过 2 秒,则将锁有效期设置为 10 秒。该方案最大的缺点是一旦进程崩溃导致无法主动释放锁,就会导致其他进程在很长一段时间内(如 10 秒)无法获得锁,这在某些场景下可能是非常严重的。
    2. 搞个定时任务线程,定时延长锁的有效期。

    方案二伪代码如下:

    // 带 Refresh 版本的分布式锁
    class Lock {
        private redis;
        private name;
        private ttl;
        private token;
        private retryNum;
        private status;
        // 定时器
        private timer;
    	
        // 锁状态:1 未加锁;2 已加锁;3 已释放
        const ST_UNLOCK = 1;
        const ST_LOCKED = 2;
        const ST_RELEASED = 3;
    	
        // 刷新状态:
        //	4 刷新成功;
        //	5 非法(key 不存在或者不是本线程加的锁)
        //	6 刷新失败(Redis 不可用)
        const RF_SUC = 4;
        const RF_INVALID = 5;
        const RF_FAIL = 6;
    	
        // 构造函数
        public function Lock(Redis redis, string name, int ttl = 2000, int retryNum = 1) {
            ...
        }
    	
        // 加锁
        // 加锁成功后启动定时器
        public function lock() bool {
            if (this.status != self::ST_UNLOCK) {
                return false;
    	}
    		
    	// 加锁的 Lua 脚本,同前面的
    	lua = "...";
    		
    	for (i = 0; i < this.retryNum; i++) {
    	    var startTime = getMillisecond();
    	    try {
    	        // 执行 Lua 脚本上锁
    		result = this.redis.eval(lua, 1, this.name, this.token, this.ttl);
    		var useTime = getMillisecond() - startTime;
    				
    		if (bool(result) && useTime < this.ttl) {
    		    // 加锁成功
    		    this.status = self::ST_LOCKED;
    		    // 启动定时器
    		    this.tick();
    					
    		    return true;
    		}
    	    } catch (Exception e) {
    		// Redis 不可用
    	    }
    			
    	    // 失败重试
    	    sleep(RETRY_INTERVAL);
            }
    
    	return false;
        }
    	
        // 启动定时器,定时刷新过期时间
        private function tick() {
            this.timer = startTimerInterval(
    	    this.ttl / 3,
    	    function () {
    	        result = this.refresh();
    		if (result == self::RF_INVALID) {
    		    // key 不存在,或者该锁被其他线程占用
    		    // 停掉定时器
    		    this.timer.stop();
    		}
    	    }
    	);
        }
    	
        // 释放锁
        // 需要停掉定时器
        public function unlock() {
            if (this.status != self::ST_LOCKED) {
    	    return;
    	}
    		
            // 释放锁的 Lua 脚本,同前
    	var lua = "...";
    		
    	try {
    	    this.redis.eval(lua, 1, this.name, this.token);
            } catch (Exception e) {} finally {
    	    this.status = self::ST_RELEASED;
    	    // 停掉定时器
    	    this.timer.stop();
    	}
        }
    	
        // 刷新锁过期时间
        private function refresh() int {
            if (this.status != self::ST_LOCKED) {
    	    return self::RF_INVALID;
    	}
    		
    	var lua = "
    	    -- key 存在而且其值等于 token 才刷新过期时间
    	    if (redis.call("get", KEYS[1]) == ARGV[1]) then
    	        return redis.call("pexpire", KEYS[1], ARGV[2])
    	    else
    	        return 0
    	    end
    	";
    		
            try {
    	    result = this.redis.eval(lua, 1, this.name, this.token, this.ttl);
    	    if (result == 0) {
    	        // key 不存在或者是别人加的锁
    		return self::RF_INVALID;
    	    } else {
    	        // 刷新成功
    		return self::RF_SUC;
    	    }
    	} catch (Exception e) {
    	    // Redis 不可用
    	    return self::RF_FAIL;
    	}
        }
    }
    

    如上,加锁成功后创建一个单独的定时器(独立的线程/协程)刷新锁的 TTL,只要锁没被主动释放(而且进程没有崩溃),就会不停地续命,保证不会过期。此时,我们就能在加锁时选择一个比较小的过期时间(比如 2 秒),一旦进程崩溃,其他进程也能较快获得锁。

    上面定时器时间为何选择 ttl/3 呢?

    假设锁过期时间(ttl)为 6 秒,由上面 lock() 函数逻辑可知,加锁耗时不可能超过 6 秒(超过就会判定为加锁失败)。我们假设某次加锁耗时比 6 秒小那么一丢丢(也就是近似 6 秒),接下来什么时候发起第一次刷新才能保证 Redis 那边的 key 不过期呢?极端情况下必须立即刷新(如果考虑刷新时的网络时延,就算立即刷新也不一定能保证)。

    不过我们考虑的是一般情况。我们可以认为 6 秒耗时都花在网络上(Redis 本身执行时间可以忽略不计),然后再近似认为这 6 秒被来去均摊,各花 3 秒,因而当我们接收到 Redis 的响应时,该 key 在 Redis 那边的 TTL 已经用掉了一半,所以定时间隔必须小于 ttl/2,再将刷新时的网络时延考虑进去,取 ttl/3 或者 ttl/4 比较合适。

    就算有了 refresh 机制,也不能说是万无一失了。

    考虑 Redis 宕机或者网络不通的情况。

    假设线程 A 加锁(ttl=2s)后不久 Redis 就宕机了(或者该业务服所在网络发生分区导致网络不通),宕机期间 refresh 会失败。2s 后 Redis 重启恢复正常,此时线程 A 设置的那个 key 已经过期了,其他线程就能够获取锁,如果线程 A 的执行时间超过 2s,就和其他线程产生竞态。

    refresh 机制解决不了该问题,要用其他手段来保证 Redis 和锁的高可用性,如 Redis 集群、官方提供的 Redlock 方案等。


    可重入性

    一些语言(如 java)内置可重入锁,一些语言(如 go)则不支持。

    我们通过代码说下可重入锁是什么:

    var lock = newLock();
    
    // 在同一个线程中, foo() 调 bar()
    // 函数 foo() 和 bar() 都在竞争同一把锁
    
    function foo() {
        lock.lock();
        ...
        bar();
        ...
        lock.unlock();
    }
    
    function bar() {
        lock.lock();
        // do something
        lock.unlock();
    }
    

    如上,同一个线程中 foo() 调 bar(),由于 foo() 调 bar() 之前加了锁,因而 bar() 中再竞争该锁时就会一直等待,导致 bar() 函数执行不下去,进而导致 foo() 函数无法解锁,于是造成死锁。

    如果上面的 lock 是一把可重入锁,bar() 就会加锁成功。

    实现原理是:加锁的 lock() 方法中会判断当前这把锁被哪个线程持有,如果持有锁的线程和现在抢锁的线程是同一个线程,则视为抢锁成功(这锁本来就是被它持有的嘛,抢啥呢)。

    由于 foo() 和 bar() 是在同一个线程中调用的,所以他俩都会加锁成功。

    锁是加成功了,解锁呢?bar() 中的 unlock() 要怎么处理呢?直接把锁释放掉?不行啊,foo() 中的 unlock() 还没执行呢,bar() 虽然用完锁了,但 foo() 还没用完啊,你 bar() 三下五除二把锁给释放了,其他线程拿到锁,不就和 foo() 中代码构成竞态了吗?

    所以可重入锁采用信号量的思想,在内部维持了两个属性:threadid 表示哪个线程持有锁;lockNum 表示持有线程加了几次锁。同一个线程,每 lock() 一次 lockNum 加 1,每 unlock() 一次 lockNum 减 1,只有 lockNum 变成 0 了才表示这把锁真正释放了,其他线程才能用。

    原理讲完了,但你不觉得上面的代码很怪吗?

    既然 foo() 已经加锁了,bar() 为何还要加同一把锁呢?

    在某些情况下这样做可能是有原因的,但大多数情况下,这个问题可以从设计上解决,而不是非要引入可重入锁。

    比如我们可以将 bar() 声明为非线程安全的,将加锁工作交给调用者,同时限制 bar() 的可见域,防止其被滥用。

    go 语言不支持可重入锁的理由就是:当你的代码需要用可重入锁了,你首先要做的是审视你的设计是否有问题。

    可重入锁的便捷性可能会带来代码设计上的问题。

    所以本篇并不打算去实现可重入能力——虽然实现起来并不难,无非是将上面讲的原理在 Redis 上用 Lua 脚本实现一遍而已。


    不是银弹

    有了锤子,全世界都是钉子。

    分布式锁看似是颗银弹,但有些问题用其他方案会比分布式锁要好。

    我们看看秒杀扣库存的例子。

    网上很多讲分布式锁的文章都拿秒杀扣库存来举例。

    秒杀场景为了应对高并发,一般会将秒杀商品库存提前写入到 Redis 中,我们假设就用字符串类型存商品库存:

    // Redis 命令,设置商品 id=1234 的库存 100 件
    set seckill.stock.1234 100
    

    另外一个用户只能参加一次秒杀,所以扣库存前需要判断该用户是否已经参加了(防止羊毛党薅羊毛)。

    扣库存逻辑是这样的:

    var stockKey = "seckill.stock.1234";
    var userKey = "seckill.ordered.users";
    var lock = new Lock(redis, "seckill");
    
    // 此处省略活动时间的判断
    
    try {
        // 加分布式锁
        lock.lock();
    	
        // 判断库存
        var stockNum = redis.get(stockKey);
        if (stockNum <= 0) {
            // 库存不足
            return false;
        }
    	
        // 判断用户是否已经参加过
        if (redis.sismember(userKey, userId)) {
            return false;
        }
    	
        // 扣库存
        if (redis.decr(stockKey) >= 0) {
            // 下单
    	...
        } else {
            return false;
        }
    	
        // 将用户加入到已参加集合中
        redis.sadd(userKey, userId);
        return true;
    } catch (Exception e) {
        // 异常
    } finally {
        // 解锁
        lock.unlock();
    }
    

    以上逻辑为何要用分布式锁呢?

    假设不用分布式锁,羊毛党同时发了十个请求(同一个用户),由于 redis.sismember(userKey, userId) 判断都会返回 0,于是都能扣库存下单,羊毛薅了一地。

    但该场景有没有更优的解决方案呢?

    我们使用分布式锁是为了保证临界区代码(lock 保护的区域)执行的原子性——不过 Redis 的原子性还可以通过 Lua 脚本来实现吧。

    上面代码一共进行了 6 次 Redis 交互,假设每次用时 50ms,光 Redis 交互这块就用了 0.3s 的时间。

    如果我们将这些逻辑封装成 Lua 脚本,只需要一次 Redis 交互就能保证原子性:

    var lua = "
        -- 参数说明:
        --	KEYS[1]: actKey
        --	KEYS[2]: userKey
        --	KEYS[3]: stockKey
        --	ARGV[1]: userId
    
        -- 判断活动时间
        -- (事先将活动的关键信息保存到 Redis hash 中)
        -- 取活动的开始和结束时间
        local act = redis.call('hmget', KEYS[1], 'start', 'end');
        local now = redis.call('time')[1];
        if (not act[1] or now < act[1] or now >= act[2])
        then
            return 0;
        end
    	
        -- 判断库存
        local stock = redis.call('get', KEYS[3]);
        if (not stock or tonumber(stock) <= 0)
        then
            return 0;
        end
    	
        -- 判断用户是否已经参与过
        if (redis.call('sismember', KEYS[2], ARGV[1]) == 1)
        then
            return 0;
        end
    	
        -- 扣库存
        if (redis.call('decr', KEYS[3]) >= 0)
        then
            -- 加入用户
            return redis.call('sadd', KEYS[2], ARGV[1]);
        else
            return 0;
        end
    ";
    
    var actKey = "seckill.act."+actId;
    var userKey = actKey + ".users";
    var stockKey = actKey + ".stock." + goodsId;
    
    if (redis.eval(lua, 3, actKey, userKey, stockKey, userId)) {
        // 扣库存成功,下单
        ...
    }
    

    上面的脚本还可以先缓存到 Redis 服务器中,然后用 evalsha 命令执行,这样客户端就不用每次都传这么一大坨代码,进一步提升传输性能。


    总结

    本篇我们从 setnx 命令开始实现了一个最简单的分布式锁,而后通过实际使用发现其存在各种缺陷并逐步增强其实现,主要涉及到以下几个方面:

    1. 被动释放。进程崩溃后,进程本地锁自然会销毁,但 Redis 锁不会。所以要加 TTL 机制,防止因加锁者崩溃而导致锁无法释放;
    2. 属主。线程不能释放别的线程的锁;
    3. 锁等待。加锁失败时可以等待一段时间并重试,而不是立即返回;
    4. 保活。通过定时刷新锁的 TTL 防止被动释放;

    不难发现,分布式锁比进程内本地锁要复杂得多,也重得多(本地锁操作是纳秒级别,分布式锁操作是毫秒级别),现实中,在使用分布式锁之前我们要思考下有没有其它更优方案,比如乐观锁、Lua 脚本等。

    另外需要注意的是,分布式锁只能解决多进程之间的并发问题,并不能实现数据操作的幂等性。一个例子是增减积分的操作。

    增加积分的例子:

    // 给用户增加积分
    // sourceType、sourceId:积分来源标识,如消费赠送积分场景的 orderCode
    // 幂等性:同样的 userId-sourceType-sourceId 不能重复加积分
    function addBonus(userId, sourceType, sourceId, bonus) {
        // 加分布式锁
        var lock = new Lock(...);
    	
        try {
            if (!lock.lock()) {
                return false;
            }
    		
            // 检查是否重复
            if (isRepeat(userId, sourceType, sourceId)) {
                return false;
            }
    		
            // 加积分
            add(userId, sourceType, sourceId, bonus);
        } finally {
            lock.unlock();
        }
    }
    

    上面分布式锁的作用是防止并发请求(调用端 bug?薅羊毛?),而该操作的幂等性是由 isRepeat() 保证的(查数据库)。

    保障幂等性一般有悲观锁和乐观锁两种模式。

    上面这种属于悲观锁模式(把整个操作锁起来),另一种乐观锁实现方式是给 userId-sourceType-sourceId 加上组合唯一键约束,此时就不需要加分布式锁,也不需要 isRepeat() 检测,直接 add(userId, sourceType, sourceId, bonus) 就能搞定。

    最后说下文中为啥使用伪代码(而不是用具体某一门编程语言实现)。

    用伪代码的最主要目的是省去语言特定的实现细节,将关注点放在逻辑本身。

    比如 redis 客户端,不同语言有不同的使用方式,就算同一门语言的不同类库用法也不同,有些语言的类库用起来又臭又长,影响心情。

    伪代码不受特定语言约束,用起来自由自在,本文中 redis 客户端的使用方式和 Redis 官方的原始命令格式完全一致,没有额外的心智负担。

    再比如生成 token 的随机字符串函数 randStr(),go 语言要这样写:

    func randStr(size int) (string, error) {
        sl := make([]byte, size)
        if _, err := io.ReadFull(rand.Reader, sl); err != nil {
            return "", err
        }
        return base64.RawURLEncoding.EncodeToString(sl), nil
    }
    

    代码虽然不多,但没玩过 go 的小伙伴看到这儿心里是不是要起伏那么两三下?但这玩意怎么实现跟本文的主题没半毛钱关系。

    相反,本文的 lua 脚本都是货真价实的,测试通过的——因为这是本文的核心啊。

    伪代码的缺点是它不能“拎包入住”,但本文的重点并不是要写个源码库——我们没必要真的自己写一个,直接用 redission 或者其他什么库不香吗?

    本文的重点在于分析 Redis 分布式锁的原理,分布式锁面临哪些问题?解决思路是什么?使用时要注意什么?知其然知其所以然。

    当你不知其所以然时,很多东西显得特高大上,什么“看门狗”,搞得神乎其神,当搞明白其原理和目的时,也就那么回事。




  • 相关阅读:
    大数据相关积累
    浏览器发送请求的方法
    中间件Weblogic12.2.1.4与iServer 8C 10i兼容性问题解决过程分享
    can 光纤 光纤 can 认识can光纤转换器的接口定义、终端电阻和指示灯
    微服务SpringBoot 整合Redis 实现点赞、点赞排行榜
    HTTP协议详解
    猿创征文|分布式事务常见解决方案
    SpringCloud 06 Ribbon
    css实现input搜索框展开动画
    java中循环遍历某个文件夹下面的文件,不压缩自身的文件夹,然后压缩成tar.gz格式,压缩失败报异常,代码类编写?
  • 原文地址:https://www.cnblogs.com/linvanda/p/16393316.html