• Linux基础组件之分布式锁的实现


    一、背景

    通常说的锁,一般是单机单进程中多个线程环境下,当多个线程涉及资源竞争的时候的锁操作;加锁的本质是在同一时刻,同一任务只能在一个线程中执行。
    通常使用的锁有:
    (1)自旋锁。互斥类型的锁。
    (2)互斥锁。互斥类型的锁。
    (3)信号量。通常应用在实现多层级的缓存;同步类型的锁。
    (4)读写锁。应用在多读少写的场景,如数据库中的行锁。
    (5)原子变量、内存屏障。
    (6)条件变量。同步类型的锁。
    互斥锁的使用:

    pthread_mutex_init(&mutex,NULL);
    pthread_mutext_lock(&mutex);
    // ...
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    

    进程间通信(IPC)方式:
    (1)信号量。sem_init第二个参数有一个pshared;如果pshared=0在多线程中使用,pshared>0在多进程间使用。

    sem_init();
    sem_post();
    sem_wait();
    

    (2)管道。无名管道(pipe)、有名管道(FIFO)。
    (3)信号
    (4)消息队列
    (5)共享内存
    (6)socket

    在分布式系统中,一个应用部署在多个机器中,在某些场景下,为了保证数据一致性要求在同一时刻,同一任务只在一个节点上运行,即保证某行为在同一时刻只能被一个线程执行;在单机单进程多线程环境,通过锁很容易做到,比如mutex、spinlock、信号量等;而在多机多进程环境中,此时就需要分布式锁来解决了。可以理解为在分布式场景中实现互斥类型的锁

    database
    lock
    S1
    S2
    S3
    S...

    分布式锁本质上是解决分布式事务中的隔离性(某个行为可能需要访问多个目标,为了避免并发情况的发生,需要保证执行操作的顺序性)。

    二、解决的问题

    可以理解为把多线程中解决的问题的场景迁移到分布式场景中。某个资源或行为同时只允许一个实体执行,其他实体需要等待。

    分布式锁由两部分构成:
    (1)资源。锁存储。锁本质上是资源;即这是一个所有人都能访问到的资源。在分布式场景中,锁资源是通过网络交互来访问的,实体访问锁资源,锁资源要告诉实体是否访问成功。
    (2)行为。加锁或解锁;网络交互的方式实现。隐含重点:加锁对象和解锁对象必须为同一个;借鉴mutex的理念。

    三、分布式锁特性

    (1)互斥性。分布式锁是互斥类型的锁;同时只允许一个持锁对象进入临界资源,其他待持锁对象要么等待,要么轮询检测是否能获取锁。需要记录持有锁对象(加锁对象和解锁对象必须为同一个)方便判定锁被谁占有了。
    (2)锁超时。允许持锁对象持锁最长时间;如果持锁对象宕机,需要外力解除锁定,方便其他持锁对象获取锁。
    在单进程的多线程场景下,资源和行为是同生共死的关系,程序宕机会自动释放所有资源和行为。而在分布式场景中有比较大的差别,锁资源和行为是分离的,通过网络交互操作锁,要考虑到锁资源宕机和行为实体宕机的情况如何释放资源和解除行为。
    比如行为实体宕机了,如何释放锁?如果不能释放锁则其他的实体将一直等待;所以,需要锁超时机制,设置操作时长的最大值,超时释放锁。再比如锁资源宕机的情况。
    (3)高可用性。合理实时间内得到合理的回复;琐存储位置若宕机,可能引发整个系统不可用;应有备份存储位置和切换备份存储的机制,从而确保服务可用。
    实现上有两种方式:计算型(不存储数据只完成行为,比如网关只计算不存储)和存储型(存储数据,有多个备份点,存储超过半数以上的备份节点;具有切换功能,以解决宕机问题)。
    (4)容错性。若锁存储位置宕机,恰好锁丢失的话,是否能正确处理。
    一个实体行为申请了锁,此时锁资源宕机,切换到了备份资源,但是备份资源没有该实体行为的记录,这是种错误,那么就要一致性来解决(raft一致性算法和redlock来实现)。
    redlock的实现:开奇数个进程,写锁的时候,写入进程半数以上成功的返回获取锁成功,否则失败。

    四、分布式锁类型

    (1)重入锁和非重入锁。是否允许持锁对象再次获取锁。
    对应一个进程多线环境的递归锁和非递归锁;即允不允许持锁对象多次进入临界资源。
    (2)公平锁和非公平锁。对应互斥锁和自旋锁。公平锁通常通过排队来实现;非公平锁通常不间断尝试获取锁来实现。
    互斥锁中如果一个线程发现锁以及被占用了,那么会进入阻塞等待,会引起线程切换,线程进入阻塞队列;当锁释放了,阻塞队列会取出一个阻塞线程加入就绪队列,等待CPU调度。

    线程切换
    mutex
    阻塞队列
    就绪队列
    CPU核心

    自旋锁中如果一个线程发现锁以及被占用了,那么会空转CPU,如果一定时间锁还不可用,那么会调用sched_yield(),引起线程切换,线程直接进入就绪队列,等待CPU调度。自旋锁不需要考虑内存序和编译器优化,循环中不断使用原子操作检查锁是否可用。

    线程切换
    spin_lock
    就绪队列
    CPU核心

    在就绪队列中的好处是当有CPU核心空闲时会从就绪队列中取出任务执行;可以注意到互斥锁是先加入阻塞队列再进入就绪队列。

    公平锁和非公平锁就是获取锁的概率。所以公平锁对应互斥锁,非公平锁对应的自旋锁。

    五、实现重点

    (1)锁是一种资源,需要存储;保证可用性,避免锁失效。
    (2)加锁对象和解锁对象必须为同一个。
    (3)互斥语义。给锁添加标志,通过标记状态获悉锁是否占用中。
    (4)加锁和解锁行为是网络通信,需要考虑锁超时。
    (5)怎么获取持有锁对象释放锁?
    主动探寻(非公平锁)、广播方式被动通知(非公平锁)、排队单独统计(公平锁)。
    (6)是否运行同一持锁对象多次加锁?即重入锁和非重入锁。

    六、MySQL实现分布式锁

    主要利用 MySQL 唯一键的唯一性约束来实现互斥性。

    6.1、MySQL做哪些事情

    (1)存储锁。
    MySQL是关系型数据库,通过表存储数据。不同业务类型的锁放置表中不同的行。
    (2)实现互斥性

    6.2、表结构

    DROP TABLE IF EXISTS `dislock`;
    CREATE TABLE `dislock` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT
    COMMENT '主键',
      `lock_type` varchar(64) NOT NULL COMMENT '锁类型',
      `owner_id` varchar(255) NOT NULL COMMENT '持锁对象',
      `update_time` timestamp NOT NULL DEFAULT
    CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `idx_lock_type` (`lock_type`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT
    CHARSET=utf8 COMMENT='分布式锁表';
    

    id,不同类型的锁 主键id不断自增。
    lock_type,锁的类型用来描述不同业务类型的锁。实现互斥。
    owner_id,持锁对象,允许谁来解锁,其他对象不能解锁;另一个作用是避免重复加锁。
    update_time,具体操作锁的时间,主要用于解决锁超时问题。
    唯一索引是指一列中不存在重复字段的行,即字段唯一。主键是非空唯一索引。根据唯一索引的约束实现互斥,即lock_type在一个表中不会出现两个相同的lock_type。
    唯一键是确保字段在表中是唯一的。

    6.3、加锁

    往表插入一行数据。

    INSERT INTO dislock (`lock_type`, `owner_id`) VALUES ('act_lock', 'ad2daf3');
    

    假设有一个S1申请了锁,那么此时有S2想申请锁,由于唯一键和唯一索引的约束,S2插入数据失败,逻辑上判断加锁失败。
    MySQL没有通知机制,即申请锁对象只能不断轮询插入;非公平锁。

    // 连续分布式锁的使用案例
    while (1) {
        CLock my_lock;
        bool flag = dlm->ContinueLock("foo", 14000, my_lock);
        if (flag) {
            printf("获取成功, Acquired by client name:%s, res:%s, vttl:%d\n",
                   my_lock.m_val, my_lock.m_resource, my_lock.m_validityTime);
            // do resource job
            sleep(10);
        } else {
            printf("获取失败, lock not acquired, name:%s\n", my_lock.m_val);
            sleep(rand() % 3);
        }
    }
    

    6.4、解锁

    从表中删除一行数据;注意要带上owner_id,避免删除其他对象的锁。

    DELETE FROM dislock WHERE `lock_type` = 'act_lock' AND `owner_id` = 'ad2daf3';
    

    如果持锁对象不是自己,那么会解锁失败。

    6.5、锁超时

    锁超时是在MySQL中有超进程,利用定时器实现定时检测表,用当前时间减去update_time,如果超过最大持锁时间,则删除一行数据(释放锁)。这是计算型的高可用。

    6.6、实现重入锁

    在表结构中加一个count字段,加锁count加一,解锁count减一;当count等于0时删除数据(解锁)。

    七、redis实现分布式锁

    redis是一个、内存数据库(内存中操作数据)、数据结构数据库(提供丰富的数据结构:string \ list \ hash \ set \ zset等等)、key-vale数据库(通过key操作value)。

    database
    redis
    S1
    S2
    S3
    S...

    redis通过哨兵模式、cluster模式保证可用性;但是redis的容错性有问题,redis的主从复制(每个主节点都有备份节点)是异步的,即备份是异步复制的方式。

    所谓异步复制,是指服务操作redis时,如果操作成功了立刻返回,然后由redis异步的将改变的数据同步到备份点。

    所谓同步复制,是指服务操作redis时,如果操作成功了没有立刻返回,而是等redis成功将改变的数据同步到备份点后才返回(如果是redlock或raft一致性协议就是半数以上成功了就可以返回)。

    那么当redis宕机了,切换到备份点后,但是最新的数据没有同步到备份点,就会造成数据丢失,所以容错性存在一定问题。

    7.1、加锁

    redis的set命令中,可以把key存放锁的类型,value存放持锁对象;为了实现锁的互斥性,使用NX参数(NX就是not exist的缩写)。

    # set key value NX
    set act_lock 123123123 NX
    

    返回OK加锁成功,返回nil加锁失败。

    真正加锁操作一般不会使用上面的,因为需要考虑重入锁。可以考虑使用hash的数据结构。

    --[[
        KEYS[1]         lock_name
        KEYS[2]         lock_channel_name
        ARGV[1]         lock_time (ms)
        ARGV[2]         uuid
    ]]
    if redis.call('exists', KEYS[1]) == 0 then
        redis.call('hset', KEYS[1], ARGV[2], 1)
        redis.call('pexpire', KEYS[1], ARGV[1])
        return
    end
    -- 若支持锁重入,将注释去掉
    -- if redis.call('hexists', KEYS[1], ARGV[2]) ==
    1 then
    --     redis.call('hincrby', KEYS[1], ARGV[2], 1)
    --     redis.call('pexpire', KEYS[1], ARGV[1])
    --     return
    -- end
    redis.call("subscribe", KEYS[2])
    return redis.call('pttl', KEYS[1])
    

    7.2、解锁

    使用redis的del命令。需要检测自己是不是持锁对象。

    del act_lock
    

    get命令查看key对象是否存在。

    get act_lock
    

    所以,解锁的正确操作是:
    (1)get act_lock。获得当前锁的持锁对象。
    (2)value==owner_id?判断自己是不是持锁对象。
    (3)del act_lock。
    这就需要两次网络交互:

    get
    del
    client
    redis
    client2

    这就可能存在client2把锁释放了。这涉及到redis的事务。

    --[[
        KEYS[1]         lock_name
        KEYS[2]         lock_channel_name
        ARGV[1]         0 sign of unlock
        ARGV[2]         lock_expire_time
        ARGV[3]         uuid
    ]]
    if redis.call('exists', KEYS[1]) == 0 then
        redis.call('publish', KEYS[2], ARGV[1])
        return 1
    end
    if redis.call('hexists', KEYS[1], ARGV[3]) == 0
    then
        return
    end
    -- 若支持锁重入,将注释去掉
    -- local cnt = redis.call('hincrby', KEYS[1],
    ARGV[3], -1)
    -- if cnt > 0 then
    --     redis.call('pexpire', KEYS[1], ARGV[2])
    --     return 0
    -- else
        redis.call('del', KEYS[1])
        redis.call('publish', KEYS[2], ARGV[1])
        return 1
    -- end
    

    7.3、锁超时

    redis的set命令中有EX和PX参数,设置超时时间。
    EX就是Expire的缩写,这个以秒为单位。
    PX是pExpire,这个是以毫秒为单位。

    set act_lock 123123123 NX EX 10
    # 或者
    set act_lock 123123123 NX PX 10000
    

    可以使用ttl命令查看剩余时间。

    ttl act_lock
    

    锁超时设定一定要远远大于网络交互时间。

    7.4、分布式原子性

    redis lua脚本来实现。

    7.5、redlock解决容错性

    一般redis有5个备份结点(进程),分别在不同的机器当中。

    备份点
    redis 1
    redis 2
    redis 3
    redis 4
    redis 5
    database
    redis
    S1
    S2
    S3
    S...

    redis集群主从复制采用的是异步复制的方式,为解决主从数据不一致问题:
    (1)加锁,需要对每个进程执行加锁操作,超过半数以上成功才能说明加锁成功。
    (2)解锁,需要对每个进程执行解锁操作,超过半数以上成功才能说明解锁成功。

    redis 1
    redis 2
    redis 3
    redis 4
    redis 5
    database
    S1
    S2
    S3
    S...
    class CLock {
    public:
                                CLock();
                                ~CLock();
    public:
        int                     m_validityTime; // 当前锁可以存活的时间, 毫秒
        sds                     m_resource;     // 要锁住的资源名称
        sds                     m_val;          // 锁住资源的进程随机名字
    };
    
    class CRedLock {
    public:
                                CRedLock();
        virtual                 ~CRedLock();
    public:
        bool                    Initialize();
        bool                    AddServerUrl(const char *ip, const int port);
        void                    SetRetry(const int count, const int delay);
        bool                    Lock(const char *resource, const int ttl, CLock &lock);
        bool                    ContinueLock(const char *resource, const int ttl,
                                             CLock &lock);
        bool                    Unlock(const CLock &lock);
    private:
        bool                    LockInstance(redisContext *c, const char *resource,
                                             const char *val, const int ttl);
        bool                    ContinueLockInstance(redisContext *c, const char *resource,
                                                     const char *val, const int ttl);
        void                    UnlockInstance(redisContext *c, const char *resource,
                                               const char *val);
        sds                     GetUniqueLockId();
        redisReply *            RedisCommandArgv(redisContext *c, int argc, char **inargv);
    private:
        static int              m_defaultRetryCount;    // 默认尝试次数3
        static int              m_defaultRetryDelay;    // 默认尝试延时200毫秒
        static float            m_clockDriftFactor;     // 电脑时钟误差0.01
    private:
        sds                     m_unlockScript;         // 解锁脚本
        int                     m_retryCount;           // try count
        int                     m_retryDelay;           // try delay
        int                     m_quoRum;               // majority nums
        int                     m_fd;                   // rand file fd
        vector<redisContext *>  m_redisServer;          // redis master servers
        CLock                   m_continueLock;         // 续锁
        sds                     m_continueLockScript;   // 续锁脚本
    };
    
    

    八、分布式锁的使用场景

    (1)有些业务场景,需要严格的执行顺序,如果顺序打破可能会引起业务数据错乱。

    1
    2
    1
    2
    1
    2
    1
    2
    业务 1
    业务 2
    database
    S1
    S2
    S3
    S...

    如上图,比如按照1–>2–>1–>2–>…不会出现数据错乱问题,但如果时并发执行下出现1–>1–>1–>2–>2–>1–>2–>…就可能出现数据错乱。所以可以在1之前加锁,执行完2释放锁来保证并发下的顺序执行。

    (2)与数据库配合。数据库有不同级别的隔离,但是又不想使用最高的隔离级别,就需要分布式锁保证数据的正确性。

    8.1、分布式锁的选择

    (1)当系统中没有引入redis,只有MySQL,那么就使用MySQL实现分布式锁。不要为了实现分布式锁引入redis等中间件,因为引入中间件可能会给系统带来不稳定因素;运维上也很困难。另外,如果只有少量业务需要分布式锁,也优先考虑MySQL实现分布式锁。
    (2)如果项目中有使用redis(比如缓存是在redis里面、redis使用了集群),使用redis实现分布式锁。
    (3)如果使用了分布式系统,尤其使用的k8s部署,使用的语言是go、java的,那么使用etcd来实现分布式锁。

    总结

    (1)分布式锁是分布式场景中实现互斥类型的锁。
    (2)分布式场景就是多台机器在不同的进程当中;需要一个具体存储锁的位置,所有的进程都可以访问。锁存储和加锁 / 解锁行为,加锁对象和解锁对象必须为同一个。
    (3)解决的问题,分布式场景中,某个资源或行为只允许有一个对象可以操作,主要针对分布式的隔离性。
    (4)分布式锁的特性:互斥性、锁超时、可用性、容错性。

    分布式锁的特性
    互斥性
    可用性
    容错性
    锁超时

    (5)可用性的实现是存储型的锁,提供多个备份点以防止锁资源宕机,同时要具备切换机制,当宕机时切换到最新的备份点。
    (6)容错性中,使用raft一致性算法或redlock解决锁资源宕机时备份节点没有最新的持锁对象问题,保证持锁对象的一致性。
    (7)分布式锁的类型有:重入锁和非重入锁,公平锁和非公平锁。公平锁采用排队来实现,非公平锁采用轮询的方式实现。
    (8)MySQL实现分布式锁可用性依赖数据库;若数据库是单点,挂掉将导致业务系统不可用;还需要额外实现锁失效的问题,解锁失败,其他线程将无法获得锁。
    (9)redis集群主从复制采用的是异步复制的方式,存在主从数据不一致问题,数据丢失意味着锁丢失;为了解决这个问题使用redlock方法,执行加锁时向所有的节点都执行相同语句,有半数以上(N/2 + 1)写成功就表示加锁成功。
    (10)redis是效率最高的分布式锁;etcd是完备性最高的分布式锁;MySQl是效率最低的、最不完备的分布式锁。

    欢迎关注公众号《Lion 莱恩呀》学习技术,每日推送文章。
    在这里插入图片描述

  • 相关阅读:
    React报错之Encountered two children with the same key
    Hadoop的eclipse搭建(客观莫划走,留下来看一眼(适用人群学生初学,其他人看看就行))
    不懂“数据服务”,聊什么“数据中台”
    【正点原子FPGA连载】 第三章 硬件资源详解 摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0
    #include <hardware/bluetooth.h>
    Posix信号量
    Spring MVC注解Controller源码流程解析--映射建立
    在Linux上安装RStudio工具并实现本地远程访问【内网穿透】
    企业做商城怎么选品
    【MySQL知识点】默认约束、非空约束
  • 原文地址:https://blog.csdn.net/Long_xu/article/details/127088826