• Kernel synchronization mechanisms: the spinlock (spin_lock)


    kernel 5.12

1. The spinlock_t structure

A spinlock is represented by the data structure spinlock_t, defined as follows:

    typedef struct spinlock {
    	union {
    		struct raw_spinlock rlock;	/* the core member: the raw_spinlock */
    #ifdef CONFIG_DEBUG_LOCK_ALLOC			/* when enabled, extra debug state is stored */
    # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
    		struct {
    			u8 __padding[LOCK_PADSIZE];
    			struct lockdep_map dep_map;
    		};
    #endif
    	};
    } spinlock_t;

    /* With CONFIG_PREEMPT_RT enabled, spinlock is turned into an rt_mutex;
     * on a realtime kernel, beware that this lock can sleep. */
    /* PREEMPT_RT kernels map spinlock to rt_mutex */
    #include <linux/rtmutex.h>	/* assumed header; the include was truncated in the original listing */
    typedef struct spinlock {
    	struct rt_mutex_base lock;
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map dep_map;
    #endif
    } spinlock_t;

    typedef struct raw_spinlock {
    	arch_spinlock_t raw_lock;	/* the heart of the spinlock type: it expands
    					 * to a volatile unsigned variable on which
    					 * the actual locking operates; its layout
    					 * depends on the kernel option CONFIG_SMP */
    #ifdef CONFIG_DEBUG_SPINLOCK
    	unsigned int magic, owner_cpu;	/* the CPU the owner is running on */
    	void *owner;			/* the lock holder */
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map dep_map;	/* lockdep object, for debugging */
    #endif
    } raw_spinlock_t;

    typedef struct { volatile int counter; } atomic_t;

    /* x86/arm64 arch_spinlock_t: val is a 32-bit atomic integer. It shares
     * memory with two 16-bit fields, locked_pending (bits 0-15) and tail
     * (bits 16-31), and likewise with two 8-bit fields, locked (bits 0-7)
     * and pending (bits 8-15), plus a 2-bit idx and a 14-bit cpu inside
     * tail. The code extracts these fields with masks and bit offsets; see
     * the mask definitions explained further below. */

         31          18 17 16 15        8 7         0
        |     cpu      | idx |  pending  |  locked   |   --> little endian
        |        tail        |    locked_pending     |
        |                     val                    |

    typedef struct qspinlock {
    	union {
    		atomic_t val;
    		/*
    		 * By using the whole 2nd least significant byte for the
    		 * pending bit, we can allow better optimization of the lock
    		 * acquisition for the pending bit holder.
    		 */
    #ifdef __LITTLE_ENDIAN
    		struct {
    			u8 locked;	/* the owner byte: 1 while the lock is held,
    					 * 0 otherwise; after an unlock, the waiter
    					 * watching this byte takes the lock first */
    			u8 pending;	/* whether the first waiter has claimed its
    					 * slot: 1 if someone is pending, 0 if not */
    		};
    		struct {
    			u16 locked_pending;	/* locked and pending combined */
    			u16 tail;		/* idx plus cpu: identifies the last
    						 * node of the wait queue */
    		};
    #else
    		struct {
    			u16 tail;
    			u16 locked_pending;
    		};
    		struct {
    			u8 reserved[2];
    			u8 pending;
    			u8 locked;
    		};
    #endif
    	};
    } arch_spinlock_t;

    /* arm (32-bit) arch_spinlock_t: slock is a 32-bit unsigned integer sharing
     * memory with two unsigned 16-bit integers, owner in the low half
     * (bits 0-15) and next in the high half (bits 16-31): */

         31             16 15              0
        |      next        |     owner      |
        |               slock               |

    typedef struct {
    	union {
    		u32 slock;
    		struct __raw_tickets {
    #ifdef __ARMEB__
    			u16 next;
    			u16 owner;
    #else
    			u16 owner;
    			u16 next;
    #endif
    		} tickets;
    	};
    } arch_spinlock_t;

    There is also an important auxiliary structure for the lock wait queue that deserves a closer look.

    The kernel defines a per-CPU array qnodes with MAX_NODES = 4 entries, one for each context in which a CPU can be waiting on a spinlock: task, softirq, hardirq, and NMI. Each entry is a qnode, a thin wrapper around struct mcs_spinlock, which holds a linked-list pointer next, a lock-granted flag locked, and a nesting counter count.

    In effect, every CPU provides four nodes that can sit on spinlock wait queues at the same time.

    struct mcs_spinlock {
    	struct mcs_spinlock *next;	/* singly linked list */
    	int locked;			/* 1 if lock acquired */
    	int count;			/* nesting count, see qspinlock.c */
    };

    struct qnode {
    	struct mcs_spinlock mcs;
    #ifdef CONFIG_PARAVIRT_SPINLOCKS
    	long reserved[2];
    #endif
    };

    static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);
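    For reference, two small pieces the listing above leans on, quoted from memory from kernel/locking/qspinlock.c (treat as a sketch): MAX_NODES is the plain constant behind the array, and grab_mcs_node() is how the slowpath later picks the per-context node out of a CPU's array:

    /* one node per context a CPU may take a spinlock in:
     * task, softirq, hardirq, NMI */
    #define MAX_NODES	4

    /* base points at this CPU's qnodes[0].mcs; step idx qnode slots forward */
    static inline __pure struct mcs_spinlock *
    grab_mcs_node(struct mcs_spinlock *base, int idx)
    {
    	return &((struct qnode *)base + idx)->mcs;
    }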

    Finally, here are the masks for the individual fields of lock->val, used to pick apart the tail, pending, and locked fields:

    /*
     * Bitfields in the atomic value:
     *
     * When NR_CPUS < 16K	(machines with 16K+ CPUs are rare;
     *				 this is the layout you will almost always see)
     *  0- 7: locked byte
     *     8: pending
     *  9-15: not used
     * 16-17: tail index
     * 18-31: tail cpu (+1)
     *
     * When NR_CPUS >= 16K
     *  0- 7: locked byte
     *     8: pending
     *  9-10: tail index
     * 11-31: tail cpu (+1)
     */
    #define _Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
    				      << _Q_ ## type ## _OFFSET)
    #define _Q_LOCKED_OFFSET	0
    #define _Q_LOCKED_BITS		8	/* locked occupies 8 bits: 0-7 */
    #define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)	/* bits 0-7 all ones */

    #define _Q_PENDING_OFFSET	(_Q_LOCKED_OFFSET + _Q_LOCKED_BITS)	/* 8 = 0 + 8; pending starts at bit 8 */
    #if CONFIG_NR_CPUS < (1U << 14)		/* almost always true */
    #define _Q_PENDING_BITS		8	/* pending occupies 8 bits: 8-15 */
    #else
    #define _Q_PENDING_BITS		1
    #endif
    #define _Q_PENDING_MASK		_Q_SET_MASK(PENDING)	/* bits 8-15 all ones */

    #define _Q_TAIL_IDX_OFFSET	(_Q_PENDING_OFFSET + _Q_PENDING_BITS)	/* 16 = 8 + 8; tail idx starts at bit 16 */
    #define _Q_TAIL_IDX_BITS	2	/* idx uses bits 16-17 */
    #define _Q_TAIL_IDX_MASK	_Q_SET_MASK(TAIL_IDX)	/* bits 16-17 all ones */

    #define _Q_TAIL_CPU_OFFSET	(_Q_TAIL_IDX_OFFSET + _Q_TAIL_IDX_BITS)	/* 18 = 16 + 2 */
    #define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)	/* 14 = 32 - 18; bits 18-31 of tail hold the cpu */
    #define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)	/* bits 18-31 all ones */

    #define _Q_TAIL_OFFSET		_Q_TAIL_IDX_OFFSET	/* tail starts at bit 16 */
    #define _Q_TAIL_MASK		(_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)	/* bits 16-31 */

    #define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)	/* a 1 in the locked field */
    #define _Q_PENDING_VAL		(1U << _Q_PENDING_OFFSET)	/* a 1 in the pending field */
    #endif /* __ASM_GENERIC_QSPINLOCK_TYPES_H */
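    As a quick sanity check of the layout above, here is a minimal user-space sketch (not kernel code; the program is made up for illustration) that reproduces what encode_tail() computes for CPU 2, node index 1 under the NR_CPUS < 16K layout:

    #include <stdio.h>

    #define _Q_TAIL_IDX_OFFSET	16
    #define _Q_TAIL_CPU_OFFSET	18

    int main(void)
    {
    	/* the cpu is stored as cpu + 1 so that tail == 0 means "queue empty" */
    	unsigned int cpu = 2, idx = 1;
    	unsigned int tail = ((cpu + 1) << _Q_TAIL_CPU_OFFSET) |
    			    (idx << _Q_TAIL_IDX_OFFSET);

    	printf("tail = 0x%08x\n", tail);	/* prints tail = 0x000d0000 */
    	return 0;
    }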

    2. Implementation of the spin_lock kernel APIs

    Spinlock initialization:

    spin_lock_init initializes a caller-defined spinlock. Depending on whether PREEMPT_RT is enabled, it ends up initializing either an rt_mutex or a raw_spinlock; we ignore the rt_mutex (realtime) case here, so what ultimately runs is the macro __SPIN_LOCK_UNLOCKED, and on a non-RT kernel what actually gets initialized is the embedded raw_spinlock. Conversely, if real spinning behavior (rather than rt_mutex) is needed on an RT kernel, for example for use in interrupt context, a raw_spinlock must be declared and initialized directly. The same file that implements __SPIN_LOCK_UNLOCKED also shows that a spinlock can be defined and initialized in one step with the macro DEFINE_SPINLOCK.

    The initialization itself is simple: walk the spinlock -> raw_spinlock -> arch_spinlock_t nesting and set the arch_spinlock_t's val to 0.

    # define spin_lock_init(_lock)				\
    do {							\
    	spinlock_check(_lock);				\
    	*(_lock) = __SPIN_LOCK_UNLOCKED(_lock);		\
    } while (0)

    #define ___SPIN_LOCK_INITIALIZER(lockname)	\
    {						\
    	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
    	SPIN_DEBUG_INIT(lockname)		\
    	SPIN_DEP_MAP_INIT(lockname) }

    #define __SPIN_LOCK_INITIALIZER(lockname) \
    	{ { .rlock = ___SPIN_LOCK_INITIALIZER(lockname) } }

    #define __SPIN_LOCK_UNLOCKED(lockname) \
    	(spinlock_t) __SPIN_LOCK_INITIALIZER(lockname)

    #define DEFINE_SPINLOCK(x)	spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
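    A minimal usage sketch of both initialization styles (the struct and names are hypothetical, for illustration only):

    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(g_stats_lock);	/* compile-time definition + init */

    struct my_dev {
    	spinlock_t lock;
    	unsigned long count;
    };

    static void my_dev_setup(struct my_dev *dev)
    {
    	spin_lock_init(&dev->lock);	/* runtime init of an embedded lock */
    	dev->count = 0;
    }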

    Using the various lock-acquisition interfaces:

    spin_lock disables preemption and then tries to take the lock; on a uniprocessor kernel it degenerates to just disabling preemption. Before using it, consider whether the protected data is also touched from interrupt context: if an interrupt could come in and try to take the same lock, one of the spin_lock_irq* variants is likely needed instead.

    static __always_inline void spin_lock(spinlock_t *lock)
    {
    	raw_spin_lock(&lock->rlock);
    }

    spin_lock_bh disables preemption and softirqs before trying to take the lock, so the holder can be preempted neither by another task nor by a softirq that takes the same lock.

    static __always_inline void spin_lock_bh(spinlock_t *lock)
    {
    	raw_spin_lock_bh(&lock->rlock);
    }

    spin_lock_irq disables preemption and local interrupts before trying to take the lock, so neither preemption nor a local interrupt can race for the lock.

    static __always_inline void spin_lock_irq(spinlock_t *lock)
    {
    	raw_spin_lock_irq(&lock->rlock);
    }

    If the lock is also taken along interrupt handling paths, spin_lock_irqsave builds on the previous function by saving the interrupt state, so that local interrupts can later be restored exactly as they were. If it is certain that local interrupts were enabled before taking the lock, plain spin_lock_irq is sufficient.

    #define spin_lock_irqsave(lock, flags)				\
    do {								\
    	raw_spin_lock_irqsave(spinlock_check(lock), flags);	\
    } while (0)
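    A sketch of the classic pattern these variants exist for: data shared between process context and an interrupt handler (the handler and variable names are hypothetical):

    #include <linux/interrupt.h>
    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(buf_lock);
    static unsigned int buf_fill;

    /* process context: must keep out the IRQ that also takes buf_lock */
    static void producer(void)
    {
    	unsigned long flags;

    	spin_lock_irqsave(&buf_lock, flags);	/* IRQs off, old state saved */
    	buf_fill++;
    	spin_unlock_irqrestore(&buf_lock, flags);
    }

    /* interrupt handler: local IRQs are already off here, so plain spin_lock
     * suffices; the lock itself serializes against the other CPUs */
    static irqreturn_t my_irq_handler(int irq, void *data)
    {
    	spin_lock(&buf_lock);
    	buf_fill = 0;
    	spin_unlock(&buf_lock);
    	return IRQ_HANDLED;
    }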

    The spinlock release interfaces:

    These are simply the reverse of the functions above: release the lock, then re-enable preemption (and interrupts or softirqs where applicable). The release itself is trivial: set lock->locked = 0 and nothing more; every waiter is already spinning, watching for that byte to become 0.

    static __always_inline void spin_unlock(spinlock_t *lock)
    {
    	raw_spin_unlock(&lock->rlock);
    }

    static __always_inline void spin_unlock_bh(spinlock_t *lock)
    {
    	raw_spin_unlock_bh(&lock->rlock);
    }

    static __always_inline void spin_unlock_irq(spinlock_t *lock)
    {
    	raw_spin_unlock_irq(&lock->rlock);
    }

    static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
    {
    	raw_spin_unlock_irqrestore(&lock->rlock, flags);
    }

    static __always_inline int spin_trylock_bh(spinlock_t *lock)
    {
    	return raw_spin_trylock_bh(&lock->rlock);
    }
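    On x86/arm64 the release at the bottom of this call stack is the generic queued_spin_unlock() from include/asm-generic/qspinlock.h (quoted from memory, so treat it as a sketch). It is exactly the "set locked = 0" described above, done as a single store-release on the locked byte:

    static __always_inline void queued_spin_unlock(struct qspinlock *lock)
    {
    	/*
    	 * unlock() needs release semantics:
    	 */
    	smp_store_release(&lock->locked, 0);
    }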

    Implementation details of the acquisition path:

    Each spin_lock* call goes through a raw_* function, then a _raw_* macro, and finally a __raw_* function, one thin wrapper after another. Let's look at the __raw_* layer:

    static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
    {
    	unsigned long flags;

    	local_irq_save(flags);	/* disable local interrupts, saving their state */
    	preempt_disable();	/* disable preemption; behavior varies with
    				 * CONFIG_PREEMPT_COUNT and CONFIG_PREEMPT */
    	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    	/*
    	 * On lockdep we dont want the hand-coded irq-enable of
    	 * do_raw_spin_lock_flags() code, because lockdep assumes
    	 * that interrupts are not re-enabled during lock-acquire:
    	 */
    #ifdef CONFIG_LOCKDEP
    	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    #else
    	do_raw_spin_lock_flags(lock, &flags);
    #endif
    	return flags;
    }

    static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
    {
    	local_irq_disable();	/* disable local interrupts without saving state */
    	preempt_disable();	/* disable preemption */
    	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    	/* This is the core of lock acquisition: do_raw_spin_trylock goes
    	 * first, and if nobody holds the lock it takes it on the fast path;
    	 * otherwise do_raw_spin_lock has to spin and wait for it. */
    }

    static inline void __raw_spin_lock_bh(raw_spinlock_t *lock)
    {
    	__local_bh_disable_ip(_RET_IP_, SOFTIRQ_LOCK_OFFSET);	/* disable bottom
    								 * halves (softirqs) */
    	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    }

    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
    	preempt_disable();	/* disable preemption */
    	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);	/* debugging only; does
    							 * something only with
    							 * CONFIG_DEBUG_LOCK_ALLOC */
    	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    }

    The key part of the __raw_* functions is the architecture-facing pair do_raw_spin_trylock and do_raw_spin_lock, which in turn call the architecture-specific arch_spin_trylock and arch_spin_lock. do_raw_spin_trylock is attempted first: if nobody holds the lock, acquisition succeeds on the fast path; otherwise do_raw_spin_lock spins for it on the slow path.
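    For reference, the non-debug versions of that pair look roughly like this (quoted from memory from include/linux/spinlock.h on a build without CONFIG_DEBUG_SPINLOCK; treat as a sketch):

    static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
    {
    	__acquire(lock);			/* sparse annotation only */
    	arch_spin_lock(&lock->raw_lock);	/* the architecture backend */
    	mmiowb_spin_lock();
    }

    static inline int do_raw_spin_trylock(raw_spinlock_t *lock)
    {
    	int ret = arch_spin_trylock(&(lock)->raw_lock);

    	if (ret)
    		mmiowb_spin_lock();

    	return ret;	/* 1 if the lock was free and is now ours */
    }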

    On x86 and ARM64, spinlock is implemented as a qspinlock; for the background on why, see the post: spinlock前世今生.

    The details are in the qspinlock implementation below. It is long, over 300 lines, but annotated line by line; read it through patiently and it will pay off.

    /*
     * Remapping spinlock architecture specific functions to the corresponding
     * queued spinlock functions.
     *
     * Note: on x86/ARM64 the arch_spin_* functions are defined to the
     * queued_spin_* functions.
     */
    #define arch_spin_lock(l)		queued_spin_lock(l)
    #define arch_spin_trylock(l)	queued_spin_trylock(l)

    #ifndef queued_spin_lock
    /**
     * queued_spin_lock - acquire a queued spinlock
     * @lock: Pointer to queued spinlock structure
     */
    static __always_inline void queued_spin_lock(struct qspinlock *lock)
    {
    	int val = 0;

    	/* atomic_try_cmpxchg_acquire compares lock->val with val (0). If they
    	 * are equal, nobody holds the lock: it sets lock->val to _Q_LOCKED_VAL
    	 * (the locked byte becomes 1, i.e. we own the lock) and returns true.
    	 * Otherwise it writes the current lock->val (the whole union: locked,
    	 * pending and tail together) back into val, returns false, and we
    	 * fall into the slow path. */
    	if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
    		return;

    	queued_spin_lock_slowpath(lock, val);	/* fast path failed: go spin */
    }
    #endif

    /**
     * queued_spin_lock_slowpath - acquire the queued spinlock
     * @lock: Pointer to queued spinlock structure
     * @val: Current value of the queued spinlock 32-bit word
     *
     * (queue tail, pending bit, lock value)
     *
     *              fast     :    slow                                  :    unlock
     *                       :                                          :
     * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
     *                       :       | ^--------.------.             /  :
     *                       :       v           \      \            |  :
     * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
     *                       :       | ^--'              |           |  :
     *                       :       v                   |           |  :
     * uncontended           :    (n,x,y) +--> (n,0,0) --'            |  :
     *   queue               :       | ^--'                           |  :
     *                       :       v                                |  :
     * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
     *   queue               :         ^--'                             :
     */
    /* The diagram above covers the possible acquisition paths:
     * 1. val == 0: take the lock on the fast path, release it when done.
     * 2. val != 0: wait for the lock on the slow path, with these cases:
     *    1) (0,0,1) -> (0,1,1): someone holds the lock and I am the only
     *       waiter; set the pending bit, spin until locked == 0, then take
     *       the lock and release it when done.
     *    2) (0,1,1) -> (n,x,y): more than one CPU is waiting, so we must
     *       queue on the MCS list:
     *       a) val == pending (pending = 1, tail = 0, locked = 0): the lock
     *          was just released and its one waiter is about to take it;
     *          keep re-reading val until val != pending (either locked != 0
     *          or tail != 0).
     *       b) pending != 0 or tail != 0: there are clearly earlier
     *          contenders, so go join the wait queue.
     *       c) tail == 0, pending == 0, locked != 0: someone holds the lock
     *          and we are the first waiter. Set pending, take the old val,
     *          and use it to judge whether we raced:
     *          c.1) if the old val has pending == 0 but tail != 0, another
     *               contender beat us to it; clear the pending bit we set
     *               and go join the wait queue.
     *          c.2) if the old val still has pending == 0, tail == 0 and
     *               locked != 0, nobody raced with us and we are the only
     *               waiter: spin until locked is cleared, then take the lock
     *               for real by clearing pending and setting locked = 1.
     *               Acquisition complete.
     *    3) queue: if val != 0 and tail != 0, fall through to the queueing
     *       path:
     *       a) bump qnodes[0].count, the number of spinlocks this CPU is
     *          waiting on, and encode the current cpu and idx into tail.
     *       b) pick the MCS node for this context: node = qnodes[idx].mcs.
     *       c) initialize the node: locked = 0, next = NULL (it will be
     *          appended at the tail of the list).
     *       d) update lock->tail, getting the old tail back, and use the old
     *          tail to judge whether we raced:
     *          d.1) if the old tail != 0, decode cpu/idx from it to find the
     *               previous queue tail node prev, link our node after prev,
     *               and spin on our own per-CPU node until the previous
     *               waiter sets node->locked = 1 to tell us our turn has
     *               come; then fall through to the next step.
     *          d.2) if the old tail == 0, or we have just been told it is
     *               our turn, spin until pending == 0 and locked == 0,
     *               reading val = lock->val, then check whether tail changed:
     *               d.2.1) if tail is still ours, take the lock by setting
     *                      locked and clearing tail in one step. Done.
     *               d.2.2) if tail has changed, someone queued behind us:
     *                      take the lock first, then, as the predecessor,
     *                      set the next node's node->locked to tell it that
     *                      it is now at the head of the queue. Done.
     */
    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
    {
    	struct mcs_spinlock *prev, *next, *node;
    	u32 old, tail;
    	int idx;

    	/* the CPU count must fit in the tail's cpu field: fewer than 16K CPUs */
    	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

    	if (pv_enabled())
    		goto pv_queue;

    	if (virt_spin_lock(lock))
    		return;

    	/*
    	 * Wait for in-progress pending->locked hand-overs with a bounded
    	 * number of spins so that we guarantee forward progress.
    	 *
    	 * 0,1,0 -> 0,0,1
    	 *
    	 * val consists of tail (cpu, idx), pending and locked.
    	 */
    	/* val == pending means locked = 0, tail = 0, pending = 1: the lock
    	 * has been released but its one waiter has not taken it yet; wait
    	 * for this transient state to pass. */
    	if (val == _Q_PENDING_VAL) {
    		int cnt = _Q_PENDING_LOOPS;	/* cnt == 1 */
    		/* keep reading lock->val until we leave this state: pending
    		 * cleared, locked set, or a waiter appearing in the queue;
    		 * note that VAL names the value being re-read. */
    		val = atomic_cond_read_relaxed(&lock->val,
    					       (VAL != _Q_PENDING_VAL) || !cnt--);
    	}

    	/*
    	 * If we observe any contention; queue.
    	 */
    	/* If tail or pending is non-zero there are other contenders, so we
    	 * must join the wait queue: jump to the queueing path. */
    	if (val & ~_Q_LOCKED_MASK)
    		goto queue;

    	/*
    	 * trylock || pending
    	 *
    	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
    	 *
    	 * Reaching here means tail == 0, pending == 0, locked != 0: someone
    	 * holds the lock and we are the first waiter. Set the pending bit
    	 * (claim the pending slot); val becomes the lock's old value.
    	 */
    	val = queued_fetch_set_pending_acquire(lock);

    	/*
    	 * If we observe contention, there is a concurrent locker.
    	 *
    	 * Undo and queue; our setting of PENDING might have made the
    	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
    	 * on @next to become !NULL.
    	 */
    	/* If the old value shows that another CPU entered the wait queue
    	 * before we set pending (tail changed: we raced and lost), undo the
    	 * pending bit we set and go join the wait queue. */
    	if (unlikely(val & ~_Q_LOCKED_MASK)) {

    		/* Undo PENDING if we set it. */
    		if (!(val & _Q_PENDING_MASK))
    			clear_pending(lock);

    		goto queue;
    	}

    	/*
    	 * We're pending, wait for the owner to go away.
    	 *
    	 * 0,1,1 -> 0,1,0
    	 *
    	 * this wait loop must be a load-acquire such that we match the
    	 * store-release that clears the locked bit and create lock
    	 * sequentiality; this is because not all
    	 * clear_pending_set_locked() implementations imply full
    	 * barriers.
    	 */
    	/* At this point we hold the pending bit and tail is 0 (the wait
    	 * queue is empty): keep reading lock->val via
    	 * atomic_cond_read_acquire until the locked byte is cleared,
    	 * i.e. spin here on the locked byte. */
    	if (val & _Q_LOCKED_MASK)
    		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

    	/*
    	 * take ownership and clear the pending bit.
    	 *
    	 * 0,1,0 -> 0,0,1
    	 */
    	/* Take the lock for real: clear the pending bit, since we no longer
    	 * wait, and set the locked byte. We now own the lock. */
    	clear_pending_set_locked(lock);
    	lockevent_inc(lock_pending);
    	return;

    	/*
    	 * End of pending bit optimistic spinning and beginning of MCS
    	 * queuing.
    	 */
    queue:
    	lockevent_inc(lock_slowpath);
    pv_queue:
    	node = this_cpu_ptr(&qnodes[0].mcs);
    	/* each CPU's qnodes[0].count serves as the counter of how many
    	 * spinlock waiters this CPU has added to wait queues */
    	idx = node->count++;
    	/* encode the cpu number and idx into tail: this identifies which
    	 * CPU the queued waiter is on and which per-context slot (array
    	 * index idx) it occupies */
    	tail = encode_tail(smp_processor_id(), idx);

    	/*
    	 * 4 nodes are allocated based on the assumption that there will
    	 * not be nested NMIs taking spinlocks. That may not be true in
    	 * some architectures even though the chance of needing more than
    	 * 4 nodes will still be extremely unlikely. When that happens,
    	 * we fall back to spinning on the lock directly without using
    	 * any MCS node. This is not the most elegant solution, but is
    	 * simple enough.
    	 */
    	/* Unlikely, but if nested NMI lock taking pushes idx to 4 or more,
    	 * skip the MCS machinery and simply retry until the lock is ours. */
    	if (unlikely(idx >= MAX_NODES)) {
    		lockevent_inc(lock_no_node);
    		while (!queued_spin_trylock(lock))	/* loop until we get the lock */
    			cpu_relax();			/* barrier + brief pause */
    		goto release;
    	}

    	/* take this CPU's qnodes[idx].mcs as the node we are about to queue */
    	node = grab_mcs_node(node, idx);

    	/*
    	 * Keep counts of non-zero index values:
    	 */
    	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);

    	/*
    	 * Ensure that we increment the head node->count before initialising
    	 * the actual node. If the compiler is kind enough to reorder these
    	 * stores, then an IRQ could overwrite our assignments.
    	 */
    	barrier();

    	/* initialize the node we are about to queue */
    	node->locked = 0;
    	node->next = NULL;	/* it will sit at the tail of the list */
    	pv_init_node(node);	/* initialize the pv_node from node */

    	/*
    	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
    	 * attempt the trylock once more in the hope someone let go while we
    	 * weren't watching.
    	 */
    	/* After all that, try the lock once more; if it succeeds we still
    	 * need the release path to drop the waiter count. */
    	if (queued_spin_trylock(lock))
    		goto release;

    	/*
    	 * Ensure that the initialisation of @node is complete before we
    	 * publish the updated tail via xchg_tail() and potentially link
    	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
    	 */
    	smp_wmb();

    	/*
    	 * Publish the updated tail.
    	 * We have already touched the queueing cacheline; don't bother with
    	 * pending stuff.
    	 *
    	 * p,*,* -> n,*,*
    	 */
    	/* set lock->tail to our freshly encoded tail; old is the previous tail */
    	old = xchg_tail(lock, tail);
    	next = NULL;

    	/*
    	 * if there was a previous node; link it and wait until reaching the
    	 * head of the waitqueue.
    	 */
    	/* if the previous tail is non-zero, the queue already has waiters */
    	if (old & _Q_TAIL_MASK) {
    		/* decode the old tail into cpu and idx to find the current
    		 * list tail node prev */
    		prev = decode_tail(old);

    		/* Link @node into the waitqueue. */
    		WRITE_ONCE(prev->next, node);

    		pv_wait_node(node, prev);
    		/* Queued but not at the list head: spin on our own per-CPU
    		 * node, waiting for the previous waiter to set our node's
    		 * locked member, i.e. to tell us our turn has come. */
    		arch_mcs_spin_lock_contended(&node->locked);

    		/*
    		 * While waiting for the MCS lock, the next pointer may have
    		 * been set by another lock waiter. We optimistically load
    		 * the next pointer & prefetch the cacheline for writing
    		 * to reduce latency in the upcoming MCS unlock operation.
    		 */
    		/* Reaching here means someone set our node->locked: we are
    		 * now the first node of the wait list. */
    		next = READ_ONCE(node->next);
    		/* Are we still the tail? If not, load the next node early;
    		 * its data is needed later, so this is an optimization. */
    		if (next)
    			prefetchw(next);	/* prefetch the next list node */
    	}

    	/*
    	 * we're at the head of the waitqueue, wait for the owner & pending to
    	 * go away.
    	 *
    	 * *,x,y -> *,0,0
    	 *
    	 * this wait loop must use a load-acquire such that we match the
    	 * store-release that clears the locked bit and create lock
    	 * sequentiality; this is because the set_locked() function below
    	 * does not imply a full barrier.
    	 *
    	 * The PV pv_wait_head_or_lock function, if active, will acquire
    	 * the lock and return a non-zero value. So we have to skip the
    	 * atomic_cond_read_acquire() call. As the next PV queue head hasn't
    	 * been designated yet, there is no way for the locked value to become
    	 * _Q_SLOW_VAL. So both the set_locked() and the
    	 * atomic_cmpxchg_relaxed() calls will be safe.
    	 *
    	 * If PV isn't active, 0 will be returned instead.
    	 *
    	 */
    	if ((val = pv_wait_head_or_lock(lock, node)))
    		goto locked;

    	/* Reaching here, either the wait list was empty and we are its first
    	 * node, or we waited until node->locked was set, which is how the
    	 * previous node tells us we are now at the head of the list.
    	 * Either way, spin until both the pending and locked fields of the
    	 * lock are cleared: atomic_cond_read_acquire spins on lock->val
    	 * until pending == 0 and locked == 0. */
    	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

    locked:
    	/*
    	 * claim the lock:
    	 *
    	 * n,0,0 -> 0,0,1 : lock, uncontended
    	 * *,*,0 -> *,*,1 : lock, contended
    	 *
    	 * Now both the pending and locked fields are zero and we are the
    	 * first node of the wait queue: it is our turn to take the lock.
    	 *
    	 * If the queue head is the only one in the queue (lock value == tail)
    	 * and nobody is pending, clear the tail code and grab the lock.
    	 * Otherwise, we only need to grab the lock.
    	 */

    	/*
    	 * In the PV case we might already have _Q_LOCKED_VAL set, because
    	 * of lock stealing; therefore we must also allow:
    	 *
    	 * n,0,1 -> 0,0,1
    	 *
    	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
    	 * above wait condition, therefore any concurrent setting of
    	 * PENDING will make the uncontended transition fail.
    	 */
    	/* If tail is still the one we set, we are also the last node of the
    	 * queue with nobody behind us: set lock->val = _Q_LOCKED_VAL, which
    	 * sets the locked byte and clears tail in one step. We were the last
    	 * waiter, so once we hold the lock the queue is empty. */
    	if ((val & _Q_TAIL_MASK) == tail) {
    		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
    			goto release; /* No contention */
    	}

    	/*
    	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
    	 * which will then detect the remaining tail and queue behind us
    	 * ensuring we'll see a @next.
    	 */
    	/* tail has changed: a new node is queued behind us, so take the lock
    	 * first (lock->locked = 1) */
    	set_locked(lock);

    	/*
    	 * contended path; wait for next if not observed yet, release.
    	 */
    	/* read the next node */
    	if (!next)
    		next = smp_cond_load_relaxed(&node->next, (VAL));

    	/* Remember spinning in the queue waiting for our own node->locked to
    	 * be set, and wondering who sets it? The answer is here: the previous
    	 * waiter does it right after taking the lock. We now set the next
    	 * node's node->locked to tell it: you are the head of the wait
    	 * queue now. */
    	arch_mcs_spin_unlock_contended(&next->locked);
    	pv_kick_node(lock, next);

    release:
    	/*
    	 * We have left the wait queue; qnodes[0].mcs.count tracks how many
    	 * spinlock wait queues this CPU is on, so decrement it now.
    	 */
    	/*
    	 * release the node
    	 */
    	__this_cpu_dec(qnodes[0].mcs.count);
    }
    EXPORT_SYMBOL(queued_spin_lock_slowpath);
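    A few helpers the slowpath leans on, quoted from memory from kernel/locking/qspinlock.c for the _Q_PENDING_BITS == 8 case (treat as a sketch): encode_tail()/decode_tail() convert between a (cpu, idx) pair and the tail field, and set_locked()/clear_pending_set_locked() are the plain stores that the union layout makes possible:

    static inline __pure u32 encode_tail(int cpu, int idx)
    {
    	u32 tail;

    	tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;	/* cpu + 1, so tail == 0
    							 * means "queue empty" */
    	tail |= idx << _Q_TAIL_IDX_OFFSET;		/* assumes idx < 4 */

    	return tail;
    }

    static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
    {
    	int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
    	int idx = (tail &  _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

    	return per_cpu_ptr(&qnodes[idx].mcs, cpu);
    }

    /* take the lock: write just the locked byte */
    static __always_inline void set_locked(struct qspinlock *lock)
    {
    	WRITE_ONCE(lock->locked, _Q_LOCKED_VAL);
    }

    /* take the lock and drop pending in one 16-bit store to locked_pending */
    static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
    {
    	WRITE_ONCE(lock->locked_pending, _Q_LOCKED_VAL);
    }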

    For 32-bit ARM:

    static inline void arch_spin_lock(arch_spinlock_t *lock)
    {
    	unsigned long tmp;
    	u32 newval;
    	arch_spinlock_t lockval;

    	prefetchw(&lock->slock);	/* gcc builtin prefetch: pull the line
    					 * into the nearest cache for speed */
    	__asm__ __volatile__(
    "1:	ldrex	%0, [%3]\n"	/* lockval = lock->slock, and mark &lock->slock
    				 * for exclusive access */
    "	add	%1, %0, %4\n"	/* newval = lockval + (1 << 16); bit 16 is where
    				 * the next field starts, so this is
    				 * lockval.tickets.next + 1 */
    "	strex	%2, %1, [%3]\n"	/* if we still hold the exclusive monitor, store
    				 * lock->slock = newval, set tmp = 0 and clear
    				 * the exclusive mark; otherwise tmp = 1 */
    "	teq	%2, #0\n"	/* test tmp == 0 */
    "	bne	1b"		/* if not, retry from label 1; else fall through */
    	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    	: "cc");

    	/* next has been incremented above: that is our ticket. If the lock has
    	 * not been released yet, i.e. owner has not yet been incremented to
    	 * match, then lockval.tickets.next != lockval.tickets.owner, and we
    	 * execute wfe to drop into a low-power idle state until a CPU event
    	 * wakes us, then re-check. To fully understand locking you also need
    	 * unlocking, since the two are symmetric, and unlock is trivial: it
    	 * just increments owner. When a spinlock is initialized, next and
    	 * owner are both 0. Some execution flow A takes the lock: next + 1,
    	 * so to any other flow B, next != owner and B waits. When A calls
    	 * spin_unlock, owner + 1. */
    	while (lockval.tickets.next != lockval.tickets.owner) {
    		wfe();	/* enter a low-power state; the SEV on unlock wakes us */
    		lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
    	}

    	smp_mb();
    }

    static inline int arch_spin_trylock(arch_spinlock_t *lock)
    {
    	unsigned long contended, res;
    	u32 slock;

    	prefetchw(&lock->slock);	/* gcc builtin prefetch, as above */
    	do {
    		__asm__ __volatile__(
    	"	ldrex	%0, [%3]\n"	/* slock = lock->slock, and mark it for
    					 * exclusive access */
    	"	mov	%2, #0\n"	/* res = 0 */
    	"	subs	%1, %0, %0, ror #16\n"	/* contended = slock - (slock
    					 * rotated right by 16): zero exactly when
    					 * next == owner, i.e. the lock is free;
    					 * this also sets the condition flags */
    	"	addeq	%0, %0, %4\n"	/* if free (eq), slock += 1 << 16,
    					 * i.e. next + 1: our ticket */
    	"	strexeq	%2, %0, [%3]"	/* if free and still exclusive, store the
    					 * new slock, res = 0: we took the lock;
    					 * if the exclusive monitor was lost,
    					 * res = 1: someone raced us, so retry */
    		: "=&r" (slock), "=&r" (contended), "=&r" (res)
    		: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    		: "cc");
    	} while (res);

    	if (!contended) {	/* contended == 0: the fast acquisition succeeded */
    		smp_mb();
    		return 1;
    	} else {
    		return 0;
    	}
    }
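    For completeness, the matching release on 32-bit ARM (quoted from memory from arch/arm/include/asm/spinlock.h; treat as a sketch): it bumps owner and then wakes the cores parked in wfe() with an SEV event, which is exactly what the wfe() in arch_spin_lock() above is waiting for:

    static inline void arch_spin_unlock(arch_spinlock_t *lock)
    {
    	smp_mb();
    	lock->tickets.owner++;	/* hand the ticket to the next waiter */
    	dsb_sev();		/* data barrier + SEV to wake CPUs in wfe() */
    }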

  • Original article: https://blog.csdn.net/hzj_001/article/details/125860225