• 信号量全面解析与使用


    一、引言

    并发编程中,信号量(Semaphore)是一个非常重要的同步原语。它允许一定数量的线程同时访问某一资源或代码段,有效地防止了资源冲突和死锁等问题。本文将全面解析信号量在C++中的使用、功能,并给出相应的代码示例。

    一、信号量的基本概念

    信号量是一个整数变量,可以用来控制多个线程对共享资源的访问。它通常用于保护对临界区的访问,防止多个线程同时修改数据造成数据不一致。信号量的值表示可用的资源数量,当一个线程需要访问共享资源时,它会尝试减少信号量的值;当线程释放资源时,它会增加信号量的值。

    什么是信号量?
    信号量是一种计数器,用于控制对共享资源的访问。它主要由一个整型值表示,可以对其进行加减操作。在多线程环境下,通常使用信号量来实现线程的同步和互斥。

    二、信号量的功能

    1. 同步:信号量可以实现线程之间的同步,确保线程在正确的时机访问共享资源。
    2. 互斥:通过设置信号量的初始值为1,可以实现互斥锁的功能,确保同一时间只有一个线程访问共享资源。
    3. 计数:信号量还可以用来限制对资源的并发访问数量,通过设置不同的初始值,可以控制同时访问资源的线程数。

    三、C++中使用信号量的示例

    在C++中,可以使用标准库中的std::condition_variablestd::mutex来实现信号量的功能。下面是一个简单的示例:

    #include 
    #include 
    #include 
    #include 
    
    class Semaphore {
    private:
        std::mutex mtx_;
        std::condition_variable cv_;
        int count_;
    
    public:
        Semaphore(int count) : count_(count) {}
    
        void wait() {
            std::unique_lock<std::mutex> lock(mtx_);
            cv_.wait(lock, [this] { return count_ > 0; });
            --count_;
        }
    
        void signal() {
            std::lock_guard<std::mutex> lock(mtx_);
            ++count_;
            cv_.notify_one();
        }
    };
    
    void worker(Semaphore& sem, int id) {
        sem.wait();  // 等待信号量可用
        std::cout << "Worker " << id << " is working." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sem.signal();  // 释放信号量
    }
    
    int main() {
        Semaphore sem(3);  // 创建一个信号量,初始值为3
        std::vector<std::thread> threads;
    
        for (int i = 0; i < 10; ++i) {
            threads.emplace_back(worker, std::ref(sem), i);
        }
    
        for (auto& t : threads) {
            t.join();
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在上面的示例中,我们定义了一个Semaphore类,它内部使用了std::mutexstd::condition_variable来实现信号量的功能。wait方法用于等待信号量可用,而signal方法用于释放信号量。在worker函数中,线程首先调用wait方法等待信号量可用,然后执行一些工作,最后调用signal方法释放信号量。在main函数中,我们创建了一个初始值为3的信号量,并启动了10个线程。由于信号量的初始值为3,因此前3个线程可以立即开始工作,而后面的线程则需要等待其他线程释放信号量后才能开始工作。

    信号量是一种强大的同步原语,可以用于控制多个线程对共享资源的访问。在C++中,我们可以通过结合使用std::mutexstd::condition_variable来实现信号量的功能。通过合理使用信号量,我们可以有效地防止资源冲突和死锁等问题,提高程序的并发性能。

    四、POSIX信号量

    在Linux环境下,信号量是一种重要的同步原语,用于在多线程环境下控制对共享资源的访问。下面我们将深入介绍信号量的概念、原理,并通过C语言代码示例详细解析在多线程环境下的应用。

    • sem_init(sem_t *sem, int pshared, unsigned int value):初始化信号量,pshared指定信号量的类型,value指定初始计数值。
    • sem_wait(sem_t *sem):等待信号量,如果信号量计数值为0,则线程会阻塞,直到计数值大于0。
    • sem_post(sem_t *sem):增加信号量的计数值。
    • sem_destroy(sem_t *sem):销毁信号量。
    #include 
    #include 
    #include 
    
    #define NUM_THREADS 2
    
    // 共享资源
    int shared_resource = 0;
    // 信号量
    sem_t semaphore;
    
    // 线程函数1,增加共享资源的值
    void *thread_function1(void *arg) {
        sem_wait(&semaphore); // 等待信号量
        shared_resource++; // 访问共享资源
        printf("Thread 1: shared_resource = %d\n", shared_resource);
        sem_post(&semaphore); // 释放信号量
        pthread_exit(NULL);
    }
    
    // 线程函数2,减少共享资源的值
    void *thread_function2(void *arg) {
        sem_wait(&semaphore); // 等待信号量
        shared_resource--; // 访问共享资源
        printf("Thread 2: shared_resource = %d\n", shared_resource);
        sem_post(&semaphore); // 释放信号量
        pthread_exit(NULL);
    }
    
    int main() {
        pthread_t threads[NUM_THREADS];
    
        // 初始化信号量,初始值为1
        sem_init(&semaphore, 0, 1);
    
        // 创建线程
        pthread_create(&threads[0], NULL, thread_function1, NULL);
        pthread_create(&threads[1], NULL, thread_function2, NULL);
    
        // 等待线程结束
        pthread_join(threads[0], NULL);
        pthread_join(threads[1], NULL);
    
        // 销毁信号量
        sem_destroy(&semaphore);
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    在上述代码中,我们使用了pthread库来创建两个线程,并使用semaphore来控制对共享资源的访问。在main函数中,我们首先初始化了信号量,然后创建了两个线程,分别执行不同的操作。在每个线程函数中,我们使用sem_wait来等待信号量,表示线程需要获取共享资源的访问权限。之后,线程对共享资源进行操作,并使用sem_post释放信号量,表示操作结束。

    在上述代码中,我们将信号量初始化为1,这里就相当于二元信号量(和互斥锁同理)

    五、基于环形队列的生产消费者模型

    在这里插入图片描述

    对于生产者和消费者来说,它们关注的资源是不同的:

    • 生产者关注的是环形队列当中是否有空间(empty),只要有空间生产者就可以进行生产。
    • 消费者关注的是环形队列当中是否有数据(full),只要有数据消费者就可以进行消费。
    #include 
    #include 
    #include 
    #include            // For O_* constants
    #include         // For mode constants
    #include           // For close()
    
    // 环形队列的固定大小
    const int QUEUE_SIZE = 10;
    
    // 环形队列结构
    struct CircularQueue {
        int buffer[QUEUE_SIZE];
        sem_t full;    // 表示队列中当前有多少个元素
        sem_t empty;   // 表示队列中有多少个空位可以插入新元素
        int head;      // 队列头指针
        int tail;      // 队列尾指针
    };
    
    // 初始化环形队列
    void initQueue(CircularQueue* queue) {
        queue->head = 0;
        queue->tail = 0;
        sem_init(&queue->full, 0, 0);  // 初始时队列为空
        sem_init(&queue->empty, 0, QUEUE_SIZE);  // 初始时队列有QUEUE_SIZE个空位
    }
    
    // 销毁环形队列
    void destroyQueue(CircularQueue* queue) {
        sem_destroy(&queue->full);
        sem_destroy(&queue->empty);
    }
    
    // 入队操作
    void enqueue(CircularQueue* queue, int item) {
        sem_wait(&queue->empty);  // 等待空位
        buffer[tail] = item;      // 将元素放入队列尾
        tail = (tail + 1) % QUEUE_SIZE;  // 更新尾指针
        sem_post(&queue->full);  // 增加队列中的元素数量
    }
    
    // 出队操作
    int dequeue(CircularQueue* queue) {
        int item;
        sem_wait(&queue->full);  // 等待队列中有元素
        item = buffer[head];     // 取出队列头的元素
        head = (head + 1) % QUEUE_SIZE;  // 更新头指针
        sem_post(&queue->empty);  // 增加队列中的空位数量
        return item;
    }
    
    int main() {
        CircularQueue queue;
        initQueue(&queue);
    
        // 生产者线程入队操作
        for (int i = 0; i < 15; ++i) {
            enqueue(&queue, i);
            std::cout << "Enqueued item: " << i << std::endl;
        }
    
        // 消费者线程出队操作
        for (int i = 0; i < 10; ++i) {
            int item = dequeue(&queue);
            std::cout << "Dequeued item: " << item << std::endl;
        }
    
        destroyQueue(&queue);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    我们上述代码没有处理多线程环境下可能发生的竞态条件。在真实的多线程环境中,你需要确保enqueuedequeue操作的原子性,这通常通过使用互斥锁(如std::mutexpthread_mutex_t)来实现。

    此外,POSIX信号量在Unix-like系统上通常用于进程间同步,而不是线程间同步。
    
    • 1

    POSIX信号量(POSIX semaphores)确实最初设计用于进程间同步,允许不同进程之间协调对共享资源的访问。不过,这并不意味着信号量不能用于线程间同步。在多线程编程中,POSIX信号量同样可以发挥重要作用,特别是在需要跨线程边界进行同步时。

    POSIX标准定义了两种信号量:未命名的信号量(也叫做内部或基于内存的信号量)和命名的信号量(也叫做外部或文件系统的信号量)。未命名的信号量通常用于同一进程内的线程间同步,而命名的信号量可以用于进程间同步。

    在多线程应用中,如果你需要跨多个线程协调资源访问,或者确保某个代码段(临界区)不会被多个线程同时执行,可以使用未命名的POSIX信号量。这些信号量在进程地址空间内创建,可以被该进程内的所有线程共享和访问。

    因此,尽管POSIX信号量最初是为进程间同步设计的,但它们同样适用于线程间同步。实际上,在Unix-like系统上,使用POSIX信号量进行线程间同步是一种常见做法,尤其是在需要跨多个线程协调资源访问时。

    然而,对于线程间同步,许多现代编程语言和库也提供了其他机制,如互斥锁(mutexes)、条件变量(condition
    variables)和读写锁(read-write
    locks)等。这些机制通常更加轻量级,并且更易于在特定编程环境中使用。因此,在选择使用信号量还是其他同步机制时,应该根据具体的应用需求和编程环境来决定。

    对于线程间同步,通常使用Pthreads库中的pthread_mutex_tpthread_cond_t

    在实际应用中,确保正确处理信号量的初始化和销毁,避免资源泄漏,并且在不再需要时正确销毁它们。在上面的示例中,我们假设sem_initsem_destroy用于初始化和销毁信号量,这在POSIX环境中是标准的做法。

    于是我们进行下述修正

    1. 全局变量访问:在 enqueuedequeue 函数中,对 buffer、head、tail 的访问是全局的,可能导致竞态条件。需要在访问这些全局变量时使用互斥锁或其他同步机制。

    2. 信号量使用:虽然使用了信号量 fullempty 来表示队列中的元素数量和空位数量,但在 enqueuedequeue 函数中没有对这些信号量的操作进行保护,因此仍然可能存在竞态条件。

    3. 缺少互斥锁:在 enqueuedequeue 函数中需要使用互斥锁来保护对共享资源的访问,以确保在多个线程同时访问时的数据一致性。

    以下是修改后的代码示例,添加了互斥锁以保护对共享资源的访问:

    #include 
    #include 
    #include 
    #include            // For O_* constants
    #include         // For mode constants
    #include           // For close()
    #include          // For pthread_mutex_t
    
    // 环形队列的固定大小
    const int QUEUE_SIZE = 10;
    
    // 环形队列结构
    struct CircularQueue {
        int buffer[QUEUE_SIZE];
        sem_t full;    // 表示队列中当前有多少个元素
        sem_t empty;   // 表示队列中有多少个空位可以插入新元素
        int head;      // 队列头指针
        int tail;      // 队列尾指针
        pthread_mutex_t mutex; // 互斥锁
    };
    
    // 初始化环形队列
    void initQueue(CircularQueue* queue) {
        queue->head = 0;
        queue->tail = 0;
        sem_init(&queue->full, 0, 0);  // 初始时队列为空
        sem_init(&queue->empty, 0, QUEUE_SIZE);  // 初始时队列有QUEUE_SIZE个空位
        pthread_mutex_init(&queue->mutex, NULL); // 初始化互斥锁
    }
    
    // 销毁环形队列
    void destroyQueue(CircularQueue* queue) {
        sem_destroy(&queue->full);
        sem_destroy(&queue->empty);
        pthread_mutex_destroy(&queue->mutex); // 销毁互斥锁
    }
    
    // 入队操作
    void enqueue(CircularQueue* queue, int item) {
        sem_wait(&queue->empty);  // 等待空位
        pthread_mutex_lock(&queue->mutex); // 加锁
        queue->buffer[queue->tail] = item;      // 将元素放入队列尾
        queue->tail = (queue->tail + 1) % QUEUE_SIZE;  // 更新尾指针
        pthread_mutex_unlock(&queue->mutex); // 解锁
        sem_post(&queue->full);  // 增加队列中的元素数量
    }
    
    // 出队操作
    int dequeue(CircularQueue* queue) {
        int item;
        sem_wait(&queue->full);  // 等待队列中有元素
        pthread_mutex_lock(&queue->mutex); // 加锁
        item = queue->buffer[queue->head];     // 取出队列头的元素
        queue->head = (queue->head + 1) % QUEUE_SIZE;  // 更新头指针
        pthread_mutex_unlock(&queue->mutex); // 解锁
        sem_post(&queue->empty);  // 增加队列中的空位数量
        return item;
    }
    
    int main() {
        CircularQueue queue;
        initQueue(&queue);
    
        // 生产者线程入队操作
        for (int i = 0; i < 15; ++i) {
            enqueue(&queue, i);
            std::cout << "Enqueued item: " << i << std::endl;
        }
    
        // 消费者线程出队操作
        for (int i = 0; i < 10; ++i) {
            int item = dequeue(&queue);
            std::cout << "Dequeued item: " << item << std::endl;
        }
    
        destroyQueue(&queue);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    通过在 enqueuedequeue 函数中加入互斥锁,保护了对共享资源的访问,从而确保了线程安全性。

    又通过上述可知,POSIX信号量通过初始化成二元信号量可以变成互斥锁

    使用二元信号量来模拟互斥锁,在初始化信号量时将初始值设置为1即可。在进入临界区之前,使用 sem_wait 函数来获得信号量;在离开临界区时,使用 sem_post 函数释放信号量。

    #include 
    #include 
    
    struct Mutex {
        sem_t semaphore;
    };
    
    void initMutex(Mutex* mutex) {
        sem_init(&mutex->semaphore, 0, 1); // 初始化二元信号量,初始值为1
    }
    
    void lockMutex(Mutex* mutex) {
        sem_wait(&mutex->semaphore); // 请求信号量
    }
    
    void unlockMutex(Mutex* mutex) {
        sem_post(&mutex->semaphore); // 释放信号量
    }
    
    int main() {
        Mutex mutex;
        initMutex(&mutex);
    
        // 进入临界区
        lockMutex(&mutex);
        std::cout << "Inside critical section" << std::endl;
        unlockMutex(&mutex);
        // 离开临界区
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    封装后的源码

    在这里插入图片描述

    #include 
    #include 
    #include  // For pthread_mutex_t
    
    class Mutex {
    public:
        Mutex() {
            sem_init(&semaphore, 0, 1); // 初始化二元信号量,初始值为1
        }
    
        ~Mutex() {
            sem_destroy(&semaphore); // 销毁信号量
        }
    
        void lock() {
            sem_wait(&semaphore); // 请求信号量
        }
    
        void unlock() {
            sem_post(&semaphore); // 释放信号量
        }
    
    private:
        sem_t semaphore;
    };
    
    // 环形队列的固定大小
    const int QUEUE_SIZE = 10;
    
    class CircularQueue {
    public:
        CircularQueue() : head(0), tail(0) {
            sem_init(&full, 0, 0);      // 初始时队列为空
            sem_init(&empty, 0, QUEUE_SIZE);  // 初始时队列有QUEUE_SIZE个空位
        }
    
        ~CircularQueue() {
            sem_destroy(&full);
            sem_destroy(&empty);
        }
    
        // 入队操作
        void enqueue(int item) {
            sem_wait(&empty);  // 等待空位
            mutex.lock(); // 加锁
            buffer[tail] = item;      // 将元素放入队列尾
            tail = (tail + 1) % QUEUE_SIZE;  // 更新尾指针
            mutex.unlock(); // 解锁
            sem_post(&full);  // 增加队列中的元素数量
        }
    
        // 出队操作
        int dequeue() {
            int item;
            sem_wait(&full);  // 等待队列中有元素
            mutex.lock(); // 加锁
            item = buffer[head];     // 取出队列头的元素
            head = (head + 1) % QUEUE_SIZE;  // 更新头指针
            mutex.unlock(); // 解锁
            sem_post(&empty);  // 增加队列中的空位数量
            return item;
        }
    
    private:
        int buffer[QUEUE_SIZE];
        sem_t full;    // 表示队列中当前有多少个元素
        sem_t empty;   // 表示队列中有多少个空位可以插入新元素
        int head;      // 队列头指针
        int tail;      // 队列尾指针
        Mutex mutex;   // 互斥锁
    };
    
    int main() {
        CircularQueue queue;
    
        // 生产者线程入队操作
        for (int i = 0; i < 15; ++i) {
            queue.enqueue(i);
            std::cout << "Enqueued item: " << i << std::endl;
        }
    
        // 消费者线程出队操作
        for (int i = 0; i < 10; ++i) {
            int item = queue.dequeue();
            std::cout << "Dequeued item: " << item << std::endl;
        }
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    恭喜你已经完整地理解到POSIX信号量了!

  • 相关阅读:
    传奇hero引擎版本服务端转换GEE引教程教学篇
    8.3:加强堆的应用
    Jenkins 添加 Slave Agent 节点时报类文件不匹配错误
    Docker一键过滤服务日志脚本
    Python算法于强化学习库之rlax使用详解
    php程序设计的基本原则
    热点报告 | 解压经济成为新风向,素人改造踩中用户痛点
    Spring Boot 实现统一异常处理:构建健壮的应用
    直播录屏软件哪个好?什么软件可以录屏直播会议?
    电脑经常弹出“不支持的硬件”解决办法
  • 原文地址:https://blog.csdn.net/sun_0228/article/details/138160558