kernel 5.12
自旋锁由数据结构spinlock_t来表示,具体的定义如下:
- typedef struct spinlock {
- union {
- struct raw_spinlock rlock; //自旋锁的核心成员是raw_spinlock锁。
-
- #ifdef CONFIG_DEBUG_LOCK_ALLOC //如果打开此配置,会增加调试信息存储
- # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
- struct {
- u8 __padding[LOCK_PADSIZE];
- struct lockdep_map dep_map;
- };
- #endif
- };
- } spinlock_t;
-
- //这里如果打开了CONFIG_PREEMPT_RT开关,spinlock会被转化成rt_mutex,实时内核需要注意这里可能会睡眠
- /* PREEMPT_RT kernels map spinlock to rt_mutex */
- #include <linux/rtmutex.h>
- typedef struct spinlock {
- struct rt_mutex_base lock;
- #ifdef CONFIG_DEBUG_LOCK_ALLOC
- struct lockdep_map dep_map;
- #endif
- } spinlock_t;
-
-
-
- typedef struct raw_spinlock {
- arch_spinlock_t raw_lock; /*该成员变量是自旋锁数据类型的核心,
- 它展开后实质上是一个volatile unsigned类型的变量。具体的锁定过程与它密切
- 相关,该变量依赖于内核选项CONFIG_SMP*/
- #ifdef CONFIG_DEBUG_SPINLOCK
- unsigned int magic, owner_cpu; //锁拥有者所在的CPU
- void *owner; //锁的持有者
- #endif
- #ifdef CONFIG_DEBUG_LOCK_ALLOC
- struct lockdep_map dep_map; //调试使用,定义一个锁对象
- #endif
- } raw_spinlock_t;
-
- typedef struct { volatile int counter; } atomic_t;
- /*x86/arm64 arch_spinlock_t数据结构,val是一个32位原子类型整数;
- 与两个16位的locked_pending(0-15)、tail(16-31)共享内存;同时
- 与两个8位的locked(0-7)、pending(8-15),以及2位idx + 14位cpu共享内存,
- 函数中具体通过掩码和位偏移来确定,可以看下文掩码相关详解*/
-
- | cpu |idx|pending locked |-->小端
- | tail |locked_pending |
- | val |
- 31---------------15--------------0
- typedef struct qspinlock {
- union {
- atomic_t val;
-
- /*
- * By using the whole 2nd least significant byte for the
- * pending bit, we can allow better optimization of the lock
- * acquisition for the pending bit holder.
- */
- #ifdef __LITTLE_ENDIAN
- struct {
- u8 locked; //可以理解为最优先持锁标志,即当unlock之后只有这个位的CPU最先持锁,也只会有1和0
- u8 pending; //用来表示这个锁是否被人持有,只会有1和0两种情况,即1被人持有,0无人持锁
- };
- struct {
- u16 locked_pending;//由locked 和 pending构成
- u16 tail;//由idx CPU构成,用来标识等待队列最后一个节点
- };
- #else
- struct {
- u16 tail;
- u16 locked_pending;
- };
- struct {
- u8 reserved[2];
- u8 pending;
- u8 locked;
- };
- #endif
- };
- } arch_spinlock_t;
-
-
- /*arm arch_spinlock_t 数据结构slock是一个32位无符号整数,与无符号16位整数owner
- 和 next共享内存空间,owner占低16位(0-15),next占高16(16-31)数据分布*/
- | next | owner |
- | slock |
- ----------------15---------------
- typedef struct {
- union {
- u32 slock;
- struct __raw_tickets {
- #ifdef __ARMEB__
- u16 next;
- u16 owner;
- #else
- u16 owner;
- u16 next;
- #endif
- } tickets;
- };
- } arch_spinlock_t;
还有个重要的锁队列辅助结构需要重要讲一下。
内核定义了个每cpu变量的qnodes数组,数组成员max_nodes=4个,其是为了表达每个cpu上最多等待包含进程、软中断、硬中断、nmi上下文的4个spinlock锁;数组成员为qnode,他是对mcs_spinlock结构体的一个封装,mcs_spinlock结构体里面包含一个链表成员指针,一个持锁标记locked成员和等锁成员数量count。
其实他就是每个cpu上维护了四个等待spinlock锁的队列。
- struct mcs_spinlock {
- struct mcs_spinlock *next; //单链表
- int locked; /* 1 if lock acquired 如果锁被持有,则该值为1*/
- int count; /* nesting count, see qspinlock.c 嵌套数*/
- };
-
- struct qnode {
- struct mcs_spinlock mcs;
- #ifdef CONFIG_PARAVIRT_SPINLOCKS
- long reserved[2];
- #endif
- };
-
- static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);
再补充个lock->val各个字段的掩码,用来具体的区分tail、pending、locked字段
- /*
- * Bitfields in the atomic value:
- *
- * When NR_CPUS < 16K //超过16Kcpu的机器很少见吧,基本都这种布局。
- * 0- 7: locked byte
- * 8: pending
- * 9-15: not used
- * 16-17: tail index
- * 18-31: tail cpu (+1)
- *
- * When NR_CPUS >= 16K
- * 0- 7: locked byte
- * 8: pending
- * 9-10: tail index
- * 11-31: tail cpu (+1)
- */
- #define _Q_SET_MASK(type) (((1U << _Q_ ## type ## _BITS) - 1)\
- << _Q_ ## type ## _OFFSET)
- #define _Q_LOCKED_OFFSET 0
- #define _Q_LOCKED_BITS 8//locked 占8位 0-7
- #define _Q_LOCKED_MASK _Q_SET_MASK(LOCKED)//0-7位全1
-
- #define _Q_PENDING_OFFSET (_Q_LOCKED_OFFSET + _Q_LOCKED_BITS) //8 = 0 + 8,pending从8bit开始
- #if CONFIG_NR_CPUS < (1U << 14) //基本这个条件都成立
- #define _Q_PENDING_BITS 8 //pending占8位,8-15位
- #else
- #define _Q_PENDING_BITS 1
- #endif
- #define _Q_PENDING_MASK _Q_SET_MASK(PENDING)//8-15位全1
-
- #define _Q_TAIL_IDX_OFFSET (_Q_PENDING_OFFSET + _Q_PENDING_BITS) //16 = 8 + 8,tail idx从16bit开始
- #define _Q_TAIL_IDX_BITS 2 //idx所使用的位数,16-17bit
- #define _Q_TAIL_IDX_MASK _Q_SET_MASK(TAIL_IDX) //16-17位全1
-
- #define _Q_TAIL_CPU_OFFSET (_Q_TAIL_IDX_OFFSET + _Q_TAIL_IDX_BITS) // 18 = 16 + 2
- #define _Q_TAIL_CPU_BITS (32 - _Q_TAIL_CPU_OFFSET) // 14 = 32 - 18,表示用tail的18-31位表示cpu
- #define _Q_TAIL_CPU_MASK _Q_SET_MASK(TAIL_CPU)//18-31全为1
-
- #define _Q_TAIL_OFFSET _Q_TAIL_IDX_OFFSET //tail 从16bit开始
- #define _Q_TAIL_MASK (_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)//16-31bit
-
- #define _Q_LOCKED_VAL (1U << _Q_LOCKED_OFFSET) //locked区域 + 1 值;
- #define _Q_PENDING_VAL (1U << _Q_PENDING_OFFSET) //pending区域 + 1值;
-
- #endif /* __ASM_GENERIC_QSPINLOCK_TYPES_H */
spin_lock的初始化:
spin_lock_init是用来初始化我们自定义的自旋锁,该函数会根据是否开启PREEMPT_RT来确定初始化rt_mutex,还是raw_spinlock,这里我们先不考虑rt_mutex这种实时内核场景,所以最终会调用宏__SPIN_LOCK_UNLOCKED来初始化spinlock锁,非rt内核下,实际上初始化的是raw_spinlock锁,所以这里如果我们在rt场景下也需要使用spinlock而不是rt_mutex,则需要我们直接初始化raw_spinlock达到目的效果,例如可以在中断上下文使用;然后我们继续在__SPIN_LOCK_UNLOCKED宏实现的文件中,我们可以发现,也可以用宏DEFINE_SPINLOCK直接来定义自旋锁。
自旋锁的初始化其实很简单,就是做个spinlock->raw_spinlock->arch_spinlock_t的转换,然后把arch_spinlock_t的val初始化为0;
- # define spin_lock_init(_lock) \
- do { \
- spinlock_check(_lock); \
- *(_lock) = __SPIN_LOCK_UNLOCKED(_lock); \
- } while (0)
-
-
- #define ___SPIN_LOCK_INITIALIZER(lockname) \
- { \
- .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED, \
- SPIN_DEBUG_INIT(lockname) \
- SPIN_DEP_MAP_INIT(lockname) }
-
- #define __SPIN_LOCK_INITIALIZER(lockname) \
- { { .rlock = ___SPIN_LOCK_INITIALIZER(lockname) } }
-
- #define __SPIN_LOCK_UNLOCKED(lockname) \
- (spinlock_t) __SPIN_LOCK_INITIALIZER(lockname)
-
- #define DEFINE_SPINLOCK(x) spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
spin_lock各个获取锁接口的使用;
spin_lock函数会关抢占并尝试获取自旋锁,单cpu情况下就退化为关抢占功能;使用该函数时
需要考虑是否存在中断与线程共享数据的场景,为防止中断打断获取自旋锁,可能要考虑
spin_lock_irq*之类的函数;
- static __always_inline void spin_lock(spinlock_t *lock)
- {
- raw_spin_lock(&lock->rlock);
- }
spin_lock_bh该函数会关抢占,关软中断,然后尝试获取自旋锁,这里可以防止被抢占或被软中断抢占拿锁;
- static __always_inline void spin_lock_bh(spinlock_t *lock)
- {
- raw_spin_lock_bh(&lock->rlock);
- }
spin_lock_irq函数会关抢占,关本地中断,然后尝试获取自旋锁,防止被抢占或被中断抢占拿锁
- static __always_inline void spin_lock_irq(spinlock_t *lock)
- {
- raw_spin_lock_irq(&lock->rlock);
- }
如果是在中断处理流程用到获取自旋锁,该函数在上个函数的基础上保存了中断状态,
以便恢复本地中断到之前的状态;如果确认获取锁之前本地中断是开启的,则可以使用
spin_lock_irq函数;
- #define spin_lock_irqsave(lock, flags) \
- do { \
- raw_spin_lock_irqsave(spinlock_check(lock), flags); \
- } while (0)
spinlock释放锁的接口:
实际就是上述函数的反向操作,解锁 + (开抢占(开中断));解锁的操作其实也很简单,就是设置lock->locked = 0 就行了;不需要其余操作,等锁者一直在轮询等待它变为0。
- static __always_inline void spin_unlock(spinlock_t *lock)
- {
- raw_spin_unlock(&lock->rlock);
- }
-
- static __always_inline void spin_unlock_bh(spinlock_t *lock)
- {
- raw_spin_unlock_bh(&lock->rlock);
- }
-
- static __always_inline void spin_unlock_irq(spinlock_t *lock)
- {
- raw_spin_unlock_irq(&lock->rlock);
- }
-
- static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
- {
- raw_spin_unlock_irqrestore(&lock->rlock, flags);
- }
-
- static __always_inline int spin_trylock_bh(spinlock_t *lock)
- {
- return raw_spin_trylock_bh(&lock->rlock);
- }
拿锁接口具体实现细节:
接下来会调用raw_*函数,然后调用宏_raw_*,最后调用__raw_*函数的一层一层的封装,接下来主要看下__raw_** 函数;
- static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
- {
- unsigned long flags;
-
- local_irq_save(flags);//关本地中断,并保存中断状态
- preempt_disable();//关抢占,受CONFIG_PREEMPT_COUNT和CONFIG_PREEMPT开关而不同
- spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
- /*
- * On lockdep we dont want the hand-coded irq-enable of
- * do_raw_spin_lock_flags() code, because lockdep assumes
- * that interrupts are not re-enabled during lock-acquire:
- */
- #ifdef CONFIG_LOCKDEP
- LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
- #else
- do_raw_spin_lock_flags(lock, &flags);
- #endif
- return flags;
- }
-
- static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
- {
- local_irq_disable();//关本地中断,不保存中断状态
- preempt_disable(); //关抢占
- spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
- LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);/*这个是拿
- 锁的核心流程,首先通过do_raw_spin_trylock函数打头阵,如果没有人持锁,就实现快速拿锁,
- 否则就需要do_raw_spin_lock去自旋等锁了。*/
- }
-
- static inline void __raw_spin_lock_bh(raw_spinlock_t *lock)
- {
- __local_bh_disable_ip(_RET_IP_, SOFTIRQ_LOCK_OFFSET);/*禁止中断
- 下半部(软中断)执行*/
- spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
- LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
- }
-
- static inline void __raw_spin_lock(raw_spinlock_t *lock)
- {
- preempt_disable();//关抢占
- spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);/*这段代码使用来调试使用的,
- 需要开启CONFIG_DEBUG_LOCK_ALLOC配置才有用*/
- LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
- }
__raw_*类函数的关键实现在于和架构相关的二个函数:do_raw_spin_trylock, do_raw_spin_lock,其中他们分别会调用与架构相关的二个函数:arch_spin_trylock和arch_spin_lock函数,首先通过do_raw_spin_trylock函数尝试快速拿锁,如果无人持有锁,就会从快路径拿锁成功,否则就需要do_raw_spin_lock去慢路径自旋等锁了;
对于x86、ARM64而言,spinlock被转换成了qspinlock,具体的原因,参照博文:spinlock前世今生
具体实现看qspinlock的实现细节,有点多,300多行代码保姆级注释,大家耐心看完,会有收获!
- /*
- * Remapping spinlock architecture specific functions to the corresponding
- * queued spinlock functions.
- 看这里,x86/ARM64 spinlock相关的函数被定义到qspinlock相关的函数了。
- */
- #define arch_spin_lock(l) queued_spin_lock(l)
- #define arch_spin_trylock(l) queued_spin_trylock(l)
-
-
- #ifndef queued_spin_lock
- /**
- * queued_spin_lock - acquire a queued spinlock
- * @lock: Pointer to queued spinlock structure
- */
- static __always_inline void queued_spin_lock(struct qspinlock *lock)
- {
- int val = 0;
-
- /*atomic_try_cmpxchg_acquire 比较lock->val与val=0, 如果相等(返回true),
- 即lock->val=0,表示无人持有锁,则设置lock->val的联合体成员locked为1,
- 表示拿锁,即lock->val = _Q_LOCKED_VAL;否则返回false,并把val更新为
- 当前的lock->val(此时lock->val!=0,lock->val是联合体所有成员的结合),
- 进入慢路径流程*/
-
- if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
- return;
-
- queued_spin_lock_slowpath(lock, val);//这里就是直接获取锁失败的情况,需要自旋等待了。
- }
- #endif
-
-
- /**
- * queued_spin_lock_slowpath - acquire the queued spinlock
- * @lock: Pointer to queued spinlock structure
- * @val: Current value of the queued spinlock 32-bit word //qspinlock value值
- *
- * (queue tail, pending bit, lock value) 下面括号中数值对应的含义
- *
- * fast : slow : unlock
- * : :
- * uncontended (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
- * : | ^--------.------. / :
- * : v \ \ | :
- * pending : (0,1,1) +--> (0,1,0) \ | :
- * : | ^--' | | :
- * : v | | :
- * uncontended : (n,x,y) +--> (n,0,0) --' | :
- * queue : | ^--' | :
- * : v | :
- * contended : (*,x,y) +--> (*,0,0) ---> (*,0,1) -' :
- * queue : ^--' :
- */
- /*上面的注释表明了锁获取的几种情况
- 1、val为0,可以快速拿锁,处理完数据后释放锁
- 2、如果val不为0,则需要慢路径等待拿锁,这里分为以下几种情况:
- 1)(0,0,1) -> (0,1,1),有人持锁,此时只有我1个等待拿锁,
- 则会设置pending位,自旋后直到locked=0后获得锁,处理完数据后释放锁
- 2)(0,1,1) -> (n,x,y)如果有超过我1个cpu在等待,则需要通过mcs入队处理
- a)val=pending,即此时pending = 1,tail=0,locked=0,此时处于一个刚被解锁,
- 有1个等待持锁的交接期,此时需要持续读取val值,直到val!=pending(这里可以
- 是locked!=0或者tail!=0)
- b)如果存在pending!=0 或者 tail!=0,说明明显存在更早的竞争者,我们需要
- 入队排队等待
- c)如果tail=0, pending=0,locked != 0,表示有人拿锁,我是第一个等待者,这里会
- 设置pending,并拿到设置前的val,此时根据该val判断 是否可能存在竞争状态:
- c.1,如果旧值val的pending=0且tail!=0,表明有竞争者参与拿锁,被别人截胡了
- 则需要我们清除 设置的pending 位,然后进入排队等锁的流程
- c.2,如果旧值val依然pending=0,tail=0,locked!=0,表明没有竞争者,表明
- 只有我一个等锁,则此时则自旋等待locked被清除,然后正式拿锁,清除pending位
- 并设置locked=1,此时表明拿锁成功,拿锁流程结束;
- 3) queue: 如果val!=0,并且tail!=0,则需要跳转到入队处理流程:
- a)通过qnodes[0].count++,记录当前cpu上等spinlock锁的数量,并根据当前拿
- 锁的cpu和idx,更新tail值
- b)通过当前拿锁的idx获取mcs spinlock锁节点node=qnodes[idx].mcs
- c)初始化node节点,locked=0, next=NULL(表示会插入链表尾部)
- d)更新lock->tail值,返回旧的tail值,并根据旧的tail值确定是否可能存在竞态
- d.1,如果旧的tail!=0,则会根据cpu、idx解码从旧tail取出等待队列的尾节点prev
- 并把node链接到prev节点之后,开始本cpu自旋等待前一个节点对我们的node.locked=1
- 进行置位,通知该我们拿锁了,然 后进入到下面流程
- d.2,如果旧的tail==0,或者经过上面的自旋等待被通知该我了,然后我们
- 自旋等待直到pending==0 或者lock->val=0,并返回当前val = lock->val,
- 此时判断tail是否已经改变
- d.2.1,如果还是原来的tail,此时我们可以拿锁设置locked,并清空tail,结束
- 拿锁流程
- d.2.2,如果tail发生变化,表明我们后面还有等锁的,这时需要我们先拿锁,
- 然后作为前一个拿锁的设置后一个拿锁节点node的node->locked,通知他该拿
- 锁了,到此结束
-
- */
-
- void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
- {
- struct mcs_spinlock *prev, *next, *node;
- u32 old, tail;
- int idx;
- /* 编译期检查:cpu数量不能大于等于 1<<14,否则tail的cpu域放不下 */
- BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
-
- if (pv_enabled())
- goto pv_queue;
-
- if (virt_spin_lock(lock))
- return;
-
- /*
- * Wait for in-progress pending->locked hand-overs with a bounded
- * number of spins so that we guarantee forward progress.
- *
- * 0,1,0 -> 0,0,1
- * val 由tail(cpu idx)、pending、locked组成
- */
- /* val == pending,意味locked=0, tail=0,但pending=1,表示虽然已经
- unlock了锁,但是已经有人在排队,此时需要等待这个状态变化*/
- if (val == _Q_PENDING_VAL) {
- int cnt = _Q_PENDING_LOOPS;//cnt == 1;
- /*循环读取lock->val,直到脱离这个状态,可以是pending位被清零,
- 也可是locked被置位,也可以是等待队列中出现成员,这里注意VAL*/
- val = atomic_cond_read_relaxed(&lock->val,
- (VAL != _Q_PENDING_VAL) || !cnt--);
- }
-
- /*
- * If we observe any contention; queue.
- */
- /*
- 如果tail或者pending非0,则说明有其他的竞争者
- 此时我们需要加入等待队列了,则跳转到需要加入等待队列的情况
- */
- if (val & ~_Q_LOCKED_MASK)
- goto queue;
-
- /*
- * trylock || pending
- *
- * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
- *
- * 走到这里,就是tail=0, pending=0,locked != 0,表示有人拿着锁
- 此时我们就是第一个等待者,设置pending位(持有pending位),
- val为返回锁的旧值
- */
- val = queued_fetch_set_pending_acquire(lock);
-
- /*
- * If we observe contention, there is a concurrent locker.
- *
- * Undo and queue; our setting of PENDING might have made the
- * n,0,0 -> 0,0,0 transition fail and it will now be waiting
- * on @next to become !NULL.
- */
- /* 如果我们设置了pending位之前,从锁的旧值知道有其他CPU进入
- 了等待队列, 即tail发生了改变,存在竞争情况
- * 清除设置的pending位,进入加入等待队列的流程
- */
- if (unlikely(val & ~_Q_LOCKED_MASK)) {
-
- /* Undo PENDING if we set it. */
- if (!(val & _Q_PENDING_MASK))
- clear_pending(lock);
-
- goto queue;
- }
-
- /*
- * We're pending, wait for the owner to go away.
- *
- * 0,1,1 -> 0,1,0
- *
- * this wait loop must be a load-acquire such that we match the
- * store-release that clears the locked bit and create lock
- * sequentiality; this is because not all
- * clear_pending_set_locked() implementations imply full
- * barriers.
- */
- /* 到这,我们持有了pending位,并且tail为0,即等待队列为空
- 我们通过atomic_cond_read_acquire,不停的读取lock->val,
- 直到locked位被清零,即在此处spin等待locked位
- */
- if (val & _Q_LOCKED_MASK)
- atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
-
- /*
- * take ownership and clear the pending bit.
- *
- * 0,1,0 -> 0,0,1
- */
- /*到这里我们要正式拿锁,清除pending位,表明我们不需要再等待,
- 设置locked位持锁,正式拿到锁
- */
- clear_pending_set_locked(lock);
- lockevent_inc(lock_pending);
- return;
-
- /*
- * End of pending bit optimistic spinning and beginning of MCS
- * queuing.
- */
- queue:
- lockevent_inc(lock_slowpath);
- pv_queue:
- node = this_cpu_ptr(&qnodes[0].mcs);
- /*将每CPU的mcs_nodes[0].count作为计数器,
- 记录此CPU加入了多少spinlock等待者到等待队列*/
- idx = node->count++;
- /*通过cpu编号和idx给tail赋值,实际上是对
- 当前待入队的spinlock所在cpu和所在队列(数组下标idx)进行编码*/
- tail = encode_tail(smp_processor_id(), idx);
-
- /*
- * 4 nodes are allocated based on the assumption that there will
- * not be nested NMIs taking spinlocks. That may not be true in
- * some architectures even though the chance of needing more than
- * 4 nodes will still be extremely unlikely. When that happens,
- * we fall back to spinning on the lock directly without using
- * any MCS node. This is not the most elegant solution, but is
- * simple enough.
- */
- /*虽然不可能,但如果发生nmi嵌套拿锁,idx>=4, 那么就不用mcs机制,
- 直接等待拿锁成功为止来解决*/
- if (unlikely(idx >= MAX_NODES)) {
- lockevent_inc(lock_no_node);
- while (!queued_spin_trylock(lock))//死循环等待拿锁成功
- cpu_relax();//barrier + 短暂让出cpu
- goto release;
- }
- //获取当前cpu的qnodes[idx].mcs作为我们待入队的spinlock节点
- node = grab_mcs_node(node, idx);
-
- /*
- * Keep counts of non-zero index values:
- */
- lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
-
- /*
- * Ensure that we increment the head node->count before initialising
- * the actual node. If the compiler is kind enough to reorder these
- * stores, then an IRQ could overwrite our assignments.
- */
- barrier();
-
- //初始化待入队节点mcs_spinlock *node;
- node->locked = 0;
- node->next = NULL;//node将放到链表尾部
- //通过node初始化pv_node
- pv_init_node(node);
-
- /*
- * We touched a (possibly) cold cacheline in the per-cpu queue node;
- * attempt the trylock once more in the hope someone let go while we
- * weren't watching.
- */
- //前面一些操作后,再次尝试拿锁,如果拿成功,需要release流程清除等锁数量
- if (queued_spin_trylock(lock))
- goto release;
-
- /*
- * Ensure that the initialisation of @node is complete before we
- * publish the updated tail via xchg_tail() and potentially link
- * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
- */
- smp_wmb();
-
- /*
- * Publish the updated tail.
- * We have already touched the queueing cacheline; don't bother with
- * pending stuff.
- *
- * p,*,* -> n,*,*
- */
- //lock->tail设置为我们生成的新的tail,old为返回的旧的tail
- old = xchg_tail(lock, tail);
- next = NULL;
-
- /*
- * if there was a previous node; link it and wait until reaching the
- * head of the waitqueue.
- */
- //如果前一个tail不为0,即队列还有其余的spinlock等待者
- if (old & _Q_TAIL_MASK) {
- //解码旧的tail -> cpu 、idx,通过cpu,idx,得到链表尾节点mcs_spinlock *prev
- prev = decode_tail(old);
-
- /* Link @node into the waitqueue. */
- //node加入到等待队列
- WRITE_ONCE(prev->next, node);
-
- pv_wait_node(node, prev);
- /*加入等待队列并且非链表头的节点在自己的CPU副本上自旋,等待
- node的locked成员被前一个等锁者置位,即通知该我们拿锁了
- */
- arch_mcs_spin_lock_contended(&node->locked);
-
- /*
- * While waiting for the MCS lock, the next pointer may have
- * been set by another lock waiter. We optimistically load
- * the next pointer & prefetch the cacheline for writing
- * to reduce latency in the upcoming MCS unlock operation.
- */
- /*到达此处时,我们自己的node->locked被其他人置位了,这是
- 通知我们现在我们是等待链表的第一个了
- */
- next = READ_ONCE(node->next);
-
- /*我们现在还是不是尾结点?不是的话提前加载下一个节点node,
- 后面需要操作下一节点的数据的,这里是优化处理
- */
- if (next)
- prefetchw(next);//预取下一个链表节点
- }
-
- /*
- * we're at the head of the waitqueue, wait for the owner & pending to
- * go away.
- *
- * *,x,y -> *,0,0
- *
- * this wait loop must use a load-acquire such that we match the
- * store-release that clears the locked bit and create lock
- * sequentiality; this is because the set_locked() function below
- * does not imply a full barrier.
- *
- * The PV pv_wait_head_or_lock function, if active, will acquire
- * the lock and return a non-zero value. So we have to skip the
- * atomic_cond_read_acquire() call. As the next PV queue head hasn't
- * been designated yet, there is no way for the locked value to become
- * _Q_SLOW_VAL. So both the set_locked() and the
- * atomic_cmpxchg_relaxed() calls will be safe.
- *
- * If PV isn't active, 0 will be returned instead.
- *
- */
- if ((val = pv_wait_head_or_lock(lock, node)))
- goto locked;
-
- /*到达此处时,可能是等待链表为空,因此我们就是第一个节点
- 也可能是等待node->locked被置位,这是前一个节点
- 告诉我们现在我们是等待链表第一个节点了
-
- 此时,我们自旋等待lock的pending位和locked位全部被清零
- atomic_cond_read_acquire在lock->val上spin等待pending位和locked位被清零,这里
- 显然需要pending=0、locked=0才能成立
- */
- val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
-
- locked:
- /*
- * claim the lock:
- *
- * n,0,0 -> 0,0,1 : lock, uncontended
- * *,*,0 -> *,*,1 : lock, contended
- * 现在,lock中pending位和locked位全部被清零
- * 我们是等待队列的第一个节点,该我们持锁了
- * If the queue head is the only one in the queue (lock value == tail)
- * and nobody is pending, clear the tail code and grab the lock.
- * Otherwise, we only need to grab the lock.
- */
-
- /*
- * In the PV case we might already have _Q_LOCKED_VAL set, because
- * of lock stealing; therefore we must also allow:
- *
- * n,0,1 -> 0,0,1
- *
- * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
- * above wait condition, therefore any concurrent setting of
- * PENDING will make the uncontended transition fail.
- *
- *如果tail还是我们设置的,说明我们同时是等待队列的最后一个节点,
- 后面没人了,设置lock->val = 1(_Q_LOCKED_VAL),这是设置locked位,
- 同时清零tail,因为我们是最后一个,等我们持锁等待队列就为空了
- */
- if ((val & _Q_TAIL_MASK) == tail) {
- if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
- goto release; /* No contention */
- }
-
- /*
- * Either somebody is queued behind us or _Q_PENDING_VAL got set
- * which will then detect the remaining tail and queue behind us
- * ensuring we'll see a @next.
- */
-
- /* 如果tail发生了改变,说明后面有新的节点,这里先持锁 lock->locked = 1 */
- set_locked(lock);
-
- /*
- * contended path; wait for next if not observed yet, release.
- */
- /*读取下一节点,如果还没观察到后继节点,则自旋等待node->next变为非空*/
-
- if (!next)
- next = smp_cond_load_relaxed(&node->next, (VAL));
-
- /* 还记得上面我们在等待队列中等待自己node->locked被置位不?
- * 这个是谁置位的?答案就在这里,上一个节点持锁完成之后干的
- * 这里就是将下一个节点的node->locked设置为1,通知他,
- * 你是等待队列的第一个了
- */
- arch_mcs_spin_unlock_contended(&next->locked);
- pv_kick_node(lock, next);
-
- release:
- /*
- * 现在我们离开了等待队列,mcs_nodes[0].count用来表示
- 我们在多少个spinlock的等待队列里面,现在该把它-1了
- */
- /*
- * release the node
- */
- __this_cpu_dec(qnodes[0].mcs.count);
- }
- EXPORT_SYMBOL(queued_spin_lock_slowpath);
-
-
对于ARM32而言:
- static inline void arch_spin_lock(arch_spinlock_t *lock)
- {
- unsigned long tmp;
- u32 newval;
- arch_spinlock_t lockval;
-
- prefetchw(&lock->slock); //gcc 内置预取指令,指定读取到最近的缓存以加速执行
- __asm__ __volatile__(
- "1: ldrex %0, [%3]\n" // lockval = &lock->slock,并设置&lock->lock为独占访问
- " add %1, %0, %4\n" /* newval = lockval + 1 << 16,
- 16bit位于next成员,等于 lockval.tickets.next +1;*/
- " strex %2, %1, [%3]\n" /* 如果内存独占,则更新内存lock->slock = newval,并设置tmp=0,
- 并清除独占标记,否则tmp=1 */
- " teq %2, #0\n" //测试tmp是否等于0
- " bne 1b" //以上测试不成立,则跳到标号 1 处从头执行,否则继续向下执行
- : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
- : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
- : "cc");
-
- /* 以上next已经进行了+1操作,如果锁还没有释放,即owner还没进行 +1 操作前,先判断
- lockval.tickets.next 是否等于 lockval.tickets.owner,不相等时,调用 wfe 指令进入
- idle 状态,等待 CPU event,被唤醒后继续判断锁变量是否相等;要完整地理解加锁过程,就
- 必须要提到解锁,因为这两者是相对的,解锁的实现很简单:就是将 spinlock 结构体中的 owner
- 进行 +1 操作,因此,当一个 spinlock初始化时,next 和 onwer 都为 0。某个执行流 A 获得锁,
- next + 1,此时在其它执行流 B 眼中,next != owner,所以 B 等待。当 A 调用 spin_unlock时,
- owner + 1*/
-
- while (lockval.tickets.next != lockval.tickets.owner) {
- wfe(); //执行WFE指令,让core进入low-power state,释放spinlock时被SEV指令唤醒
- lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
- }
-
- smp_mb();
- }
-
- static inline int arch_spin_trylock(arch_spinlock_t *lock)
- {
- unsigned long contended, res;
- u32 slock;
-
- prefetchw(&lock->slock); //gcc 内置预取指令,指定读取到最近的缓存以加速执行
- do {
- __asm__ __volatile__(
- " ldrex %0, [%3]\n" // slock = &lock->slock,并设置&lock->slock为独占访问
- " mov %2, #0\n" //res = 0
- " subs %1, %0, %0, ror #16\n" /* contended = slock - ror(slock,16),
- 即比较next与owner:二者相等(锁空闲)时结果为0且Z标志置位,否则结果非0 */
- " addeq %0, %0, %4\n" /* 如果Z=1(next==owner,锁空闲),则slock = slock + 1 << 16,
- 即next+1;否则跳过后面的条件执行指令 */
- " strexeq %2, %0, [%3]" /* 如果仍持有独占标记,则更新内存&lock->slock = slock,
- 并设置res=0,同时清除独占标志,给锁的next+1,此时相当于拿锁成功;若独占标记
- 已被打破则设置res=1,需要重试 */
- : "=&r" (slock), "=&r" (contended), "=&r" (res)
- : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
- : "cc");
- } while (res);
-
- if (!contended) { // contended = 0, 则快速获取锁成功,否则失败
- smp_mb();
- return 1;
- } else {
- return 0;
- }
- }