• 【精通内核】Linux内核并发控制原理信号量与P-V原语源码解析


     

    前言

    📫作者简介:小明java问道之路,专注于研究计算机底层/Java/Liunx 内核,就职于大型金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的架构设计📫 

    🏆 CSDN专家博主/Java优质创作者/CSDN内容合伙人、InfoQ签约作者、阿里云签约专家博主、华为云专家、51CTO专家/TOP红人 🏆

    🔥如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主~

    目录

    本文导读

    一、Linux内核信号量

    1、atomic_t 结构体

    2、信号量的初始化的内核源码

    二、Linux内核P-V原语详解

    1、获取信号量 P 操作原理解析

    2、释放信号量 V 操作原理解析

    三、Linux内核互斥量代码详解

    总结


    本文导读

    本文深入Linux内核源码,从核心源码入口讲起,详细对信号量、互斥量的内核代码讲解,其中对P-V操作实现逐行剖析,Linux内核并发控制原理的锁实现和原理在后续文章中一一讲解,本文深入浅出Linux中断控制的实现原理。

    一、Linux内核信号量

    信号量原理是什么,多线程的时候,我们需要一个变量来表示信号量状态,同时需要一个队列,当线程无法获取信号量时,需要进行阻塞。

    1、atomic_t 结构体

    atomict 为原子性变量、_wait_queue_head 为等待队列头部、wait_queue 为等待任务的表节点、semaphore 为信号量结构体。我们看到原子性整型变量声明 ,其中通过 counter 变量来描述信号量的个数,然而这里为了避免编译器优化用 volatile 来修饰。

    1. typedef struct {
    2. volatile int counter;
    3. } atomic_t; // 等待队列头部,通过自旋锁来保证操作队列线程安全
    4. struct_waitqueue_head {
    5. spinlock_t lock; // 保护阻塞队列的自旋锁
    6. struct list_head task_list; // 任务阻塞队列
    7. };
    8. struct__wait_queue { // 等待任务节点
    9. unsigned int flags; // 任务结构体,这里只需要知道它就是进程控制块 PCB 代表了进程即可
    10. struct task_struct * task;
    11. wait_queue_func_t func; // 等待函数指针
    12. struct list head task list; // 所处链表的结构体
    13. };
    14. struct semaphore { // 信号量结构体
    15. atomic_t count; // 信号量计数
    16. int sleepers; // 等待任务数
    17. wait_queue_head_t wait; // 等待队列
    18. };

    接下来我们来看看信号量的初始化的内核源码。

    2、信号量的初始化的内核源码

    首先初始化信号量,原子性设置信号量的初始值,就是将(count)->counter=val,初始化 sleeper 为0,最后初始化等待链表。

    1. static inline void sema_init(struct semaphore*sem, int val) { // 初始化信号量
    2. atomic_set(&sem->count, val); // 原子性设置信号量的初始值,就是将(count)->counter=val
    3. sem -> sleepers = 0; // 初始化 sleeper 为0
    4. init_waitqueue_head( &sem -> wait); // 初始化等待链表
    5. }
    6. // 初始化操作
    7. static inline void init_waitqueue_head(wait_queue_head_t*q){
    8. //自旋锁状态初始为spinlock t结构体
    9. q->lock = SPIN LOCK UNLOCKED;
    10. //初始化等待链表,头尾相接:(ptr)-> next=(ptr); (ptr)->prev=(ptr);
    11. INIT_LIST_HEAD(&q->task_list);
    12. }

    二、Linux内核P-V原语详解

    1、获取信号量 P 操作原理解析

    获取信号量 P 操作,通过内联汇编原子性对 counte 操作。首先通过 decl 同时根据是否是多处理器加 lock 前缀,保证了单条指令的原子性,然后根据递减后的值是否为负数来判断获取信号量是否成功,如果失败,那么需要将线程进行睡眠,此时调用 _down_failed 函数完成此操作。

    具体实现原理如下。

    1. static inline void down(struct semaphore*sem) {
    2. _asm__volatile_( // 通过 lock 前缀实现原子性的-sem->count操作,decl指令相当于对操作数自减
    3. LOCK "decl %O" // 如果减完后发现sign标志位为1,则表明count值为负,往前跳到标号2处,调用 __down_failed处理,否则获取成功,直接退出
    4. "js 2f"
    5. "1: "
    6. LOCK_SECTION_START("")
    7. "2:call__down_failed"
    8. "jmp 1b"
    9. // 这里采用了 LOCK SECTION STARTLOCK SECTION END 宏定义,将call
    10. // __down_failed 和 jmp 1b的汇编代码放到.textlock段中
    11. // 所以如果执行完 __down_failed 方法后调用jmp 1b
    12. // 会回到 LOCK SECTION START之前的段中,即退出down方法
    13. LOCK SECTION END:
    14. : "=m" (sem -> count)
    15. :"c" (sem)
    16. :"memory");
    17. }
    18. // 通过汇编声明了 __down_failed的代码地址
    19. asm(
    20. ".text"
    21. ".align 4" // 4字节对齐
    22. ".globl___down_failed"
    23. "__down failed:"
    24. #if defined(CONFIG_FRAME_POINTER) // 如果定义了栈帧指针,那么开辟新的方法帧
    25. "pushl %ebp"
    26. "movl %esp, %ebp"
    27. #endif
    28. // 保存影响的寄存器值,因为随后要调用_down 函数, 可能会影响 eax、edx、ecx 寄存器,
    29. // 所以这里需要先对其进行保存,在方法返回后再还原
    30. "pushl %eax"
    31. "pushl %edx"
    32. "pushl %ecx"
    33. "call __down" // 调用 __down来执行当counter为0时的操作
    34. "popl %ecx" // 调用返回后恢复保存的寄存器
    35. "popl %edx"
    36. "popl %eax"
    37. #if defined(CONFIG_FRAME_POINTER) //还原方法帧
    38. "movl %ebp,%esp"
    39. "popl %ebp"
    40. #endif
    41. "ret"
    42. );

    我们最终是调用函数 __ down 来执行最终的 __down_failed 操作:

    下面是void__dow 函数源码,通过current 宏获取当前任务结构体,获取到了任务PCB,初始化wait_queuet,也就是等待线程代表,宏定义为;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}

    设置任务状态为TASK_UNINTERRUPTIBLE,表明不可中断的阻塞,获取自旋锁,将等待任务节点插入等待链表的队尾处,增加等待计数,循环等待释放信号量,对等待线程减1后与当前信号量的counter值相加

    如果结果等于0则结束循环,这里等于0的条件就是等待信号量足够容纳更多的线程,所以不需要阻塞,设置等待任务数为1,释放自旋锁,唤醒调度器执行其他任务,当前任务就被阻塞在了等待队列里

    当任务重新被唤醒时,将重新获取自旋锁,重新设置任务状,唤醒等待任务,释放自旋锁,设置当前任务状态为TASK_RUNNING。

    以下代码我们可以看到,使用了自旋锁、P-V操作,并增加了阻塞队列实现信号量。如果读者对Linux进程调度原理不清楚,这里面方法 schedule ,其作用就是朱勇释放 CPU 的控制权,交给调度程序,然后由调度程序切换到其他进程执行,直到信号量释放后,再由其他进程将其状态设置为 RUNNABLE 后,交由调度进程重新调度执行。

    1. void__down(struct semaphore *sem) {
    2. // 通过current 宏获取当前任务结构体,获取到了任务PCB
    3. struct task_struct *tsk = current;
    4. // 初始化wait_queue t,也就是等待线程代表
    5. // 宏定义为;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}
    6. DECLARE_WAITQUEUE(wait, tsk);
    7. unsigned long flags;
    8. tsk->state = TASK UNINTERRUPTIBLE; // 设置任务状态为TASK_UNINTERRUPTIBLE,表明不可中断的阻塞
    9. spin_lock_irqsave(&sem -> waitlock, flags); // 获取自旋锁
    10. add_wait_queue_exclusive_locked(&sem -> wait, &wait); // 将等待任务节点插入等待链表的队尾处
    11. sem -> sleepers++; // 增加等待计数
    12. for (; ; ) { // 循环等待释放信号量
    13. int sleepers = sem -> sleepers;
    14. // 对等待线程减1后与当前信号量的counter值相加
    15. // 如果结果等于0则结束循环,这里等于0的条件就是等待信号量足够容纳更多的线程,所以不需要阻塞
    16. if (!atomic_add_negative(sleepers - 1, & sem -> count)){
    17. sem -> sleepers = 0;
    18. break;
    19. }
    20. sem -> sleepers = 1; // 设置等待任务数为1
    21. spin_unlock_irqrestore( & sem -> wait.lock, flags); // 释放自旋锁
    22. // 唤醒调度器执行其他任务,当前任务就被阻塞在了等待队列里
    23. schedule();
    24. spin_lock_irqsave( & sem -> wait.lock flags); // 当任务重新被唤醒时,将重新获取自旋锁
    25. tsk -> state = TASK UNINTERRUPTIBLE; // 重新设置任务状态为不可中断状态,继续循环
    26. }
    27. // 至此任务已经获取了信号量,等待线程从队列中移出来
    28. remove_wait_queue_locked( & sem -> wait, &wait);
    29. wake_up_locked( & sem -> wait); // 唤醒等待任务
    30. spin_unlock_irqrestore( & sem -> wait.lock, flags); // 释放自旋锁
    31. tsk->state = TASK_RUNNING; // 设置当前任务状态为TASK_RUNNING
    32. }

    2、释放信号量 V 操作原理解析

    唤醒信号量通过对 semaphore 中的 counter 进行加1,同样通过 LOCK 前缀保证指令的原子性,根据返回值是否小于或等于0来判断是否有线程在等待,如果有线程等待,那么调用_up_wakeup 函数唤醒等待信号量的线程。

    1. static inline void up(struct semaphore*sem) {
    2. _asm___volatile_(
    3. LOCK "incl %O" // 原子性实现++sem->count
    4. "jle 2f" // 如果小于或等于0,则跳到2标志处,调用__up_wakeup函数唤醒等待任务
    5. "1:"
    6. LOCK SECTION_START ("")
    7. "2:call__up_wakeup"
    8. "jmp 1b"
    9. LOCK_SECTION_END
    10. ".subsection 0"
    11. :"=m” (sem->count)"
    12. :"c" (sem)"
    13. :" memory ");
    14. }
    15. // 汇编代码保存影响的寄存器,然后调用_up函数唤醒任务
    16. asm(
    17. ".text"
    18. ".align 4"
    19. ".globl___up_wakeup"
    20. "__up_wakeup: "
    21. "pushl %eax"
    22. "pushl %edx"
    23. "pushl %ecx"
    24. "call_ up" // 调用__up 函数
    25. "popl %ecx"
    26. "popl %edx"
    27. "popl %eax"
    28. "ret");

    下面看下__up唤醒的源码实现,__up直接调用wake_up,通过唤醒宏定义,调用唤醒函数实现__wake_up,获取自旋锁,调用__wake_up_common函数唤醒任务,注意这里传入的是sync为0,释放自旋锁的过程

    1. void __up(struct semaphore *sem) {
    2. wake_up( & sem -> wait); // 直接调用wake_up
    3. }
    4. //唤醒宏定义
    5. # define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE |TASK_INTERRUPTIBLE,1)
    6. // 唤醒函数实现
    7. void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive) {
    8. unsigned long flags;
    9. spin_lock_irqsave(&q->lock, flags); // 获取自旋锁
    10. // 调用__wake_up_common函数唤醒任务,注意这里传入的是sync0
    11. _wake_up_common(q, mode, nr_exclusive, 0);
    12. spin_unlock_irqrestore(&q->lock, flags); // 释放自旋锁
    13. }
    14. // 唤醒操作_wake_up_common 函数的实现原理。
    15. static void _wake_up_common(wait_queue_head_*qunsigned int mode, int r_exclusive, int sync) {
    16. struct list_head *tmp,*next;
    17. list_for_each_safe(tmp, next, & q -> task_list){ // 遍历等待列表
    18. wait_queue_t * curr;
    19. unsigned flags;
    20. //获取当前任务节点wait_queue_I
    21. curr = list_entry(tmp, wait_queue_, task_list);
    22. flags = curr -> flags//获取当前等待标志位
    23. // 调用唤醒函数
    24. // 如果成功唤醒任务、 当前任务标志位 WQ_FLAG_EXCLUSIVE、
    25. // nr_exclusive 互斥数量,自减为0,那么退出循环
    26. if (curr -> func(curr, mode, sync) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
    27. break;
    28. }
    29. }
    30. // 默认唤醒函数 default wake function。其实现过程如下
    31. int default_wake_function(walt_queue_curr, unsigned mode, int sync) {
    32. task_t * p = curr -> task; // 获取当前任务
    33. return try_to_wake_up(p, mode, sync); // 调用try_to_wake_up函数唤醒
    34. }

    我们可以看到是通过调用try_to_wake_up函数唤醒,其中涉及Linux调度器,且该方法涉及大量实现,这里我们给出源码的相关原理

    首先获取到当前CPU的执行队列 runqueue,并且关闭中断,保存当前状态信息,如果当前状态信息和传入状态不为0并且如果当前任务不属于任何优先级队列,执行

    任务重调度实现:如果是对称多处理器结构,那么需要通过跨CPU 调用,触发目标CPU调度器的调度工作。

    信号量代码逻辑较为简单,对于 P-V 操作,我们通过原子性对 counter 变量操作,然后其放入信号量的等待队列中,或者将其从等待队列中取出。

    由于是多线程操作阻塞队列,因此需要把自旋锁来保护阻塞队列。接着判断任务是否处于任务就绪队列 runqueue 中,如果在队列中,则设置标志为 TASKRUNNING 状态,否将入 runqueue 中。这里 runqueue 是每个 CPU 都拥有的任务就绪调度队列。

    三、Linux内核互斥量代码详解

    互斥量就是特殊版本的信号量,即 count 为1时的特殊信号量,互斥量就是特殊的信号量

    1. // 将struct semaphor 类型定义为 mutex_t
    2. typedef struct semaphore mutex_t;
    3. // mutex_init 其实传入的type和name都没用,直接是通过初始化信号量为1来代替
    4. #define mutex_init(lock, type, name) sema_init(lock,1)
    5. // mutex_destroy则为初始化信号量-99
    6. #define mutex_destroy(lock) sema_init(lock,-99)
    7. // 上锁调用的是信号量的down操作
    8. #define mutex_lock(lock, num) down(lock)
    9. // trylock 调用的是down_trylock,这里不再讲解。这里的trylock 就是非阻塞的lock,获取到锁,返回0,否则返回1
    10. #define mutex_trylock(lock) (down_trylock(lock)?0:1)
    11. //解锁直接调用up操作
    12. #define mutex_unlock(lock) up(lock)

    总结

    本文深入Linux内核源码,从核心源码入口讲起,详细对信号量、互斥量的内核代码讲解,其中对P-V操作实现逐行剖析,Linux内核并发控制原理的锁实现和原理在后续文章中一一讲解,本文深入浅出Linux中断控制的实现原理。

  • 相关阅读:
    光伏拉晶厂RFID智能化生产工序管理
    【无标题】
    Python通过selenium调用IE11浏览器报错解决方法
    Node.js | express 框架开篇
    3. Python 数据容器(列表、元组、字符串、集合、字典)
    89.(前端)商品分类显示优化——关闭选择框、下拉框、修改间隔、显示层级为tag标签、添加操作栏、添加索引
    【BA无标度网络(无限规模)的传播阈值推导过程】
    带你玩转Redis 的String 数据类型
    手写一套简单的dubbo(含注册中心)之编程思想
    linux中awk命令有何作用?
  • 原文地址:https://blog.csdn.net/FMC_WBL/article/details/126925259