• 使用py-redis分布式锁解决超卖问题——微服务总结(一)


    py-redis 分布式锁

    电商库存服务

    电商的库存为核心服务,一般抽离为独立的服务

    用户查看商品信息中的库存量,以及下单时扣除库存都需要与库存服务交互

    下边的案例为简化的电商下单扣除库存的案例

    image.png

    1. 用户下单后并不会直接扣除库存量,而是预扣除
    2. 当订单超时,或者用户支付失败时,会归还库存

    库存表设计

    为了使用代码演示库存并发问题,设计一个简单的库存ORM, 这里使用的是Python 的peewee 库:

    from datetime import datetime
    
    from peewee import *
    from playhouse.shortcuts import ReconnectMixin
    from playhouse.pool import PooledMySQLDatabase
    from inventory_srv.settings import settings
    
    
    class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
        pass
    
    
    db = ReconnectMySQLDatabase("airflow_test", host="127.0.0.1", port=3306, user="dbadmin", password="hE4sqSfuCQeXEXwz")
    
    
    class BaseModel(Model):
        add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
        is_deleted = BooleanField(default=False, verbose_name="是否删除")
        update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
    
        def save(self, *args, **kwargs):
            # 判断这是一个新添加的数据还是更新的数据
            if self._pk is not None:
                # 这是一个新数据
                self.update_time = datetime.now()
            return super().save(*args, **kwargs)
    
        @classmethod
        def delete(cls, permanently=False):  # permanently表示是否永久删除
            if permanently:
                return super().delete()
            else:
                return super().update(is_deleted=True)
    
        def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
            if permanently:
                return self.delete(permanently).where(self._pk_expr()).execute()
            else:
                self.is_deleted = True
                self.save()
    
        @classmethod
        def select(cls, *fields):
            return super().select(*fields).where(cls.is_deleted == False)
    
        class Meta:
            database = settings.DB
    
    
    class Inventory(BaseModel):
        # 商品的库存表
        # stock = PrimaryKeyField(Stock)
        goods = IntegerField(verbose_name="商品id", unique=True)
        stocks = IntegerField(verbose_name="库存数量", default=0)
        version = IntegerField(verbose_name="版本号", default=0)  # 分布式锁的乐观锁
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    image.png

    初始库存表:有4个商品的库存都为100

    并发问题

    超卖问题

    使用多线程模拟并发扣除库存:

    import threading
    import time
    from random import randint
    
    
    def sell0(user):
        """
        使用Mysql的事务机制,当库存不足时回滚操作
        模拟并发下超卖问题:
        先查询剩余库存,如果剩余库存不足,回滚操作
        """
        goods_list = [(1, 99), (2, 20), (3, 30)]
        with db.atomic() as txn:
            for goods_id, num in goods_list:
                # 查询库存
                goods_inv = Inventory.get(Inventory.goods == goods_id)
                time.sleep(randint(1, 3))
                if goods_inv.stocks < num:
                    print(f"商品:{goods_id} 库存不足")
                    txn.rollback()
                    break
                else:
                    # 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
                    query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                    ok = query.execute()
                    if ok:
                        print(f"{user}购买商品{goods_id} 售出 {num}件")
                    else:
                        print(f"{user}购买商品失败")
      
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=sell0, args=("用户1", ))
        t2 = threading.Thread(target=sell0, args=("用户2", ))
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    运行结果如下:

    用户2购买商品1 售出 99件
    用户2购买商品2 售出 20件
    用户1购买商品1 售出 99件
    用户1购买商品2 售出 20件
    用户2购买商品3 售出 30件
    用户1购买商品3 售出 30件
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    商品1总共只有100件,但是两个用户都卖出了99件,查看数据库发现库存变为了负数:

    img

    • 之所以引起超卖问题是因为我们在卖出前查询库存的时候,两个线程基本同时查询出来的库存都是100件,导致两个线程都更新成功

    解决方案

    使用悲观锁

    • 悲观锁就是直接将整个扣除操作加上互斥锁,多个线程同时只有一个可以执行扣除操作
    • 使用悲观锁最大的问题就是并发性不高,而且为单机的

    当应用以分布式的方式部署时,将不适用

    代码实现

    只需要修改上边的代码,增加锁即可,在执行代码前需要重置下数据库的商品库存

    import threading
    import time
    from random import randint
    
    
    R = threading.Lock()
    
    
    def sell(user):
        goods_list = [(1, 99), (2, 20), (3, 30)]
        with db.atomic() as txn:
            for goods_id, num in goods_list:
                # 查询库存
                R.acquire()  # 获取锁 
                goods_inv = Inventory.get(Inventory.goods == goods_id)
                import time
                from random import randint
                time.sleep(randint(1, 3))
                if goods_inv.stocks < num:
                    print(f"商品:{goods_id} 库存不足")
                    txn.rollback()
                    break
                else:
                    query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                    ok = query.execute()
                    if ok:
                        print(f"{user}购买商品{goods_id} 售出 {num}件")
                    else:
                        print(f"{user}购买商品失败")
                R.release()  # 释放锁
                
                
    if __name__ == "__main__":
        t1 = threading.Thread(target=sell1, args=("用户1", ))
        t2 = threading.Thread(target=sell1, args=("用户2", ))
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    代码执行结果如下:

    用户1购买商品1 售出 99件
    用户1购买商品2 售出 20件
    用户1购买商品3 售出 30件
    商品:1 库存不足
    
    • 1
    • 2
    • 3
    • 4

    可以发现枷锁之后没有出现超卖现象

    使用分布式锁

    基于MySQL的乐观锁

    • 使用乐观锁就需要在表中增加一个version 字段,当执行更新操作的时候,判断version 是否为当前参数版本,如果不是就不执行更新操作如果是就执行更新操作,并且version 加 一

    • 该方式实现简单,但是会增加MySQL的压力,影响性能

    import threading
    import time
    from random import randint
    
    
    def sell2(user):
        # 演示基于数据库的乐观锁机制
        goods_list = [(1, 10), (2, 20), (3, 30)]
        with db.atomic() as txn:
            # 超卖
            for goods_id, num in goods_list:
                # 查询库存
                while True:
                    goods_inv = Inventory.get(Inventory.goods == goods_id)
                    print(f"当前的版本号:{goods_inv.version}")
                    time.sleep(randint(1, 3))
                    if goods_inv.stocks < num:
                        print(f"商品:{goods_id} 库存不足")
                        txn.rollback()
                        break
                    else:
                        query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(
                            Inventory.goods == goods_id, Inventory.version==goods_inv.version)
                        ok = query.execute()
                        if ok:
                            print(f"{user}购买商品{goods_id} 售出 {num}件")
                        else:
                            print(f"{user}购买商品失败")
                
                
    if __name__ == "__main__":
        t1 = threading.Thread(target=sell2, args=("用户1", ))
        t2 = threading.Thread(target=sell2, args=("用户2", ))
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    执行结果:

    当前的版本号:0
    当前的版本号:0
    用户1购买商品1 售出 10件
    当前的版本号:1
    用户2购买商品失败
    当前的版本号:1
    用户1购买商品1 售出 10件
    当前的版本号:2
    用户2购买商品失败
    当前的版本号:2
    用户2购买商品1 售出 10件
    当前的版本号:3
    用户1购买商品失败
    当前的版本号:3
    用户1购买商品1 售出 10件
    当前的版本号:4
    用户2购买商品失败
    当前的版本号:4
    用户1购买商品1 售出 10件
    当前的版本号:5
    用户2购买商品失败
    当前的版本号:5
    用户1购买商品1 售出 10件
    当前的版本号:6
    用户2购买商品失败
    当前的版本号:6
    用户1购买商品1 售出 10件
    当前的版本号:7
    用户2购买商品失败
    当前的版本号:7
    用户1购买商品1 售出 10件
    当前的版本号:8
    用户2购买商品失败
    当前的版本号:8
    用户1购买商品1 售出 10件
    当前的版本号:9
    用户2购买商品失败
    当前的版本号:9
    用户1购买商品1 售出 10件
    当前的版本号:10
    商品:1 库存不足
    用户2购买商品失败
    当前的版本号:10
    当前的版本号:0
    用户1购买商品2 售出 20件
    当前的版本号:1
    商品:1 库存不足
    当前的版本号:1
    用户1购买商品2 售出 20件
    当前的版本号:2
    用户2购买商品失败
    当前的版本号:2
    用户1购买商品2 售出 20件
    当前的版本号:3
    用户2购买商品失败
    当前的版本号:3
    用户2购买商品2 售出 20件
    用户1购买商品失败
    当前的版本号:4
    当前的版本号:4
    用户2购买商品2 售出 20件
    当前的版本号:5
    商品:2 库存不足
    当前的版本号:0
    用户1购买商品失败
    当前的版本号:5
    商品:2 库存不足
    当前的版本号:0
    用户2购买商品3 售出 30件
    当前的版本号:1
    用户2购买商品3 售出 30件
    当前的版本号:2
    用户1购买商品失败
    当前的版本号:2
    用户1购买商品3 售出 30件
    当前的版本号:3
    用户2购买商品失败
    当前的版本号:3
    商品:3 库存不足
    商品:3 库存不足
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    基于redis的乐观锁

    • 基于redis的乐观锁就是在更新数据库前,从Redis中获取锁,更新后再将Redis中的锁释放

    • 从Redis 中获取锁:就是每次操作的时候在Redis 中设置一个锁的key

      释放锁:就是每次在操作完后删除key

    使用redis 作为分布式锁需要注意的问题

    1. 获取释放锁原子性问题
      • 使用redis 的set NX 操作保证获取和释放锁为一个原子性操作
      • redis 是使用lua语言编写的,为了保证获取锁和释放锁操作的原子性,会将这些关键操作封装为Lua 脚本代码,然后再使用的时候去调用这些脚本代码
    1. 死锁问题
      • 获取锁的客户端因为某些原因而宕机,而未能释放锁,其他客户端无法获取此锁,需要有机制来避免该类问题的发生
      • 每一个锁都应该有一个过期时间,过期后如果该锁没有被续租,那么该锁应该自动被释放
    1. 锁过期续租问题
      • 在锁的过期时间内,业务逻辑没有执行完,需要有机制保证该线程可以继续拥有该锁
      • 在执行业务逻辑的线程中,当获取锁的时候,需要开启一个守护线程,该线程就是用来监视锁过期的问题,如果需要续租,该线程就会重新设置该锁的过期时间
    1. 锁唯一性问题
      • 一个完整的锁包括id 和 name, 该id 一般使用uuid保证其唯一性,线程在获取锁的时候,需要同时传递id 和name

    第三方开源分布式redis 锁:

    https://github.com/ionelmc/python-redis-lock

    核心源码分析

    import sys
    import threading
    import weakref
    from base64 import b64encode
    from logging import getLogger
    from os import urandom
    
    from redis import StrictRedis
    
    __version__ = '3.6.0'
    
    logger = getLogger(__name__)
    
    text_type = str
    binary_type = bytes
    
    
    # Check if the id match. If not, return an error code.
    # 这里的KEYS[1]就是在获取锁的时候传递的id, KEYS[2] 为锁的name
    # 为了保证锁的唯一性,当释放锁的时候需要先判断 id 是否一致
    UNLOCK_SCRIPT = b"""
        if redis.call("get", KEYS[1]) ~= ARGV[1] then
            return 1
        else
            redis.call("del", KEYS[2])
            redis.call("lpush", KEYS[2], 1)
            redis.call("pexpire", KEYS[2], ARGV[2])
            redis.call("del", KEYS[1])
            return 0
        end
    """
    
    # Covers both cases when key doesn't exist and doesn't equal to lock's id
    # 续租:在续租的前也需要判断锁的id 是否一致
    # ARGV[2]续租时间,也就是新的TTL,默认为expire*2/3
    EXTEND_SCRIPT = b"""
        if redis.call("get", KEYS[1]) ~= ARGV[1] then
            return 1
        elseif redis.call("ttl", KEYS[1]) < 0 then
            return 2
        else
            redis.call("expire", KEYS[1], ARGV[2])
            return 0
        end
    """
    
    # 重置锁
    RESET_SCRIPT = b"""
        redis.call('del', KEYS[2])
        redis.call('lpush', KEYS[2], 1)
        redis.call('pexpire', KEYS[2], ARGV[2])
        return redis.call('del', KEYS[1])
    """
    
    # 重置所有锁
    RESET_ALL_SCRIPT = b"""
        local locks = redis.call('keys', 'lock:*')
        local signal
        for _, lock in pairs(locks) do
            signal = 'lock-signal:' .. string.sub(lock, 6)
            redis.call('del', signal)
            redis.call('lpush', signal, 1)
            redis.call('expire', signal, 1)
            redis.call('del', lock)
        end
        return #locks
    """
    
    
    class AlreadyAcquired(RuntimeError):
        pass
    
    
    class NotAcquired(RuntimeError):
        pass
    
    
    class AlreadyStarted(RuntimeError):
        pass
    
    
    class TimeoutNotUsable(RuntimeError):
        pass
    
    
    class InvalidTimeout(RuntimeError):
        pass
    
    
    class TimeoutTooLarge(RuntimeError):
        pass
    
    
    class NotExpirable(RuntimeError):
        pass
    
    
    class Lock(object):
        """
        A Lock context manager implemented via redis SETNX/BLPOP.
        """
        unlock_script = None
        extend_script = None
        reset_script = None
        reset_all_script = None
       
        def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
            """
            :param redis_client:
                An instance of :class:`~StrictRedis`.
            :param name:
                The name (redis key) the lock should have.
            :param expire:
                The lock expiry time in seconds. If left at the default (None)
                the lock will not expire.
            :param id:
                The ID (redis value) the lock should have. A random value is
                generated when left at the default.
                Note that if you specify this then the lock is marked as "held". Acquires
                won't be possible.
            :param auto_renewal:
                If set to ``True``, Lock will automatically renew the lock so that it
                doesn't expire for as long as the lock is held (acquire() called
                or running in a context manager).
                Implementation note: Renewal will happen using a daemon thread with
                an interval of ``expire*2/3``. If wishing to use a different renewal
                time, subclass Lock, call ``super().__init__()`` then set
                ``self._lock_renewal_interval`` to your desired interval.
            :param strict:
                If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
            :param signal_expire:
                Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
            """
            if strict and not isinstance(redis_client, StrictRedis):
                raise ValueError("redis_client must be instance of StrictRedis. "
                                 "Use strict=False if you know what you're doing.")
            if auto_renewal and expire is None:
                raise ValueError("Expire may not be None when auto_renewal is set")
    
            self._client = redis_client
    
            if expire:
                expire = int(expire)
                if expire < 0:
                    raise ValueError("A negative expire is not acceptable.")
            else:
                expire = None
            self._expire = expire
    
            self._signal_expire = signal_expire
            if id is None:
                self._id = b64encode(urandom(18)).decode('ascii')
            elif isinstance(id, binary_type):
                try:
                    self._id = id.decode('ascii')
                except UnicodeDecodeError:
                    self._id = b64encode(id).decode('ascii')
            elif isinstance(id, text_type):
                self._id = id
            else:
                raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
            self._name = 'lock:' + name
            self._signal = 'lock-signal:' + name
            self._lock_renewal_interval = (float(expire) * 2 / 3
                                           if auto_renewal
                                           else None)
            self._lock_renewal_thread = None
    
            self.register_scripts(redis_client)
    
        @classmethod
        def register_scripts(cls, redis_client):
            global reset_all_script
            if reset_all_script is None:
                reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
                cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
                cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
                cls.reset_script = redis_client.register_script(RESET_SCRIPT)
                cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
    
        @property
        def _held(self):
            return self.id == self.get_owner_id()
    
        def reset(self):
            """
            Forcibly deletes the lock. Use this with care.
            """
            self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
    
        @property
        def id(self):
            return self._id
    
        def get_owner_id(self):
            owner_id = self._client.get(self._name)
            if isinstance(owner_id, binary_type):
                owner_id = owner_id.decode('ascii', 'replace')
            return owner_id
    
        def acquire(self, blocking=True, timeout=None):
            """
            默认以阻塞的方式获取锁,redis 维护了一个等待获取锁的客户端队列
            将以阻塞方式获取锁的客户端放进队列中,每当释放对应的锁后,就会从队列中
            在取出一个客户端获取该锁
            :param blocking:
                Boolean value specifying whether lock should be blocking or not.
            :param timeout:
                An integer value specifying the maximum number of seconds to block.
            """
            logger.debug("Getting %r ...", self._name)
    
            if self._held:
                raise AlreadyAcquired("Already acquired from this Lock instance.")
    
            if not blocking and timeout is not None:
                raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
    
            if timeout:
                timeout = int(timeout)
                if timeout < 0:
                    raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
    
                if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                    raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
    
            busy = True
            blpop_timeout = timeout or self._expire or 0
            timed_out = False
            while busy:
                busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
                if busy:
                    if timed_out:
                        return False
                    elif blocking:
                        timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                    else:
                        logger.debug("Failed to get %r.", self._name)
                        return False
            #是否应该取刷新过期时间,不是一定要这样做, 这是有风险, 如果当前的进程没有挂,但是一直阻塞,退不出来,就会永远续租
            logger.debug("Got lock for %r.", self._name)
            if self._lock_renewal_interval is not None:
                self._start_lock_renewer()
            return True
    
        def extend(self, expire=None):
            """Extends expiration time of the lock.
            续租锁的过期时间
            :param expire:
                New expiration time. If ``None`` - `expire` provided during
                lock initialization will be taken.
            """
            if expire:
                expire = int(expire)
                if expire < 0:
                    raise ValueError("A negative expire is not acceptable.")
            elif self._expire is not None:
                expire = self._expire
            else:
                raise TypeError(
                    "To extend a lock 'expire' must be provided as an "
                    "argument to extend() method or at initialization time."
                )
    
            error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
            if error == 1:
                raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
            elif error == 2:
                raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
            elif error:
                raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
    
        @staticmethod
        def _lock_renewer(lockref, interval, stop):
            """
            Renew the lock key in redis every `interval` seconds for as long
            as `self._lock_renewal_thread.should_exit` is False.
            """
            log = getLogger("%s.lock_refresher" % __name__)
            while not stop.wait(timeout=interval):
                log.debug("Refreshing lock")
                lock = lockref()
                if lock is None:
                    log.debug("The lock no longer exists, "
                              "stopping lock refreshing")
                    break
                lock.extend(expire=lock._expire)
                del lock
            log.debug("Exit requested, stopping lock refreshing")
    
        def _start_lock_renewer(self):
            """
            Starts the lock refresher thread.
            """
            if self._lock_renewal_thread is not None:
                raise AlreadyStarted("Lock refresh thread already started")
    
            logger.debug(
                "Starting thread to refresh lock every %s seconds",
                self._lock_renewal_interval
            )
            self._lock_renewal_stop = threading.Event()
            self._lock_renewal_thread = threading.Thread(
                group=None,
                target=self._lock_renewer,
                kwargs={'lockref': weakref.ref(self),
                        'interval': self._lock_renewal_interval,
                        'stop': self._lock_renewal_stop}
            )
            self._lock_renewal_thread.setDaemon(True)
            self._lock_renewal_thread.start()
    
        def _stop_lock_renewer(self):
            """
            Stop the lock renewer.
            This signals the renewal thread and waits for its exit.
            """
            if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
                return
            logger.debug("Signalling the lock refresher to stop")
            self._lock_renewal_stop.set()
            self._lock_renewal_thread.join()
            self._lock_renewal_thread = None
            logger.debug("Lock refresher has stopped")
    
        def __enter__(self): #用来使用with语句
            acquired = self.acquire(blocking=True)
            assert acquired, "Lock wasn't acquired, but blocking=True"
            return self
    
        def __exit__(self, exc_type=None, exc_value=None, traceback=None):
            self.release()
    
        def release(self):
            """Releases the lock, that was acquired with the same object.
            .. note::
                释放锁,当我们在不同的线程中去获取和释放锁的时,
                在实例化Lock的时候必须带上id
                If you want to release a lock that you acquired in a different place you have two choices:
                * Use ``Lock("name", id=id_from_other_place).release()``
                * Use ``Lock("name").reset()``
            """
            if self._lock_renewal_thread is not None:
                self._stop_lock_renewer()
            logger.debug("Releasing %r.", self._name)
            error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
            if error == 1:
                raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
            elif error:
                raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
    
        def locked(self):
            """
            Return true if the lock is acquired.
            Checks that lock with same name already exists. This method returns true, even if
            lock have another id.
            """
            return self._client.exists(self._name) == 1
    
    
    reset_all_script = None
    
    
    def reset_all(redis_client):
        """
        Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        """
        Lock.register_scripts(redis_client)
    
        reset_all_script(client=redis_client)  # noqa
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372

    源码逻辑比较简明:

    但是成功解决了上边提到的使用redis 作为分布式乐观锁的问题:

    1. 原子性问题:这里直接使用Lua脚本语言操作锁的核心逻辑

    2. 死锁问题:这里使用过期时间解决了死锁的问题

    3. 续租问题:预留了续租的接口参数,当我们指定需要续租的时候,会开启守护进程来监控过期时间

      但是如果由于业务代码异常导致一直续租,也会导致死锁问题,斟酌使用

    4. 锁唯一性问题:redis 中使用key 作为锁的name, value 作为锁的唯一id, 当客户端操作锁的时候,如果不显示的指定id,会默认生成一个唯一id

      但是当在分布式线程中获取锁的时候,最好自己指定id, 然后在释放锁的时候带上id

    image.png

    简单使用demo

    import time
    
    from redis import Redis
    conn = Redis(host='localhost', port=6379, db=15, password='IyY1NA3Zre76542M')
    
    import redis_lock
    import uuid
    
    lock_id = str(uuid.uuid4())
    lock = redis_lock.Lock(conn, "name-of-the-lock", expire=3, id=lock_id, auto_renewal=True)
    if lock.acquire(blocking=True):
        while True:
            print("Got the lock.")
            print("busy...")
            # 获取锁后,模拟耗时业务,这个时候该锁应该被续租
            time.sleep(10)
            break
        print("release the lock")
        lock.release()
    else:
        print("Someone else has the lock.")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    使用redis 分布式锁解决库存并发问题

    from redis import Redis
    import redis_lock
    import uuid
    import time
    
    conn = Redis(host='localhost', port=6379, db=15, password='IyY1NA3Zre76542M')
    
    def sell3(user):
        goods_list = [(1, 10), (2, 20), (3, 30)]
        with db.atomic() as txn:
            for goods_id, num in goods_list:
                lock_id = str(uuid.uuid4())
                lock = redis_lock.Lock(conn, "buy-goods-lock", expire=3, id=lock_id, auto_renewal=True)
                lock.acquire()
                goods_inv = Inventory.get(Inventory.goods == goods_id)
                time.sleep(5)
                if goods_inv.stocks < num:
                    print(f"商品:{goods_id} 库存不足")
                    txn.rollback()
                    break
                else:
                    query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
                    ok = query.execute()
                    if ok:
                        print(f"{user}购买商品{goods_id} 售出 {num}件")
                    else:
                        print(f"{user}购买商品失败")
                lock.release()
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=sell3, args=("用户1", ))
        t2 = threading.Thread(target=sell3, args=("用户2", ))
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
  • 相关阅读:
    Spring boot 通过 wkhtmltopdf 实现URL转PDF
    【机器学习】无监督学习:探索数据背后的隐藏模式
    python爱心代码高级
    系统编程 day11 信号量 (我也不知道是什么东西,不好解释 )(和之前的信号有一些不同 (函数), 但是也有一些操作相同 ,比如说 p v 操作 , )
    阿里最新分享的《多线程核心技术第三版》神书就此霸榜GitHub,3天点击量已破百万
    搭建Android自动化python+appium环境
    Java入门基础知识
    spark集成hive
    C语言学习笔记(四)
    Zookeeper集群Leader选举源码剖析
  • 原文地址:https://blog.csdn.net/qq_42586468/article/details/126317336