• redis分布式锁和看门狗的实现


    redis分布式锁和看门狗的实现

    • 分布式应用进行逻辑处理时经常会遇到并发问题。
    • 比如一个操作要修改用户的状态,修改状态需要先读出用户的状态,在内存里进行修 改,改完了再存回去。如果这样的操作同时进行了,就会出现并发问题,因为读取和保存状 态这两个操作不是原子的。(Wiki 解释:所谓原子操作是指不会被线程调度机制打断的操 作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch 线程切换。)

    在这里插入图片描述

    演示不使用分布式锁

    • 以下均使用python代码进行演示

    逻辑:开启多线程模拟并发存储数据,在最后阶段取出数据与redis中存储的数据进行比对

    import threading
    import redis as _redis
    redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
    
    
    def storage_redis(value):
        redis.set('age', value)
        
    
    if __name__ == '__main__':
        for i in [1,2,3,4,5]:
            t = threading.Thread(target=storage_redis, args=(i,))
            t.start()
        print('--------------------')
        print(redis.get('age'))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 启动脚本运行,结果如下图

    在这里插入图片描述

    • 看一看,是不是直接裂开🙃,数据存的和取的结果根本不一样

    像这种应该办,这个时候就需要用到分布式锁了,类似于MySQL中悲观锁与乐观锁

    redis分布式锁实现

    • 分布式锁本质上要实现的目标就是在 Redis 里面占一个“茅坑”,当别的进程也要来占 时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。

    • 占坑一般是使用 setnx(set if not exists) 指令,只允许被一个客户端占坑。先来先占, 用 完了,再调用 del 指令释放茅坑。

    • 但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样 就会陷入死锁,锁永远得不到释放

    • 于是我们在拿到锁之后,再给锁加上一个过期时间,比如 5s,这样即使中间出现异常也 可以保证 5 秒之后锁会自动释放。

    • 但是以上逻辑还有问题。如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因 为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。

    • 这种问题的根源就在于 setnx 和 expire 是两条指令而不是原子指令。如果这两条指令可 以一起执行就不会出现问题。也许你会想到用 Redis 事务来解决。但是这里不行,因为 expire 是依赖于 setnx 的执行结果的,如果 setnx 没抢到锁,expire 是不应该执行的。事务里没有 ifelse 分支逻辑,事务的特点是一口气执行,要么全部执行要么一个都不执行。

    • 为了治理这个乱象,Redis 2.8 版本中作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的乱象。从此以后所有的第三方分布式锁 library 可以休息了。 > set lock:codehole true ex 5 nx OK … do something critical … > del lock:codehole 上面这个指令就是 setnx 和 expire 组合在一起的原子指令,它就是分布式锁的奥义所在

    import threading
    import time
    import uuid
    import redis as _redis
    from redis import WatchError
    from threading import Timer
    redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
    
    class DistributedLocks(object):
        """
        Redis分布式锁实现类
        """
        lock_timeout = None
        lock_name = None
        def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
            """
            coon --> redis的连接
            lock_name --> 锁的名称
            acquire_timeout -->  获取锁的超时时间(秒)
            lock_timeout -->  锁的超时时间(秒)
            """
            self.coon = coon
            if not lock_name:
                lock_name = 'default'
            self.lock_name = f'lock:{lock_name}'
            self.rem_ark = str(uuid.uuid4())
            self.lock_timeout = lock_timeout
            self.end = time.time() + acquire_timeout
            DistributedLocks.lock_timeout = self.lock_timeout
            DistributedLocks.lock_name = self.lock_name
    
        def _lock(self) -> bool:
            """
            加锁
            """
            while time.time() <= self.end:
                # 如果锁不存在,设置锁并给锁加上超时时间
                if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
                    return True
                time.sleep(0.001)
            return False
    
        def _unlock(self) -> bool:
            """
            释放锁 方法一事务
            """
            # 使用redis中的事务pipeline
            pipe = self.coon.pipeline(True)
            while True:
                try:
                    # 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
                    pipe.watch(self.lock_name)
                    rem_rak = self.coon.get(self.lock_name)
                    if rem_rak and rem_rak == self.rem_ark:
                        pipe.multi()
                        pipe.delete(self.lock_name)
                        pipe.excute()
                        return True
    
                    pipe.unwatch()
                    break
    
                except WatchError:
                    pass
    
            return False
    
        def release_lock(self):
            """
            释放锁 方法二lua脚本
            """
            unlock_lua = """
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1])
            else
                return 0
            end
            """
            unlock = self.coon.register_script(unlock_lua)
            result = unlock(key=[self.lock_name,],args=[self.rem_ark])
            if result:
                return True
            return False
    
        def __enter__(self):
                self._lock()
    
    
        def __exit__(self, exc_type, exc_val, exc_tb):
    
            try:
                # self._unlock()
                self.release_lock()
            except Exception:
                return False
    
    def storage_redis(value):
        # 加锁与释放锁使用类DistributedLocks来实现
        DL = DistributedLocks(redis,'test') # 实例化分布式锁
        with DL:
            redis.set('age', value)
        print(redis.get('age'))
            
            
    if __name__ == '__main__':
        for i in range(5):
            t = threading.Thread(target=storage_redis, args=(i,))
            t.start()
        print('--------------------')
        print(redis.get('age'))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    这样看下来就不会发生数据错乱的问题了。但是还有一个问题,超时问题

    Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至 于超出了锁的超时限制,就会出现问题。因为这时候锁过期了,第二个线程重新持有了这把锁, 但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程逻 辑执行完之间拿到了锁。

    为了避免这个问题,Redis 分布式锁不要用于较长时间的任务。如果真的偶尔出现了,数据出现的小波错乱可能需要人工介入解决

    这个时候就需要看门狗来做了

    redis看门狗的实现

    import threading
    import time
    import uuid
    import redis as _redis
    from redis import WatchError
    from threading import Timer
    redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
    
    
    class WatchDog(object):
        """
        Python实现redis的看门狗机制
        """
        def __init__(self,timeout,user_handler=None):
            self.timeout  = timeout
            self.user_handler = user_handler if user_handler else self.default_handler
            self.timer = Timer(self.timeout,self.user_handler)
            self.timer.start()
    
        def reset(self):
            """计时器重启"""
            self.timer.cancel()
            timer = Timer(self.timeout,self.user_handler)
            timer.start()
    
        def stop(self):
            """计时器停止"""
            self.timer.cancel()
    
        def default_handler(self):
            pass
    
        @classmethod
        def error_handler(cls):
            pass
    
    
    class DistributedLocks(object):
        """
        Redis分布式锁实现类
        """
        lock_timeout = None
        lock_name = None
        def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
            """
            coon --> redis的连接
            lock_name --> 锁的名称
            acquire_timeout -->  获取锁的超时时间(秒)
            lock_timeout -->  锁的超时时间(秒)
            """
            self.coon = coon
            if not lock_name:
                lock_name = 'default'
            self.lock_name = f'lock:{lock_name}'
            self.rem_ark = str(uuid.uuid4())
            self.lock_timeout = lock_timeout
            self.end = time.time() + acquire_timeout
            DistributedLocks.lock_timeout = self.lock_timeout
            DistributedLocks.lock_name = self.lock_name
    
        def _lock(self) -> bool:
            """
            加锁
            """
            while time.time() <= self.end:
                # 如果锁不存在,设置锁并给锁加上超时时间
                if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
                    return True
                time.sleep(0.001)
            return False
    
        def _unlock(self) -> bool:
            """
            释放锁 方法一事务
            """
            # 使用redis中的事务pipeline
            pipe = self.coon.pipeline(True)
            while True:
                try:
                    # 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
                    pipe.watch(self.lock_name)
                    rem_rak = self.coon.get(self.lock_name)
                    if rem_rak and rem_rak == self.rem_ark:
                        pipe.multi()
                        pipe.delete(self.lock_name)
                        pipe.excute()
                        return True
    
                    pipe.unwatch()
                    break
    
                except WatchError:
                    pass
    
            return False
    
        def release_lock(self):
            """
            释放锁 方法二lua脚本
            """
            unlock_lua = """
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1])
            else
                return 0
            end
            """
            unlock = self.coon.register_script(unlock_lua)
            result = unlock(key=[self.lock_name,],args=[self.rem_ark])
            if result:
                return True
            return False
    
    
        def extra_time(self):
            """
            如果锁的超时时间小于redis存取的时间(锁已经超时了,但是任务还未执行完成)
            """
            # 默认给锁再延长10秒
            try:
                self.coon.expire(DistributedLocks.lock_name, 5)
            except Exception:
                raise TimeoutError
    
        def __enter__(self):
                self._lock()
    
    
        def __exit__(self, exc_type, exc_val, exc_tb):
    
            try:
                # self._unlock()
                self.release_lock()
            except Exception:
                return False
    
    
    def storage_redis(value):
        # 加锁与释放锁使用类DistributedLocks来实现
        DL = DistributedLocks(redis,'test') # 实例化分布式锁
        with DL:
            watchdog = WatchDog(DL.lock_timeout,DL.extra_time()) # 启动看门狗
            try:
                redis.set('age', value)
            except Exception:  # 捕获异常,进行异常操作
                # watchdog.error_handler() # 捕获异常,自定义异常处理
                # watchdog.reset() # 看门狗重启
                watchdog.stop() # 停止看门狗
    
        print(redis.get('age'))
    
    
    
    if __name__ == '__main__':
        for i in range(5):
            t = threading.Thread(target=storage_redis, args=(i,))
            t.start()
        print('--------------------')
        print(redis.get('age'))
        # start = time.time()
        # storage_redis(13)
        # end = time.time()-start
        # print(end)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164

    如果锁已经超时了,但是任务还未执行完成那么便再给它加5秒,当然这种也会有问题,如果5秒后任务还没执行完成就会报错,当然了你可以在watchdog.error_handler()方法内自定义异常处理

    redis可重入锁的实现

    可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。

    import threading
    import time
    import uuid
    import redis as _redis
    from redis import WatchError
    from threading import Timer
    redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
    
    
    
    class WatchDog(object):
        """
        Python实现redis的看门狗机制
        """
        def __init__(self,timeout,user_handler=None):
            self.timeout  = timeout
            self.user_handler = user_handler if user_handler else self.default_handler
            self.timer = Timer(self.timeout,self.user_handler)
            self.timer.start()
    
        def reset(self):
            """计时器重启"""
            self.timer.cancel()
            timer = Timer(self.timeout,self.user_handler)
            timer.start()
    
        def stop(self):
            """计时器停止"""
            self.timer.cancel()
    
        def default_handler(self):
            pass
    
        @classmethod
        def error_handler(cls):
            pass
    
    
    class DistributedLocks(object):
        """
        Redis分布式锁实现类
        """
        lock_timeout = None
        lock_name = None
        def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
            """
            coon --> redis的连接
            lock_name --> 锁的名称
            acquire_timeout -->  获取锁的超时时间(秒)
            lock_timeout -->  锁的超时时间(秒)
            """
            self.coon = coon
            if not lock_name:
                lock_name = 'default'
            self.lock_name = f'lock:{lock_name}'
            self.rem_ark = str(uuid.uuid4())
            self.lock_timeout = lock_timeout
            self.end = time.time() + acquire_timeout
            DistributedLocks.lock_timeout = self.lock_timeout
            DistributedLocks.lock_name = self.lock_name
    
            self.locks = threading.local()
            # 可重入锁栈
            self.locks.redis = {}
    
    
        def _lock(self) -> bool:
            """
            加锁
            """
            while time.time() <= self.end:
                # 如果栈内存在此锁,把持有锁的数量加一
                if self.lock_name in self.locks.redis:
                    self.locks.redis[self.lock_name] +=1
                    return True
                # 如果栈内锁不存在,设置锁并给锁加上超时时间
                if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
                    # 再在栈中加入一个键值对key为当前线程,value为线程持有锁的数量
                    self.locks.redis[self.lock_name] = 1
                    return True
                return False
    
    
    
        def _unlock(self) -> bool:
            """
            释放锁 方法一事务
            """
            # 判断锁是否在栈中
            if self.lock_name in self.locks.redis:
                # 将锁的数量减一
                self.locks.redis[self.lock_name] -= 1
                # 锁如果小于等于零的话,直接将锁释放掉
                if self.locks.redis[self.lock_name] <= 0:
                    # 使用redis中的事务pipeline
                    pipe = self.coon.pipeline(True)
                    while True:
                        try:
                            # 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
                            pipe.watch(self.lock_name)
                            rem_rak = self.coon.get(self.lock_name)
                            if rem_rak and rem_rak == self.rem_ark:
                                pipe.multi()
                                pipe.delete(self.lock_name)
                                pipe.excute()
                                return True
    
                            pipe.unwatch()
                            break
    
                        except WatchError:
                            pass
    
                    return False
                return False
    
        def release_lock(self):
            """
            释放锁 方法二lua脚本
            """
            unlock_lua = """
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1])
            else
                return 0
            end
            """
            unlock = self.coon.register_script(unlock_lua)
            result = unlock(key=[self.lock_name,],args=[self.rem_ark])
            if result:
                return True
            return False
    
    
        def extra_time(self):
            """
            如果锁的超时时间小于redis存取的时间(锁已经超时了,但是任务还未执行完成)
            """
            # 默认给锁再延长10秒
            try:
                self.coon.expire(DistributedLocks.lock_name, 5)
            except Exception:
                raise TimeoutError
    
        def __enter__(self):
                self._lock()
    
    
        def __exit__(self, exc_type, exc_val, exc_tb):
    
            try:
                self._unlock()
                # self.release_lock()
            except Exception:
                return False
    
    
    def storage_redis(value):
        # 加锁与释放锁使用类DistributedLocks来实现
        DL = DistributedLocks(redis,'test') # 实例化分布式锁
        with DL:
            watchdog = WatchDog(DL.lock_timeout,DL.extra_time()) # 启动看门狗
            try:
                redis.set('age', value)
            except Exception:  # 捕获异常,进行异常操作
                # watchdog.error_handler() # 捕获异常,自定义异常处理
                # watchdog.reset() # 看门狗重启
                watchdog.stop() # 停止看门狗
    
        print(redis.get('age'))
    
    
    
    if __name__ == '__main__':
        for i in range(5):
            t = threading.Thread(target=storage_redis, args=(i,))
            t.start()
        print('--------------------')
        print(redis.get('age'))
        # start = time.time()
        # storage_redis(13)
        # end = time.time()-start
        # print(end)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 本人极其不推荐使用可重入锁,它加重了客户端的复杂性,在编写业务方法时注意在 逻辑结构上进行调整完全可以不使用可重入锁
    select 100-mean(usage_idle) as usage_idle from (
        select last(usage_idle)  as usage_idle from (
            select mean("usage_idle") as usage_idle from cpu where sys_name ='study' and time > now() - 10m group by host,cpu,time(10m)
        ) group by host,cp
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    C++GUI之wxWidgets(4)-编写应用涉及的类和方法(1)
    CCF中国开源大会专访|毛晓光:“联合”是开源走向“共赢”的必由之路
    XML语法、约束
    操作系统 进程 - 进程调动
    【大麦小米学量化】使用文心一言AI编写股票量化交易策略代码(含演示代码和进阶演示)
    Linux:基础笔记
    第四届辽宁省大学生程序设计竞赛(正式赛)A B H F M C
    一场技术破案的经过
    jenkins 部署 vue 项目
    二分模板代码
  • 原文地址:https://blog.csdn.net/prigilm/article/details/126050332