• Linux信号量


    POSIX信号量

    信号量的原理

    • 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
    • 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
    • 但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。

    信号量的概念

    信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。

    每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。

    信号量的PV操作:

    • P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减1,因此P操作的本质就是让计数器减1。
    • V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加1,因此V操作的本质就是让计数器加1。

    注意:PV操作必须是原子性的,因为在多线程情况下会存在多个执行流去竞争一个信号量的情况,所以信号量也是一个临界资源,而信号量的本质是为了保护临界资源的,我们就不可能让信号量去保护信号量了,所以信号量的PV操作必须是原子操作。

    当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。也就是说,信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。

    信号量函数

    初始化信号量

    int sem_init(sem_t *sem, int pshared, unsigned int value);
    
    
    • 1
    • 2

    参数说明:

    • sem:需要初始化的信号量。
    • pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
    • value:信号量的初始值(计数器的初始值)。

    返回值说明:

    • 初始化信号量成功返回0,失败返回-1。

    注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。

    销毁信号量

    销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:

    int sem_destroy(sem_t *sem);
    
    
    • 1
    • 2

    参数说明:

    • sem:需要销毁的信号量。

    返回值说明:

    • 销毁信号量成功返回0,失败返回-1。

    等待信号量(申请信号量)

    等待信号量的函数叫做sem_wait,该函数的函数原型如下:

    int sem_wait(sem_t *sem);
    
    
    • 1
    • 2

    参数说明:

    • sem:需要等待的信号量。

    返回值说明:

    • 等待信号量成功返回0,信号量的值减一。
    • 等待信号量失败返回-1,信号量的值保持不变。

    发布信号量(释放信号量)

    发布信号量的函数叫做sem_post,该函数的函数原型如下:

    int sem_post(sem_t *sem);
    
    
    • 1
    • 2

    参数说明:

    • sem:需要发布的信号量。

    返回值说明:

    • 发布信号量成功返回0,信号量的值加一。
    • 发布信号量失败返回-1,信号量的值保持不变。

    基于环形队列的生产者消费者模型

    在这里插入图片描述
    我们先来了解一下环形队列:

    1. 环形队列采用数组模拟,用模运算来模拟环状特性;
    2. 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
      在这里插入图片描述

    空间资源和数据资源

    在生产者与消费者模型中,生产者需要生产资源,所以需要的是空间资源,消费者需要消费资源,所以需要的是数据资源。

    对于生产者和消费者来说,它们关注的资源是不同的:

    • 生产者关注的是环形队列当中是否有空间(space),只要有空间生产者就可以进行生产。
    • 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
      在这里插入图片描述

    现在我们用信号量来描述环形队列当中的空间资源(space_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:

    • space_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
    • data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。

    生产者和消费者遵守规则

    环形队列的生产者和消费者模型当中,生产者和消费者必须遵守如下两个规则:

    1.生产者和消费者不能对同一个位置进行访问。

    • 如果生产者和消费者指向了同一个位置,此时无非就是为空或者为满的情况,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的,此时他们应该保持互斥关系。
    • 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,就不会出现数据不一致等问题,保持并发关系。

    在这里插入图片描述

    2.无论是生产者还是消费者,都不应该将对方套一个圈以上。

    • 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
    • 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。

    在这里插入图片描述

    生产者和消费者申请和释放资源

    对于生产者来说,每一次操作生产者需要申请空间资源,释放数据资源。

    生产者每次生产数据前都需要先申请space_sem:

    • 如果space_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
    • 如果space_sem的值为0,则信号量申请失败,此时生产者需要在space_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。

    当生产者生产完数据后,应该释放data_sem:

    • 虽然生产者在进行生产前是对space_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作;
    • 生产者在生产数据前申请到的是space位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是space位置,而应该是data位置。当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行V操作。

    消费者每一次操作需要申请数据资源,释放空间资源。

    对于消费者来说,消费者每次消费数据前都需要先申请data_sem:

    • 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
    • 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。

    当消费者消费完数据后,应该释放space_sem:

    • 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对space_sem进行V操作。
    • 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作space位置,而不是data位置。当消费者消费完数据后,意味着环形队列当中多了一个space位置,因此我们应该对space_sem进行V操作。

    代码实现

    首先,我们可以对信号量函数进行包装,方便调用:

    #ifndef _SEM_HPP_
    #define _SEM_HPP_
    
    #include 
    #include 
    
    class Sem
    {
    public:
        Sem(int value)
        {
            sem_init(&sem_, 0, value);
        }
        void p()
        {
            sem_wait(&sem_);
        }
        void v()
        {
            sem_post(&sem_);
        }
        ~Sem()
        {
            sem_destroy(&sem_);
        }
    private:
        sem_t sem_;
    };
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    其次,我们得实现一个环形队列,我们可以使用STL库中的vector容器来实现:

    #pragma once
    
    #ifndef _Ring_QUEUE_HPP_
    #define _Ring_QUEUE_HPP_
    
    #include 
    #include 
    #include 
    #include 
    #include "Sem.hpp"
    
    #define NUM 5
    template <class T>
    class RingQueue
    {
    public:
        // 构造函数
        RingQueue(int num = NUM)
            : _ring_queue_(NUM)
            , num_(NUM)
            , c_step_(0)
            , p_step_(0)
            , space_sem_(NUM)
            , data_sem_(0)
        {
        }
    
        // 析构函数
        ~RingQueue()
        {
        }
    
        // 生产者生产资源
        void push(const T &in)
        {
            // 申请信号量
            space_sem_.p();
    
            _ring_queue_[p_step_++] = in;
            p_step_ %= num_;
    
            data_sem_.v();
        }
    
        // 消费者消费资源
        void pop(T &out)
        {
            // 申请信号量
            data_sem_.p();
    
            out = _ring_queue_[c_step_++];
            c_step_ %= num_;
    
            space_sem_.v();
        }
    
    private:
        std::vector<T> _ring_queue_;
        int num_;
        int c_step_; // 生产者下标
        int p_step_; // 消费者下标
        Sem space_sem_;
        Sem data_sem_;
    };
    
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    代码说明

    1. 我们默认环形队列的下标是5;
    2. 由于我们使用vector容器来实现RingQueue,生产者每次生产的数据放到vector下标为p_step_的位置,消费者每次消费的数据来源于vector下标为c_step_的位置;
    3. 生产者每次生产数据后p_step_都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果;消费者每次消费数据后c_step_都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果;
    4. p_step_只会由生产者线程进行更新,c_step_只会由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将p_step_和c_step_的更新放到了P操作之后,就是为了尽量减少临界区的代码。

    我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。

    #include "RingQueue.hpp"
    #include 
    #include 
    
    void *consumer(void *args)
    {
        RingQueue<int> *rq = (RingQueue<int> *)args;
    
        while (true)
        {
            int x;
            rq->pop(x);
    
            std::cout << "消费:" << x << std::endl;
        }
    }
    
    void *productor(void *args)
    {
        RingQueue<int> *rq = (RingQueue<int> *)args;
    
        while (true)
        {
            int x = rand() % 100 + 1;
    
            std::cout << "生产:" << x << std::endl;
            rq->push(x);
        }
    }
    
    int main()
    {
        srand((uint64_t)time(nullptr) ^ getpid());
        RingQueue<int> *rq = new RingQueue<int>();
        pthread_t c, p;
    
        pthread_create(&c, nullptr, consumer, (void *)rq);
        pthread_create(&p, nullptr, productor, (void *)rq);
    
        pthread_join(c, nullptr);
        pthread_join(p, nullptr);
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 环形队列要让生产者线程向队列中push数据,让消费者线程从队列中pop数据,因此这个环形队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入。

    生产者与消费者步调一致

    由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的:
    在这里插入图片描述

    生产者生产的快,消费者消费的慢

    我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费,此时由于生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经为0了,于是生产者只能在space_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对space_sem进行了V操作,生产者才会被唤醒进而继续进行生产。

    但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
    在这里插入图片描述

    生产者生产的慢,消费者消费的快

    我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费,虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对data_sem进行了V操作,消费者才会被唤醒进而进行消费。

    但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
    在这里插入图片描述
    我们也可以在多线程情况下进行生产和消费,但是此时就需要引入互斥锁,因为信号量本身也属于临界资源,多线程情况下,我们无法保证单个执行流对其进行访问,所以我们需要对生产者和消费者分别引入一把互斥锁:

    #pragma once
    
    #ifndef _Ring_QUEUE_HPP_
    #define _Ring_QUEUE_HPP_
    
    #include 
    #include 
    #include 
    #include 
    #include "Sem.hpp"
    
    #define NUM 5
    template <class T>
    class RingQueue
    {
    public:
        // 构造函数
        RingQueue(int num = NUM)
            : _ring_queue_(NUM), num_(NUM), c_step_(0), p_step_(0), space_sem_(NUM), data_sem_(0)
        {
            pthread_mutex_init(&p_lock, nullptr);
            pthread_mutex_init(&c_lock, nullptr);
        }
    
        // 析构函数
        ~RingQueue()
        {
            pthread_mutex_destroy(&p_lock);
            pthread_mutex_destroy(&c_lock);
        }
    
        // 生产者生产资源
        void push(const T &in)
        {
            // 申请信号量
            space_sem_.p();
    
            // 申请锁
            pthread_mutex_lock(&p_lock);
    
            _ring_queue_[p_step_++] = in;
            p_step_ %= num_;
    
            // 释放锁
            pthread_mutex_unlock(&p_lock);
    
            data_sem_.v();
        }
    
        // 消费者消费资源
        void pop(T &out)
        {
            // 申请信号量
            data_sem_.p();
    
            // 申请锁
            pthread_mutex_lock(&c_lock);
    
            out = _ring_queue_[c_step_++];
            c_step_ %= num_;
    
            // 释放锁
            pthread_mutex_unlock(&c_lock);
    
            space_sem_.v();
        }
    
    private:
        std::vector<T> _ring_queue_;
        int num_;
        int c_step_; // 生产者下标
        int p_step_; // 消费者下标
        Sem space_sem_;
        Sem data_sem_;
        pthread_mutex_t p_lock;
        pthread_mutex_t c_lock;
    };
    
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    主函数如下:

    #include "RingQueue.hpp"
    #include 
    #include 
    
    void *consumer(void *args)
    {
        RingQueue<int> *rq = (RingQueue<int> *)args;
    
        while (true)
        {
            sleep(1);
            int x;
            rq->pop(x);
    
            std::cout << "消费:" << x << "[" << pthread_self() << "]" << std::endl;
        }
    }
    
    void *productor(void *args)
    {
        RingQueue<int> *rq = (RingQueue<int> *)args;
    
        while (true)
        {
            int x = rand() % 100 + 1;
    
            std::cout << "生产:" << x << "[" << pthread_self() << "]" << std::endl;
            rq->push(x);
        }
    }
    
    int main()
    {
        srand((uint64_t)time(nullptr) ^ getpid());
        RingQueue<int> *rq = new RingQueue<int>();
    
        pthread_t c[3], p[2];
        pthread_create(c, nullptr, consumer, (void *)rq);
        pthread_create(c + 1, nullptr, consumer, (void *)rq);
        pthread_create(c + 2, nullptr, consumer, (void *)rq);
    
        pthread_create(p, nullptr, productor, (void *)rq);
        pthread_create(p + 1, nullptr, productor, (void *)rq);
    
        for (int i = 0; i < 3; i++)
            pthread_join(c[i], nullptr);
        for (int i = 0; i < 2; i++)
            pthread_join(p[i], nullptr);
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    信号量保护环形队列的原理

    在space_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。

    在环形队列中,只有生产者跟消费者访问同一位置才会出现数据不一致的问题,而生产者与消费者只有在两种情况下才会指向同一位置:

    1. 环形队列为空时;
    2. 环形队列为满时。

    但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:

    • 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。
    • 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。

    也就是说,当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行。

  • 相关阅读:
    Flink之KeyedState
    一个比 Nginx 还简单的 Web 服务器
    Flink SQL--- Over Aggregation
    axios的封装
    云计算:开辟数字时代的无限可能
    赋能千行百业,AI究竟走到哪一步了?
    acwing算法提高之图论--单源最短路的综合应用
    ECMAScript6 Proxy和Reflect 对象操作拦截以及自定义
    yolov3学习笔记
    HTTP状态码及其描述
  • 原文地址:https://blog.csdn.net/2303_77100822/article/details/133928631