• 信号量(信号量操作 & 基于信号量实现的生产者消费者模型)


      本篇文章重点对信号量的概念,信号量的申请、初始化、释放、销毁等操作进行讲解。同时举例把信号量应用到生产者消费者模型来理解。希望本篇文章会对你有所帮助。

    目录

    一、信号量概念

    1、1 什么是信号量

    1、2 为什么要有信号量

    1、3 信号量的PV操作

    二、信号量的相关接口

    2、1 sem_t

    2、2 sem_init

    2、3 sem_wait

    2、4 sem_post

    2、5 sem_destory

    三、基于信号量的生产者消费者模型

    3、1 信号量控制环形队列

    3、1、1 空间资源和数据资源

    3、1、2 保护原理(二元信号量)

    3、2 demo代码

    3、2、1 单生产与单消费

     3、2、2 多生产多消费


    🙋‍♂️ 作者:@Ggggggtm 🙋‍♂️

    👀 专栏:Linux从入门到精通  👀

    💥 标题:信号量💥

     ❣️ 寄语:与其忙着诉苦,不如低头赶路,奋路前行,终将遇到一番好风景 ❣️

    一、信号量概念

    1、1 什么是信号量

      我们之前学了互斥锁和体哦阿健变量可以实现线程的互斥与同步。那么还有其他方法吗?信号量也可以做到!

      信号量(Semaphore)是操作系统中一种用于实现线程间同步与互斥的机制。它本质就是一个计数器,用于控制多个线程对共享资源的访问信号量可以被视为一个简单的整数变量,并且可以进行原子操作,包括等待(wait)和释放(signal)

    1、2 为什么要有信号量

      信号量(Semaphore)是一种多线程同步的机制,用于解决并发环境中的资源竞争问题。在并发编程中,多个线程可能同时访问共享资源,如果不对资源进行合理的管理,就会导致数据不一致或错误的结果。

      我们在学习互斥锁时,一个线程在操作临界资源的时候,必须临界资源是满足条件的!可是公共资源是否满足生产或者消费条件,我们无法直接得知。因为你要检测,本质也是在访问临界资源。所以只能先加锁,再检测,再操作,再解锁。只要我们对资源进行整体加锁,就默认了我们对这个资源整体使用。但是,有时候会是一份临界资源同时访问不同的区域。这时互斥锁并不能很好的满足对临界资源的充分利用。在这种情况下就可以引入信号量来很好的解决。具体如下图:

      现在我们有一个共享资源,不当做一个整体,而让不同的执行流访问不同的区域的话,那么不就可以继续并发了。

    1、3 信号量的PV操作

      这里会有一个疑问,我们怎么知道临界资源内部一共有少个区域资源呢?我们又怎么知道内部一定还有资源呢?实际上,一般都是外部就会提供有多少资源。同时,信号量一定会保证內部是否还有资源

      信号量本质是一个计数器。一个线程在申请信号量,本质就是在对信号量的 -- 操作(对剩余资源数量的减减操作)。只要拥有信号量,就在未来一定能够拥有临界资源的一部分。申请信号量的本质:对临界资源中特定小块资源的预订机制。信号量因此保证了只要你申请成功,就代表一定还有资源。如果申请失败,就会进入等待。这不就是我们在访问真正的临界资源之前,我们其实就可以提前知道临界资源的使用情况!!!就不用再进行复杂的加锁、判断、解锁等操作了

      此时发现,线程要访问临界资源中的某一区域,就得先申请信号量。所有人必须的先看到统一信号量。信号量本身必须是公共资源。

      对信号量的操作就是申请和释放操作。信号量本质就是一个计数器,也就是在对信号量进行++和-- 操作。申请资源,可以看成对sem--,同时必须保证操作的原子性,我们也称之为 P 操作。释放资源,可以看成sem++,也必须保证操作的原子性,称之为 V 操作 。信号量核心操作:PV操作

    二、信号量的相关接口

    2、1 sem_t

      sem_t 是在 POSIX 系统中用来实现信号量机制的类型。它是一个不透明的数据结构,用于控制多个进程或线程对共享资源的访问。

      sem_t 提供了三个主要的函数接口:

    1. sem_init:用于初始化一个信号量。该函数接受三个参数,分别是指向 sem_t 对象的指针、信号量的共享标志和初始值。共享标志指定信号量的共享方式,根据具体需求可以选择在进程间共享(设置为0)或者在同一进程内的线程间共享(设置为非0)。初始值表示信号量的初始计数值。

    2. sem_wait:该函数使调用线程等待信号量。如果信号量的计数值大于0,则将计数值减一,并立即返回。如果计数值为0,则线程将阻塞,直到信号量的计数值大于0。

    3. sem_post:该函数用于释放信号量。它将信号量的计数值加一,并唤醒因等待该信号量而阻塞的线程。

    4. sem_destroy:该函数是用于销毁一个已经初始化的信号量的函数,在使用完信号量后,通过调用该函数可以释放相关资源。

      下面我们来看这几个函数的详细解释。

    2、2 sem_init

      sem_init函数是用于初始化一个信号量的函数,它在程序中创建一个新的信号量,并为其分配必要的资源。函数原型如下:

    int sem_init(sem_t *sem, int pshared, unsigned int value);

    参数: sem_init函数有三个参数:

    • sem:一个指向sem_t类型的指针,用于存储初始化后的信号量对象。
    • pshared:表示信号量的共享方式。
      • 如果pshared的值为0,表示信号量只能在调用它的进程内的线程之间共享。
      • 如果pshared的值为非零,表示信号量可以在多个进程之间共享。
    • value:表示信号量的初始值(初始临界资源內部有多少个小块资源)。

    返回值: sem_init函数的返回值是一个整数,用于表示函数调用是否成功。

    • 如果返回值为0,表示函数调用成功。
    • 如果返回值为-1,表示函数调用失败,此时可以通过查看全局变量errno获取错误码。

      以下是一个示例代码,演示了如何使用sem_init函数来初始化一个信号量:

    1. #include
    2. #include
    3. int main() {
    4. sem_t mySem;
    5. // 初始化一个非共享的信号量,初始值为1
    6. int ret = sem_init(&mySem, 0, 1);
    7. if (ret == -1) {
    8. perror("Failed to initialize semaphore");
    9. return 1;
    10. }
    11. // 进行其他操作...
    12. // 销毁信号量
    13. sem_destroy(&mySem);
    14. return 0;
    15. }

    2、3 sem_wait

      sem_wait函数用于对信号量进行等待操作,同时会将信号量减1。以实现对临界资源的互斥访问。函数原型如下:

    int sem_wait(sem_t *sem);

    参数: sem_wait函数只有一个参数:

    • sem:一个指向已经初始化的信号量对象。

    返回值: sem_wait函数的返回值是一个整数,用于表示函数调用是否成功。

    • 如果返回值为0,表示函数调用成功,信号量的值被成功减一。
    • 如果返回值为-1,表示函数调用失败,此时可以通过查看全局变量errno获取错误码。常见的错误码包括EINTR(被信号中断)和EDEADLK(死锁)等。

    2、4 sem_post

      sem_post函数用于对信号量进行发布操作,以增加信号量的值(对信号量加1)。它通常与sem_wait函数一起使用,用于在对共享资源的访问结束后释放信号量,以便其他线程可以获取到该资源。函数原型如下:

    int sem_post(sem_t *sem);

    参数: sem_post函数只有一个参数:

    • sem:一个指向已经初始化的信号量对象。

    返回值: sem_post函数的返回值是一个整数,用于表示函数调用是否成功。

    • 如果返回值为0,表示函数调用成功,信号量的值被成功增加。
    • 如果返回值为-1,表示函数调用失败,此时可以通过查看全局变量errno获取错误码。常见的错误码包括EINVAL(信号量未初始化)和EOVERFLOW(信号量值达到上限)等。

      需要注意的是,sem_post函数并不处理过度发布的情况,即如果信号量的值已经达到了其上限,再调用sem_post函数也无法将其继续增加。因此,在使用信号量时,必须正确地控制信号量的值以避免出现竞态条件或死锁等问题。

    2、5 sem_destory

      sem_destory函数用于销毁一个已经初始化的信号量对象,并释放相关的资源。当不再需要使用信号量时,应该调用sem_destroy函数进行清理操作。函数原型如下:

    int sem_destroy(sem_t *sem);

    参数: sem_destroy函数只有一个参数:

    • sem:一个指向已经初始化的信号量对象。

    返回值: sem_destroy函数的返回值是一个整数,用于表示函数调用是否成功。

    • 如果返回值为0,表示函数调用成功,信号量对象被成功销毁并释放了相关的资源。
    • 如果返回值为-1,表示函数调用失败,此时可以通过查看全局变量errno获取错误码。常见的错误码包括EINVAL(信号量未初始化)和EBUSY(仍有线程在等待该信号量)等。

    三、基于信号量的生产者消费者模型

    3、1 信号量控制环形队列

      我们之前学习了​ 生产者消费者问题(条件变量 & 互斥锁)。之前学习的时由阻塞队列来实现的。通过互斥锁与条件变量很好的维护了生产者与消费者之前的同步与互斥关系。那么我么那接下来看看用信号量来维护生产者和消费者之间的同步与互斥关系的环形队列。

    3、1、1 空间资源和数据资源

      我们先看下图:

      生产者就是要生产数据放进环形队列中去。那么生产者所需要的就是申请环形队列空间资源。这个空间的大小我们可以自定义,比如环形队列由10个空间资源

      消费者就是去环形队列拿数据。消费者所需要的就是申请数据资源。也就是看环形队列中是否还有数据资源。 

    3、1、2 保护原理(二元信号量)

      我们发现:生产和消费在队列为空的时候或者满的时,可能访问同一个位置。那我们必须保证生产者生产的数据个数最多不能超过环形队列的容量,其次消费者在空的时候不能再拿数据

      那我们就可以用两个信号量来很好的维护这两个角色的需求。生产者对应空间资源信号量,消费者对应数据资源信号量。当申请对应的信号量资源失败时,也会进入阻塞式等待。直到有信号量资源才会继续执行。

      当只有单生产单消费时,我们只需要维护好生产与消费的同步与互斥关系。多生产与多消费时,还需维护生产与生产、消费与消费的互斥关系。下面我们直接看代码。

    3、2 demo代码

    3、2、1 单生产与单消费

    ringQueue.hpp

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. using namespace std;
    10. #include"Task.hpp"
    11. #include"LogTest.hpp"
    12. static const int g_cap=5;
    13. template<class T>
    14. class RingQueue
    15. {
    16. private:
    17. void P(sem_t& sem)
    18. {
    19. int n = sem_wait(&sem);
    20. assert(n == 0);
    21. (void)n;
    22. }
    23. void V(sem_t& sem)
    24. {
    25. int n = sem_post(&sem);
    26. assert(n == 0);
    27. (void)n;
    28. }
    29. public:
    30. RingQueue(int cap = g_cap)
    31. :_cap(cap)
    32. {
    33. int n = sem_init(&_spaceSem, 0, _cap);
    34. assert(n == 0);
    35. n = sem_init(&_dataSem, 0, 0);
    36. assert(n == 0);
    37. _queue.resize(_cap);
    38. _productorStep = _consumerStep = 0;
    39. }
    40. // 生产者
    41. void push(const T& in)
    42. {
    43. P(_spaceSem);
    44. _queue[_productorStep++] = in;
    45. _productorStep %= _cap;
    46. V(_dataSem);
    47. }
    48. // 消费者
    49. void pop(T* out)
    50. {
    51. P(_dataSem);
    52. *out = _queue[_consumerStep++];
    53. _consumerStep %= _cap;
    54. V(_spaceSem);
    55. }
    56. ~RingQueue()
    57. {
    58. sem_destroy(&_spaceSem);
    59. sem_destroy(&_dataSem);
    60. }
    61. private:
    62. sem_t _spaceSem; // 生产者——空间资源
    63. sem_t _dataSem; // 消费者——数据资源
    64. vector _queue;
    65. int _cap;
    66. int _productorStep;
    67. int _consumerStep;
    68. };

    testMain.cpp

    1. #include "ringQueue.hpp"
    2. int myAdd(int x, int y)
    3. {
    4. return x + y;
    5. }
    6. void* ProductorRoutine(void *arg)
    7. {
    8. RingQueue *rq = (RingQueue*) arg;
    9. while(true)
    10. {
    11. int x = rand() % 10 +1;
    12. int y = rand() % 100 + 1;
    13. Task t(x, y, myAdd);
    14. rq->push(t);
    15. LogMessage(1,"%s:%d + %d = ?","生产者申请了一个空间", x, y);
    16. //sleep(1);
    17. }
    18. }
    19. void* ConsumerRoutine(void* arg)
    20. {
    21. RingQueue *rq = (RingQueue*)arg;
    22. while(true)
    23. {
    24. Task t;
    25. rq->pop(&t);
    26. LogMessage(1,"%s:%d + %d = %d","消费者消费了一个数据", t.x_, t.y_, t());
    27. sleep(1);
    28. }
    29. }
    30. int main()
    31. {
    32. srand((unsigned int)time(nullptr) ^ 0x666888);
    33. RingQueue *rq = new RingQueue();
    34. pthread_t c, p;
    35. pthread_create(&p, nullptr, ProductorRoutine, rq);
    36. pthread_create(&c, nullptr, ConsumerRoutine, rq);
    37. pthread_join(p, nullptr);
    38. pthread_join(c, nullptr);
    39. delete rq;
    40. return 0;
    41. }

      通过上述代码我们发现:信号量有点类似于互斥锁+条件变量的结合,不还是在竞争资源串行访问吗?实际上刚开始我们并不知道是先生产,还是先消费。如果先消费,则需等待。如果满的情况下,生产也需要等待。其他情况下大部分时间都是在并发执行的生产和消费可同时进行。

     3、2、2 多生产多消费

      当多生产和多消费时,我们还需维护生产与生产、消费与消费的互斥关系。这时候就需要互斥锁来维护了。代码如下:

    ringQueue.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. using namespace std;
    11. #include"Task.hpp"
    12. #include"LogTest.hpp"
    13. static const int g_cap=5;
    14. template<class T>
    15. class RingQueue
    16. {
    17. private:
    18. void P(sem_t& sem)
    19. {
    20. int n = sem_wait(&sem);
    21. assert(n == 0);
    22. (void)n;
    23. }
    24. void V(sem_t& sem)
    25. {
    26. int n = sem_post(&sem);
    27. assert(n == 0);
    28. (void)n;
    29. }
    30. public:
    31. RingQueue(int cap = g_cap)
    32. :_cap(cap)
    33. {
    34. int n = sem_init(&_spaceSem, 0, _cap);
    35. assert(n == 0);
    36. n = sem_init(&_dataSem, 0, 0);
    37. assert(n == 0);
    38. _queue.resize(_cap);
    39. _productorStep = _consumerStep = 0;
    40. }
    41. // 生产者
    42. void push(const T& in)
    43. {
    44. P(_spaceSem);
    45. pthread_mutex_lock(&_pmutex);
    46. _queue[_productorStep++] = in;
    47. _productorStep %= _cap;
    48. pthread_mutex_unlock(&_pmutex);
    49. V(_dataSem);
    50. }
    51. // 消费者
    52. void pop(T* out)
    53. {
    54. P(_dataSem);
    55. pthread_mutex_lock(&_cmutex);
    56. *out = _queue[_consumerStep++];
    57. _consumerStep %= _cap;
    58. pthread_mutex_unlock(&_cmutex);
    59. V(_spaceSem);
    60. }
    61. ~RingQueue()
    62. {
    63. sem_destroy(&_spaceSem);
    64. sem_destroy(&_dataSem);
    65. }
    66. private:
    67. sem_t _spaceSem; // 生产者——空间资源
    68. sem_t _dataSem; // 消费者——数据资源
    69. vector _queue;
    70. int _cap;
    71. int _productorStep;
    72. int _consumerStep;
    73. pthread_mutex_t _pmutex;
    74. pthread_mutex_t _cmutex;
    75. };

    testMain.cpp

    1. #include "ringQueue.hpp"
    2. int myAdd(int x, int y)
    3. {
    4. return x + y;
    5. }
    6. void* ProductorRoutine(void *arg)
    7. {
    8. RingQueue *rq = (RingQueue*) arg;
    9. while(true)
    10. {
    11. int x = rand() % 10 +1;
    12. int y = rand() % 100 + 1;
    13. Task t(x, y, myAdd);
    14. rq->push(t);
    15. LogMessage(1,"%s:%d + %d = ?","生产者申请了一个空间", x, y);
    16. //sleep(1);
    17. }
    18. }
    19. void* ConsumerRoutine(void* arg)
    20. {
    21. RingQueue *rq = (RingQueue*)arg;
    22. while(true)
    23. {
    24. Task t;
    25. rq->pop(&t);
    26. LogMessage(1,"%s:%d + %d = %d","消费者消费了一个数据", t.x_, t.y_, t());
    27. sleep(1);
    28. }
    29. }
    30. int main()
    31. {
    32. srand((unsigned int)time(nullptr) ^ 0x666888);
    33. RingQueue *rq = new RingQueue();
    34. pthread_t p[4], c[8];
    35. for(int i = 0; i < 4; i++) pthread_create(p+i, nullptr, ProductorRoutine, rq);
    36. for(int i = 0; i < 8; i++) pthread_create(c+i, nullptr, ConsumerRoutine, rq);
    37. for(int i = 0; i < 4; i++) pthread_join(p[i], nullptr);
    38. for(int i = 0; i < 8; i++) pthread_join(c[i], nullptr);
    39. delete rq;
    40. return 0;
    41. }

      这里又有一个小细节:我们在插入或者删除时,是先加锁再申请信号量呢,还是先申请信号量再加锁呢? 首先申请信号量的操作时原子的,这个问题不用担心。关键在于插入和删除的过程。其实先加锁再申请信号量是肯定可行的,就是先申请信号量再加锁可以吗?答案是可以的!申请信号量的本质就是在对资源的预定。只要你申请信号量成功,就一定有资源可用。当我们先把信号量申请完,就是把资源先分配给了不同线程。反而会更好一点。后续各个线程不用竞争信号资源了,只需竞争锁资源就可以了。

  • 相关阅读:
    有效备考浙大MEM的五个好习惯建议
    实时数据处理:使用Apache Spark进行流数据分析
    [项目管理-5]:软硬件项目管理 - 项目人力资源管理 (人)
    Fluent Meshing的workflow方法
    【操作系统】聊聊文件系统是如何工作的
    MS4553S双向电平转换器可pin对pin兼容TXB0102/TXS0102
    Java语法基础案例
    js如何区分数组和对象
    华纳云:怎么判断VPS的ip是不是公网ip
    前端bootstrap+fileInput+django后端是用ajax方式上传form表单数据及多个文件保存到数据库
  • 原文地址:https://blog.csdn.net/weixin_67596609/article/details/133149353