一、引言
在并发编程中,信号量(Semaphore)是一个非常重要的同步原语。它允许一定数量的线程同时访问某一资源或代码段,有效地防止了资源冲突和死锁等问题。本文将全面解析信号量在C++中的使用、功能,并给出相应的代码示例。
信号量是一个整数变量,可以用来控制多个线程对共享资源的访问。它通常用于保护对临界区的访问,防止多个线程同时修改数据造成数据不一致。信号量的值表示可用的资源数量,当一个线程需要访问共享资源时,它会尝试减少信号量的值;当线程释放资源时,它会增加信号量的值。
什么是信号量?
信号量是一种计数器,用于控制对共享资源的访问。它主要由一个整型值表示,可以对其进行加减操作。在多线程环境下,通常使用信号量来实现线程的同步和互斥。
在C++中,可以使用标准库中的std::condition_variable
和std::mutex
来实现信号量的功能。下面是一个简单的示例:
#include
#include
#include
#include
class Semaphore {
private:
std::mutex mtx_;
std::condition_variable cv_;
int count_;
public:
Semaphore(int count) : count_(count) {}
void wait() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return count_ > 0; });
--count_;
}
void signal() {
std::lock_guard<std::mutex> lock(mtx_);
++count_;
cv_.notify_one();
}
};
void worker(Semaphore& sem, int id) {
sem.wait(); // 等待信号量可用
std::cout << "Worker " << id << " is working." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
sem.signal(); // 释放信号量
}
int main() {
Semaphore sem(3); // 创建一个信号量,初始值为3
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(worker, std::ref(sem), i);
}
for (auto& t : threads) {
t.join();
}
return 0;
}
在上面的示例中,我们定义了一个Semaphore
类,它内部使用了std::mutex
和std::condition_variable
来实现信号量的功能。wait
方法用于等待信号量可用,而signal
方法用于释放信号量。在worker
函数中,线程首先调用wait
方法等待信号量可用,然后执行一些工作,最后调用signal
方法释放信号量。在main
函数中,我们创建了一个初始值为3的信号量,并启动了10个线程。由于信号量的初始值为3,因此前3个线程可以立即开始工作,而后面的线程则需要等待其他线程释放信号量后才能开始工作。
信号量是一种强大的同步原语,可以用于控制多个线程对共享资源的访问。在C++中,我们可以通过结合使用
std::mutex
和std::condition_variable
来实现信号量的功能。通过合理使用信号量,我们可以有效地防止资源冲突和死锁等问题,提高程序的并发性能。
在Linux环境下,信号量是一种重要的同步原语,用于在多线程环境下控制对共享资源的访问。下面我们将深入介绍信号量的概念、原理,并通过C语言代码示例详细解析在多线程环境下的应用。
sem_init(sem_t *sem, int pshared, unsigned int value):
初始化信号量,pshared指定信号量的类型,value指定初始计数值。sem_wait(sem_t *sem):
等待信号量,如果信号量计数值为0,则线程会阻塞,直到计数值大于0。sem_post(sem_t *sem):
增加信号量的计数值。sem_destroy(sem_t *sem):
销毁信号量。#include
#include
#include
#define NUM_THREADS 2
// 共享资源
int shared_resource = 0;
// 信号量
sem_t semaphore;
// 线程函数1,增加共享资源的值
void *thread_function1(void *arg) {
sem_wait(&semaphore); // 等待信号量
shared_resource++; // 访问共享资源
printf("Thread 1: shared_resource = %d\n", shared_resource);
sem_post(&semaphore); // 释放信号量
pthread_exit(NULL);
}
// 线程函数2,减少共享资源的值
void *thread_function2(void *arg) {
sem_wait(&semaphore); // 等待信号量
shared_resource--; // 访问共享资源
printf("Thread 2: shared_resource = %d\n", shared_resource);
sem_post(&semaphore); // 释放信号量
pthread_exit(NULL);
}
int main() {
pthread_t threads[NUM_THREADS];
// 初始化信号量,初始值为1
sem_init(&semaphore, 0, 1);
// 创建线程
pthread_create(&threads[0], NULL, thread_function1, NULL);
pthread_create(&threads[1], NULL, thread_function2, NULL);
// 等待线程结束
pthread_join(threads[0], NULL);
pthread_join(threads[1], NULL);
// 销毁信号量
sem_destroy(&semaphore);
return 0;
}
在上述代码中,我们使用了pthread库来创建两个线程,并使用semaphore来控制对共享资源的访问。在main函数中,我们首先初始化了信号量,然后创建了两个线程,分别执行不同的操作。在每个线程函数中,我们使用sem_wait来等待信号量,表示线程需要获取共享资源的访问权限。之后,线程对共享资源进行操作,并使用sem_post释放信号量,表示操作结束。
在上述代码中,我们将信号量初始化为1,这里就相当于二元信号量(和互斥锁同理)
对于生产者和消费者来说,它们关注的资源是不同的:
#include
#include
#include
#include // For O_* constants
#include // For mode constants
#include // For close()
// 环形队列的固定大小
const int QUEUE_SIZE = 10;
// 环形队列结构
struct CircularQueue {
int buffer[QUEUE_SIZE];
sem_t full; // 表示队列中当前有多少个元素
sem_t empty; // 表示队列中有多少个空位可以插入新元素
int head; // 队列头指针
int tail; // 队列尾指针
};
// 初始化环形队列
void initQueue(CircularQueue* queue) {
queue->head = 0;
queue->tail = 0;
sem_init(&queue->full, 0, 0); // 初始时队列为空
sem_init(&queue->empty, 0, QUEUE_SIZE); // 初始时队列有QUEUE_SIZE个空位
}
// 销毁环形队列
void destroyQueue(CircularQueue* queue) {
sem_destroy(&queue->full);
sem_destroy(&queue->empty);
}
// 入队操作
void enqueue(CircularQueue* queue, int item) {
sem_wait(&queue->empty); // 等待空位
buffer[tail] = item; // 将元素放入队列尾
tail = (tail + 1) % QUEUE_SIZE; // 更新尾指针
sem_post(&queue->full); // 增加队列中的元素数量
}
// 出队操作
int dequeue(CircularQueue* queue) {
int item;
sem_wait(&queue->full); // 等待队列中有元素
item = buffer[head]; // 取出队列头的元素
head = (head + 1) % QUEUE_SIZE; // 更新头指针
sem_post(&queue->empty); // 增加队列中的空位数量
return item;
}
int main() {
CircularQueue queue;
initQueue(&queue);
// 生产者线程入队操作
for (int i = 0; i < 15; ++i) {
enqueue(&queue, i);
std::cout << "Enqueued item: " << i << std::endl;
}
// 消费者线程出队操作
for (int i = 0; i < 10; ++i) {
int item = dequeue(&queue);
std::cout << "Dequeued item: " << item << std::endl;
}
destroyQueue(&queue);
return 0;
}
我们上述代码没有处理多线程环境下可能发生的竞态条件。在真实的多线程环境中,你需要确保enqueue
和dequeue
操作的原子性,这通常通过使用互斥锁(如std::mutex
或pthread_mutex_t
)来实现。
此外,POSIX信号量在Unix-like系统上通常用于进程间同步,而不是线程间同步。
POSIX信号量(POSIX semaphores)确实最初设计用于进程间同步,允许不同进程之间协调对共享资源的访问。不过,这并不意味着信号量不能用于线程间同步。在多线程编程中,POSIX信号量同样可以发挥重要作用,特别是在需要跨线程边界进行同步时。
POSIX标准定义了两种信号量:未命名的信号量(也叫做内部或基于内存的信号量)和命名的信号量(也叫做外部或文件系统的信号量)。未命名的信号量通常用于同一进程内的线程间同步,而命名的信号量可以用于进程间同步。
在多线程应用中,如果你需要跨多个线程协调资源访问,或者确保某个代码段(临界区)不会被多个线程同时执行,可以使用未命名的POSIX信号量。这些信号量在进程地址空间内创建,可以被该进程内的所有线程共享和访问。
因此,尽管POSIX信号量最初是为进程间同步设计的,但它们同样适用于线程间同步。实际上,在Unix-like系统上,使用POSIX信号量进行线程间同步是一种常见做法,尤其是在需要跨多个线程协调资源访问时。
然而,对于线程间同步,许多现代编程语言和库也提供了其他机制,如互斥锁(mutexes)、条件变量(condition
variables)和读写锁(read-write
locks)等。这些机制通常更加轻量级,并且更易于在特定编程环境中使用。因此,在选择使用信号量还是其他同步机制时,应该根据具体的应用需求和编程环境来决定。
对于线程间同步,通常使用Pthreads库中的pthread_mutex_t
和pthread_cond_t
。
在实际应用中,确保正确处理信号量的初始化和销毁,避免资源泄漏,并且在不再需要时正确销毁它们。在上面的示例中,我们假设sem_init
和sem_destroy
用于初始化和销毁信号量,这在POSIX环境中是标准的做法。
于是我们进行下述修正
全局变量访问:在 enqueue
和 dequeue
函数中,对 buffer、head、tail
的访问是全局的,可能导致竞态条件。需要在访问这些全局变量时使用互斥锁或其他同步机制。
信号量使用:虽然使用了信号量 full
和 empty
来表示队列中的元素数量和空位数量,但在 enqueue
和 dequeue
函数中没有对这些信号量的操作进行保护,因此仍然可能存在竞态条件。
缺少互斥锁:在 enqueue
和 dequeue
函数中需要使用互斥锁来保护对共享资源的访问,以确保在多个线程同时访问时的数据一致性。
以下是修改后的代码示例,添加了互斥锁以保护对共享资源的访问:
#include
#include
#include
#include // For O_* constants
#include // For mode constants
#include // For close()
#include // For pthread_mutex_t
// 环形队列的固定大小
const int QUEUE_SIZE = 10;
// 环形队列结构
struct CircularQueue {
int buffer[QUEUE_SIZE];
sem_t full; // 表示队列中当前有多少个元素
sem_t empty; // 表示队列中有多少个空位可以插入新元素
int head; // 队列头指针
int tail; // 队列尾指针
pthread_mutex_t mutex; // 互斥锁
};
// 初始化环形队列
void initQueue(CircularQueue* queue) {
queue->head = 0;
queue->tail = 0;
sem_init(&queue->full, 0, 0); // 初始时队列为空
sem_init(&queue->empty, 0, QUEUE_SIZE); // 初始时队列有QUEUE_SIZE个空位
pthread_mutex_init(&queue->mutex, NULL); // 初始化互斥锁
}
// 销毁环形队列
void destroyQueue(CircularQueue* queue) {
sem_destroy(&queue->full);
sem_destroy(&queue->empty);
pthread_mutex_destroy(&queue->mutex); // 销毁互斥锁
}
// 入队操作
void enqueue(CircularQueue* queue, int item) {
sem_wait(&queue->empty); // 等待空位
pthread_mutex_lock(&queue->mutex); // 加锁
queue->buffer[queue->tail] = item; // 将元素放入队列尾
queue->tail = (queue->tail + 1) % QUEUE_SIZE; // 更新尾指针
pthread_mutex_unlock(&queue->mutex); // 解锁
sem_post(&queue->full); // 增加队列中的元素数量
}
// 出队操作
int dequeue(CircularQueue* queue) {
int item;
sem_wait(&queue->full); // 等待队列中有元素
pthread_mutex_lock(&queue->mutex); // 加锁
item = queue->buffer[queue->head]; // 取出队列头的元素
queue->head = (queue->head + 1) % QUEUE_SIZE; // 更新头指针
pthread_mutex_unlock(&queue->mutex); // 解锁
sem_post(&queue->empty); // 增加队列中的空位数量
return item;
}
int main() {
CircularQueue queue;
initQueue(&queue);
// 生产者线程入队操作
for (int i = 0; i < 15; ++i) {
enqueue(&queue, i);
std::cout << "Enqueued item: " << i << std::endl;
}
// 消费者线程出队操作
for (int i = 0; i < 10; ++i) {
int item = dequeue(&queue);
std::cout << "Dequeued item: " << item << std::endl;
}
destroyQueue(&queue);
return 0;
}
通过在 enqueue
和 dequeue
函数中加入互斥锁,保护了对共享资源的访问,从而确保了线程安全性。
又通过上述可知,POSIX信号量通过初始化成二元信号量可以变成互斥锁
使用二元信号量来模拟互斥锁,在初始化信号量时将初始值设置为1即可。在进入临界区之前,使用 sem_wait 函数来获得信号量;在离开临界区时,使用 sem_post 函数释放信号量。
#include
#include
struct Mutex {
sem_t semaphore;
};
void initMutex(Mutex* mutex) {
sem_init(&mutex->semaphore, 0, 1); // 初始化二元信号量,初始值为1
}
void lockMutex(Mutex* mutex) {
sem_wait(&mutex->semaphore); // 请求信号量
}
void unlockMutex(Mutex* mutex) {
sem_post(&mutex->semaphore); // 释放信号量
}
int main() {
Mutex mutex;
initMutex(&mutex);
// 进入临界区
lockMutex(&mutex);
std::cout << "Inside critical section" << std::endl;
unlockMutex(&mutex);
// 离开临界区
return 0;
}
#include
#include
#include // For pthread_mutex_t
class Mutex {
public:
Mutex() {
sem_init(&semaphore, 0, 1); // 初始化二元信号量,初始值为1
}
~Mutex() {
sem_destroy(&semaphore); // 销毁信号量
}
void lock() {
sem_wait(&semaphore); // 请求信号量
}
void unlock() {
sem_post(&semaphore); // 释放信号量
}
private:
sem_t semaphore;
};
// 环形队列的固定大小
const int QUEUE_SIZE = 10;
class CircularQueue {
public:
CircularQueue() : head(0), tail(0) {
sem_init(&full, 0, 0); // 初始时队列为空
sem_init(&empty, 0, QUEUE_SIZE); // 初始时队列有QUEUE_SIZE个空位
}
~CircularQueue() {
sem_destroy(&full);
sem_destroy(&empty);
}
// 入队操作
void enqueue(int item) {
sem_wait(&empty); // 等待空位
mutex.lock(); // 加锁
buffer[tail] = item; // 将元素放入队列尾
tail = (tail + 1) % QUEUE_SIZE; // 更新尾指针
mutex.unlock(); // 解锁
sem_post(&full); // 增加队列中的元素数量
}
// 出队操作
int dequeue() {
int item;
sem_wait(&full); // 等待队列中有元素
mutex.lock(); // 加锁
item = buffer[head]; // 取出队列头的元素
head = (head + 1) % QUEUE_SIZE; // 更新头指针
mutex.unlock(); // 解锁
sem_post(&empty); // 增加队列中的空位数量
return item;
}
private:
int buffer[QUEUE_SIZE];
sem_t full; // 表示队列中当前有多少个元素
sem_t empty; // 表示队列中有多少个空位可以插入新元素
int head; // 队列头指针
int tail; // 队列尾指针
Mutex mutex; // 互斥锁
};
int main() {
CircularQueue queue;
// 生产者线程入队操作
for (int i = 0; i < 15; ++i) {
queue.enqueue(i);
std::cout << "Enqueued item: " << i << std::endl;
}
// 消费者线程出队操作
for (int i = 0; i < 10; ++i) {
int item = queue.dequeue();
std::cout << "Dequeued item: " << item << std::endl;
}
return 0;
}
恭喜你已经完整地理解到POSIX信号量了!