信号量(Semaphore)有Dijkstra构思,来源于铁路的运行:在一条单轨铁路上,只允许一列列车行驶。信号量就是管理这条铁路的信号。任何一列火车必须等到铁路可以行驶的信号后才可以加入轨道。当一列火车加入单轨运行后,需要将信号改为禁止进入,从而防止别的火车同时进入轨道。当火车驶入单轨后,则需要将信号变回允许进入状态。
信号量是所有原语里面功能最强大的。它不光是一个通信原语(火车案例),还是一个同步原语。
Sepaphore就是一个计数器,其取值为当前累计的信号量数,支持两个操作:加法操作Up(P),减法操作(V),具体介绍如下:
加法操作Up(P):
1.将信号量的值增加1(唤醒一个在该信号量上等待的线程)
2.线程继续运行。
减法操作Down(V):
1.判断信号量的取值是否>=1
2.如果是,将信号量的值减1,继续执行
3.否则,在该信号量上等待(该线程被挂起)
binary_semaphore其实就是锁,down(1->0)对应获得锁的控制权,up(0->1)对应释放锁的控制权。
加法操作Up(P):
1.将信号量的值设置为1
2.唤醒在该信号量等待的第一个线程
3.线程继续运行。
减法操作Down(V):
1.判断信号量的取值是否=1
2.如果是,将信号量的值设置为0,继续执行
3.否则,在该信号量上等待(该线程被挂起)
在计算机中,信号量实际就是一个简单整数,它要么为binary_semaphore二元信号量,要么为counting_semaphore实现非负资源计数的信号量。在二进制信号量中,一个进程在信号量为1的情况下推进,同时将信号量减减变为0,防止别的进程推进;在任务完成后,将信号量变为1推动别的进程运行。
两种信号量的具体在下面的内容中具体介绍。
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
信号量 (semaphore) 是一种轻量的同步原件,用于制约对共享资源的并发访问(控制线程的并发数量)。在可以使用两者时,信号量能比条件变量更有效率。
函数 | 功能 |
---|---|
counting_semaphore( std::ptrdiff_t desired ); | 构造一个 std::counting_semaphore 类型对象,初始化其计数器的值为 desired 。 |
counting_semaphore( const counting_semaphore& ) = delete; | 复制构造函数被删除。 |
operator= | operator=[被删除] counting_semaphore 不可赋值 |
void release( std::ptrdiff_t update = 1 ); | 原子地将内部计数器的值增加 update 。唤醒等待的线程。 |
void acquire(); | 若内部计数器大于 0 则尝试将它减少 1 ;否则阻塞直至它大于 0 且能成功减少内部计数器。 唤醒线程继续执行 |
bool try_acquire() ; | 若内部计数器大于 0 则尝试原子地将它减少 1 ;不出现阻塞。 |
template | |
bool try_acquire_until( const std::chrono::time_point | 若内部计数器大于 0 则尝试原子地将它减少 1 ;否则阻塞直至它大于 0 且能成功地减少内部计数器,或已经经过 abs_time 时间点。(尝试减少内部计数器,阻塞直至一个时间点) |
constexpr std::ptrdiff_t max() noexcept; | 返回内部计数器的最大可能值,它大于或等于 LeastMaxValue 。 |
函数 | 功能 |
---|---|
counting_semaphore( std::ptrdiff_t desired ); | 构造一个 std::counting_semaphore 类型对象,初始化其计数器的值为 desired 。 |
counting_semaphore( const counting_semaphore& ) = delete; | 复制构造函数被删除。 |
void release( std::ptrdiff_t update = 1 ); | 原子地将内部计数器的值增加 update 。唤醒等待的线程。 |
void acquire(); | 若内部计数器大于 0 则尝试将它减少 1 ;否则阻塞直至它大于 0 且能成功减少内部计数器。 唤醒线程继续执行 |
bool try_acquire() ; | 若内部计数器大于 0 则尝试原子地将它减少 1 ;不出现阻塞。 |
template | |
bool try_acquire_for( const std::chrono::duration | 若内部计数器大于 0 则尝试原子地将它减少 1 ;否则阻塞直至它大于 0 且能成功地减少内部计数器,或已经超出 rel_time 时间点。(尝试减少内部计数器,至多阻塞一段时长) |
template | |
bool try_acquire_until( const std::chrono::time_point | 若内部计数器大于 0 则尝试原子地将它减少 1 ;否则阻塞直至它大于 0 且能成功地减少内部计数器,或已经经过 abs_time 时间点(尝试减少内部计数器,阻塞直至一个时间点) |
constexpr std::ptrdiff_t max() noexcept; | 返回内部计数器的最大可能值,它大于或等于 LeastMaxValue 。对于 binary_semaphore , LeastMaxValue 等于 1 。 |
Semaphore 是 synchronized(同步) 的加强版,作用是控制线程的并发数量。
关于信号量 Semaphore 的 acquire 与 release 的说明
1、Semaphore 信号量作为一种流控手段,可以对特定资源的允许同时访问的操作数量进行控制,例如池化技术(连接池)中的并发数,有界阻塞容器的容量等。
2、Semaphore 中包含初始化时固定个数的许可,在进行操作的时候,需要先 acquire 获取到许可,才可以继续执行任务,如果获取失败,则进入阻塞;处理完成之后需要 release 释放许可。
3、acquire 与 release 之间的关系:
在实现中不包含真正的许可对象,并且 Semaphore 也不会将许可与线程关联起来,因此在一个线程中获得的许可可以在另一个线程中释放。
可以将 acquire 操作视为是消费一个许可,而 release 操作是创建一个许可,Semaphore 并不受限于它在创建时的初始许可数量。
也就是说 acquire 与 release 并没有强制的一对一关系,release 一次就相当于新增一个许可,许可的数量可能会由于没有与 acquire 操作一对一而导致超出初始化时设置的许可个数。
#include
#include
#include
using namespace std;
counting_semaphore sema(1);
counting_semaphore semb(0);
counting_semaphore semc(0);
void threadfun1()//大于A
{
int i = 0;
while (i < 3)
{
sema.acquire();
cout << "A" << endl;
std::this_thread::sleep_for(std::chrono::microseconds(200));
i++;
semb.release();
}
}
void threadfun2()//打印B
{
int i = 0;
while (i < 3)
{
semb.acquire();
cout << "B" << endl;
i++;
std::this_thread::sleep_for(std::chrono::microseconds(200));
semc.release();
}
}
void threadfun3()//打印C
{
int i = 0;
while (i < 3)
{
semc.acquire();
cout << "C" << endl;
i++;
std::this_thread::sleep_for(std::chrono::microseconds(200));
sema.release();
}
}
int main()
{
thread th1(threadfun1);
thread th2(threadfun2);
thread th3(threadfun3);
th1.join();//等待th1线程运行完毕
th2.join();
th3.join();
return 0;
}
#include
#include
#include
using namespace std;
constexpr auto N = 5; //缓冲区大小;
binary_semaphore mutex(1);//互斥信号量
counting_semaphore full(0);//缓冲区计数信号量,计数缓冲区的商品数量
counting_semaphore Empty(N);//缓冲区计数信号量,计数缓冲区的空位数量
int num = 0;
void Produce()//大于A
{
while (num < 3)
{
cout << "生产者开始生产:" << num<<endl;
mutex.acquire();
Empty.acquire();
num++;
cout << "已经生产好了" << endl;
full.release();
mutex.release();
}
}
void Consumer()//打印B
{
while (num<3)
{
full.acquire();
mutex.acquire();
cout << "消费者消费商品: " << endl;
Empty.release();
mutex.release();
}
}
int main()
{
thread th1(Produce);
thread th2(Consumer);
th1.join();//等待th1线程运行完毕
th2.join();
return 0;
}
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
函数 | 功能 |
---|---|
condition_variable(); | 构造函数,构造 std::condition_variable 类型对象。 |
condition_variable(const condition_variable&) = delete; | 复制构造函数被删除。 |
void notify_one() noexcept; | 通知一个等待的线程,若任何线程在 *this 上等待,则调用 notify_one 会解阻塞等待线程之一。 |
void notify_all() ; | 通知所有等待的线程,解阻塞全部当前等待于 *this 的线程。 |
void wait( std::unique_lockstd::mutex& lock ); | wait 导致当前线程阻塞直至条件变量被通知,或虚假唤醒发生,可选地循环直至满足某谓词。 |
template< class Rep, class Period >std::cv_status wait_for( std::unique_lockstd::mutex& lock, const std::chrono::duration | 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后;原子地释放 lock ,阻塞当前线程,并将它添加到等待在 *this 上的线程列表。 |
template< class Clock, class Duration >std::cv_status wait_until( std::unique_lockstd::mutex& lock, const std::chrono::time_point | 原子地释放 lock ,阻塞当前线程,并将它添加到等待在 *this 上的线程列表。 |
创建3个线程,循环打印ABC
#include
#include
#include
#include
using namespace std;
std::mutex mtx;
condition_variable cv;
int isReady = 1;//1-->A,2-->B,2-->C
void threadfun1()//打印A
{
unique_lock<mutex> lc(mtx);
int i = 0;
while (i < 3)
{
while (isReady!=1)
{
cv.wait(lc);//阻塞当前线程,直至条件变量被唤醒
}
cout << "A" << endl;
isReady = 2;
std::this_thread::sleep_for(std::chrono::microseconds(200));
cv.notify_all(); //通知所有等待的线程
i++;
}
}
void threadfun2()//打印B
{
unique_lock<mutex> lc(mtx);
int i = 0;
while (i < 3)
{
while (isReady != 2)
{
cv.wait(lc);//阻塞当前线程,直至条件变量被唤醒
}
cout << "B" << endl;
isReady = 3;
i++;
std::this_thread::sleep_for(std::chrono::microseconds(200));
cv.notify_all(); //通知所有等待的线程
}
}
void threadfun3()//打印C
{
unique_lock<mutex> lc(mtx);
int i = 0;
while (i < 3)
{
while (isReady != 3)
{
cv.wait(lc);//阻塞当前线程,直至条件变量被唤醒
}
cout << "C" << endl;
isReady = 1;
i++;
std::this_thread::sleep_for(std::chrono::microseconds(200));
cv.notify_all(); //通知所有等待的线程
}
}
int main()
{
thread th1(threadfun1);
thread th2(threadfun2);
thread th3(threadfun3);
th1.join();//等待th1线程运行完毕
th2.join();
th3.join();
return 0;
}
互斥量(互斥锁)包含于C11标准程序头文件中。
互斥量类 | 功能 |
---|---|
mutex | 该类表示普通的互斥锁, 不能递归使用。 |
recursive_mutex | 该类表示递归互斥锁。递归互斥锁可以被同一个线程多次加锁,以获得对互斥锁对象的多层所有权。 |
time_mutex | 该类表示定时互斥锁,不能递归使用 |
recursive_timed_mutex | 带定时的递归互斥锁。 |
std::mutex 是 C++11 中最基本的互斥量,用于保护共享数据免受多个线程同时访问的同步原语。,std::mutex对象提供了独占(排他性)所有权的特性,不支持递归地对 std::mutex 对象上锁。
std::mutex 不允许拷贝构造,也不允许移动拷贝
成员函数 | 功能 |
---|---|
mutex(); | 构造互斥。调用后互斥在未锁定(unlocked)状态。 |
~mutex(); | 销毁互斥。若互斥为任何线程占有,或若任何线程在保有任何互斥的所有权时终止,则行为未定义。 |
void lock(); | 锁定互斥。①如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock 之前,该线程一直拥有该锁。 |
②如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 | |
bool try_lock(); | 尝试锁定互斥。立即返回;成功获得锁时返回 true ,否则返回 false 。① 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。② 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 |
void unlock(); | 解锁互斥。释放对互斥量的所有权。 |
#include
#include
#include
using namespace std;
std::mutex mtx;//程序执行时被创建,全局生存期开始
void threadfun1(int a)
{
mtx.lock();//拥有所有权
for (int i = 0; i < 5; i++)
{
cout << "threadfun1: " << a << endl;
}
mtx.unlock();//释放拥有权
}
int main()
{
thread th1(threadfun1,1);
thread th2(threadfun1,2);
th1.join();//等待th1线程运行完毕
th2.join();
return 0;
}
通常不直接使用 std::mutex 对象调用 lock 和 unlock 函数加锁和解释。而使用 lock_guard,scoped_lock,unique_lock 、shared_lock 类型管理器以更加安全的方式管理锁定。
#include
#include
#include
using namespace std;
std::mutex mtx;//程序执行时被创建,全局生存期开始
void threadfun1(int a)
{
for (int i = 0; i < 5; i++)
{
cout << "threadfun1: " << a << endl;
}
}
int main()
{
thread th1(threadfun1,1);
thread th2(threadfun1,2);
th1.join();//等待th1线程运行完毕
th2.join();
return 0;
}
成员函数 | 功能 |
---|---|
timed_mutex(); | 构造互斥。调用后互斥在未锁定(unlocked)状态。 |
~timed_mutex() | ;销毁互斥。若互斥为任何线程占有,或若任何线程在保有任何互斥的所有权时终止,则行为未定义。 |
void lock(); | 锁定互斥。①如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock 之前,该线程一直拥有该锁。 |
②如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 | |
bool try_lock(); | 尝试锁定互斥。立即返回;成功获得锁时返回 true ,否则返回 false 。① 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。② 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 |
void unlock(); | 解锁互斥。释放对互斥量的所有权。 |
template< class Rep, class Period >bool try_lock_for( const std::chrono::duration | 尝试锁互斥。阻塞直到经过指定的 timeout_duration 或得到锁,取决于何者先到来。成功获得锁时返回 true , 否则返回 false 。 |
template< class Clock, class Duration >bool try_lock_until( const std::chrono::time_point | 尝试所互斥。阻塞直至抵达指定的 timeout_time 或得到锁,取决于何者先到来。成功获得锁时返回 true ,否则返回 false 。 |
该类表示递归(再入)互斥锁。递归互斥锁可以被同一个线程多次加锁,以获得对互斥锁对象的多层所有权。
例如,同一个线程多个函数访问临界区时都可以各自加锁,执行后各自解锁。
std::recursive_mutex 释放互斥量时需要调用与该锁层次深度相同次数的 unlock(),即 lock()次数和 unlock()次数相同。
可见,线程申请递归互斥锁时,如果该递归互斥锁已经被当前调用线程锁住,则不会产生死锁。
此外,std::recursive_mutex 的功能与 std::mutex 大致相同。
成员函数 | 功能 |
---|---|
recursive_mutex(); | 构造互斥。调用后互斥在未锁定(unlocked)状态。 |
~recursive_mutex(); | 销毁互斥。若互斥为任何线程占有,或若任何线程在保有任何互斥的所有权时终止,则行为未定义。 |
void lock(); | 锁定互斥。①如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock 之前,该线程一直拥有该锁。 |
②如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 | |
bool try_lock(); | 尝试锁定互斥。立即返回;成功获得锁时返回 true ,否则返回 false 。① 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。② 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 |
void unlock(); | 解锁互斥。释放对互斥量的所有权。 |
成员函数 | 功能 |
---|---|
recursive_timed_mutex(); | 构造互斥。调用后互斥在未锁定(unlocked)状态。 |
~recursive_timed_mutex(); | 销毁互斥。若互斥为任何线程占有,或若任何线程在保有任何互斥的所有权时终止,则行为未定义。 |
void lock(); | 锁定互斥。①如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock 之前,该线程一直拥有该锁。 |
②如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 | |
bool try_lock(); | 尝试锁定互斥。立即返回;成功获得锁时返回 true ,否则返回 false 。① 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。② 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。③如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。 |
void unlock(); | 解锁互斥。释放对互斥量的所有权。 |
lock_guard 类不可复制(拷贝和赋值)。只有构造和析构函数。
类 lock_guard 是互斥体包装器,为在作用域块期间占有互斥提供便利 RAII 风格机制。创建 lock_guard 对象时,它试图接收给定互斥的所有权。控制离开创建 lock_guard 对象的作用域时,销毁 lock_guard 并释放互斥。
scoped_lock 类不可复制(拷贝和赋值)。只有构造和析构函数。
类 scoped_lock 是提供便利 RAII 风格机制的互斥包装器,它在作用域块的存在期间占有一或多个互斥。创建scoped_lock 对象时,它试图取得给定互斥的所有权。
控制离开创建 scoped_lock 对象的作用域时,析构 scoped_lock 并以逆序释放互斥。若给出数个互斥,则使用免死锁算法,如同以 std::lock 。
有锁定,修改和观察器函数。
类 unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和条件变量一同使用。类 unique_lock 可移动,但不可复制——它满足可移动构造 (MoveConstructible) 和可移动赋值 (MoveAssignable) 但不满足可复制构造 (CopyConstructible) 或可复制赋值 (CopyAssignable) 。
有锁定,修改和观察器函数。
类 shared_lock 是通用共享互斥所有权包装器,允许延迟锁定、定时锁定和锁所有权的转移。锁定 shared_lock ,会以共享模式锁定关联的共享互斥( std::unique_lock 可用于以排他性模式锁定)。
shared_lock 类可移动,但不可复制——它满足可移动构造 (MoveConstructible) 与可移动赋值 (MoveAssignable) 的要求,但不满足可复制构造 (CopyConstructible) 或可复制赋值 (CopyAssignable) 。
共享所有权模式等待于共享互斥,可使用 std::condition_variable_any ( std::condition_variable 要求std::unique_lock 故而只能以唯一所有权模式等待)。
信号量 (semaphore) 是一种轻量的同步原件,用于制约对共享资源的并发访问。在可以使用两者时,信号量能比条件变量更有效率。
互斥(mutex)算法避免多个线程同时访问共享资源。这会避免数据竞争,并提供线程间的同步支持。
条件变量(condition_variable)是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
1: semaphore 对 acquire 和 release 操作没有限制,可以在不同线程操作;可以仅在线程 A 里面 acquire,仅在线程 B 里面 release。
mutex 的 lock 和 unlock 必须在同一个线程配对使用;也就是说线程 A 内 mutex 如果 lock了,必须在线程 A 内 unlock,线程 B 内 lock 了,也必须在线程 B 内 unlock。
2: semaphore 和 mutex 是可以独立使用的;condition_variable 必须和 mutex 配对使用。
3: semaphore 一般用于控制多个并发资源的访问或者控制并行数量;mutex 一般是起到同步访问一个资源的作用。同一时刻,mutex 保护的资源只能被一个线程访问;semaphore 的保护对象上面是可以有多个线程在访问的。mutex 是同步,semaphore 是并行。
4: 由于 condition_variable 和 mutex 结合使用,condition_variable 更多是为了通知、顺序之类的控制。
5: C++语言中的 mutex、semaphore、condition 和系统级的概念不同。都是线程级别的,也就是不能跨进程控制的。要区别于 windows api 的 mutex、semaphore、event。windows 系统上这几个 api 创建有名对象时,是进程级别的。