临界资源和临界区
进程之间如果要进行通信我们需要先创建第三方资源,让不同的进程看到同一份资源,由于这份第三方资源可以由操作系统中的不同模块提供,于是进程间通信的方式有很多种。进程间通信中的第三方资源就叫做临界资源,访问第三方资源的代码就叫做临界区。
而多线程的大部分资源都是共享的,线程之间进行通信不需要费那么大的劲去创建第三方资源。
例如,我们在全局区定义一个count变量,让新线程每隔一秒对该变量加一操作,让主线程每隔一秒获取count变量的值进行打印:
#include
#include
#include
#include
using namespace std;
int count = 0;
void* Routine(void* args)
{
while(true)
{
count++;
sleep(1);
}
pthread_exit((void*)0);
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, Routine, nullptr);
while(true)
{
cout << "count: " <<count <<endl;
sleep(1);
}
pthread_join(tid, nullptr);
return 0;
}
运行代码:
此时我们相当于实现了主线程和新线程之间的通信,其中全局变量count就叫做临界资源,因为它被多个执行流共享,而主线程中的printf和新线程中count++就叫做临界区,因为这些代码对临界资源进行了访问。
互斥和原子性
在多线程情况下,如果这多个执行流都自顾自的对临界资源进行操作,那么此时就可能导致数据不一致的问题。解决该问题的方案就叫做互斥,互斥的作用就是,保证在任何时候有且只有一个执行流进入临界区对临界资源进行访问。
原子性指的是不可被分割的操作,该操作不会被任何调度机制打断,该操作只有两态,要么完成,要么未完成。
下面我们模拟实现一个抢票系统,我们将记录票的剩余张数的变量定义为全局变量,主线程创建三个新线程,让这三个新线程进行抢票,当票被抢完后这四个线程自动退出:
#include
#include
#include
#include
using namespace std;
int tickets = 10000;
void* getTickets(void* args)
{
(void*) args;
while(true)
{
if(tickets > 0)
{
usleep(1000);
printf("%p: %d\n", pthread_self(), tickets);
tickets--;
}
else
{
break;
}
}
pthread_exit((void*)0);
}
int main()
{
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, getTickets, nullptr);
pthread_create(&t2, nullptr, getTickets, nullptr);
pthread_create(&t3, nullptr, getTickets, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
运行代码,我们会发现,出现了票数为负的情况,显然这是不正常的:
该代码中记录剩余票数的变量tickets就是临界资源,因为它被多个执行流同时访问,而判断tickets是否大于0、打印剩余票数以及tickets--
这些代码就是临界区,这些代码对临界资源进行了访问。
剩余票数出现负数的原因:
ticket--
操作本身就不是一个原子操作。为什么
ticket--
不是原子操作?
我们对一个变量进行--
,我们实际需要进行以下三个步骤:
load
:将共享变量tickets从内存加载到寄存器中。update
:更新寄存器里面的值,执行-1操作。store
:将新值从寄存器写回共享变量tickets的内存地址。
操作对应的汇编代码如下:
当thread 1 刚把tickets的值读进CPU时就可能已经被切走了,从CPU剥离下来,并没有进行--
操作,假如我们此时thread 1的信息就是10000,当他被切换走以后,他的上下文信息就会被保存下来,此时thread 1就处于挂起状态。
此时,thread 2正好被调度了,而我们的thread 1 刚把tickets的值读进CPU时就被切换走了,还没有进行--
操作,此时thread 2也是从10000开始进行--
操作,假设OS分配给thread 2的时间比较长,此时thread 2进程1000次--
操作并且被切回内存。
此时,thread 2切换走,thread 1恢复回来,操作系统会让thread 1上下文数据恢复,继续执行刚刚的操作,此时我们的tickets就又变为了10000,进行一次--
操作以后变为9999,此时在切回内存。
在上述过程中,thread1抢了1张票,thread2抢了1000张票,而此时剩余的票数却是9999,也就相当于多出了1000张票。
因此对一个变量进行--
操作并不是原子的,虽然–tickets看起来就是一行代码,但这行代码被编译器编译后本质上是三行汇编,相反,对一个变量进行++
也需要对应的三个步骤,即++
操作也不是原子操作。
要解决上述抢票系统的问题,需要做到三点:
要做到这三点,本质上就是需要一把锁,Linux上提供的这把锁叫互斥量。
初始化互斥量
初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t
*restrict attr);
参数:
mutex
:要初始化的互斥量attr
:NULL返回值说明:
销毁互斥量
销毁互斥量的函数叫做pthread_mutex_destroy,该函数的函数原型如下:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数说明:
mutex
:需要销毁的互斥量。返回值说明:
销毁互斥量需要注意:
互斥量加锁
互斥量加锁的函数叫做pthread_mutex_lock,该函数的函数原型如下:
int pthread_mutex_lock(pthread_mutex_t *mutex);
参数说明:
mutex
:需要加锁的互斥量。返回值说明:
调用pthread_mutex_lock时,可能会遇到以下情况:
互斥量解锁
互斥量解锁的函数叫做pthread_mutex_unlock,该函数的函数原型如下:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数说明:
mutex
:需要解锁的互斥量。返回值说明:
我们在上述的抢票系统中引入互斥量,每一个线程要进入临界区之前都必须先申请锁,只有申请到锁的线程才可以进入临界区对临界资源进行访问,并且当线程出临界区的时候需要释放锁,这样才能让其余要进入临界区的线程继续竞争锁。
#include
#include
#include
#include
#include
#include
using namespace std;
#define THREAD_NUM 1000
int tickets = 10000;
class ThreadData
{
public:
ThreadData(const string& name, pthread_mutex_t* mtx)
:tname(name)
,pmtx(mtx)
{}
public:
string tname;
pthread_mutex_t* pmtx;
};
void *getTickets(void *args)
{
ThreadData* td = (ThreadData*)args;
while (true)
{
int n = pthread_mutex_lock(td->pmtx);
assert(n==0);
if (tickets > 0)
{
usleep(1000);
printf("%s: %d\n", td->tname.c_str(), tickets);
tickets--;
n = pthread_mutex_unlock(td->pmtx);
assert(n==0);
}
else
{
n = pthread_mutex_unlock(td->pmtx);
assert(n==0);
break;
}
}
delete td;
pthread_exit((void *)0);
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr);
pthread_t t[THREAD_NUM];
for(int i = 0; i < THREAD_NUM; i++)
{
string name = "thread";
name += to_string(i+1);
ThreadData* td = new ThreadData(name, &mutex);
pthread_create(t+i, nullptr, getTickets, (void*)td);
}
for(int i = 0; i < THREAD_NUM; i++)
{
pthread_join(t[i], nullptr);
}
pthread_mutex_destroy(&mutex);
return 0;
}
运行代码,此时在抢票过程中就不会出现票数剩余为负数的情况了。
加锁后的原子性体现在哪里?
引入互斥量后,当一个线程申请到锁进入临界区时,在其他线程看来该线程只有两种状态,要么没有申请锁,要么锁已经释放了,因为只有这两种状态对其他线程才是有意义的。
例如,图中线程1进入临界区后,在线程2、3、4看来,线程1要么没有申请锁,要么线程1已经将锁释放了,因为只有这两种状态对线程2、3、4才是有意义的,当线程2、3、4检测到其他状态时也就被阻塞了。
此时对于线程2、3、4而言,它们就认为线程1的整个操作过程是原子的。
临界区内的线程可能进行线程切换吗?
临界区内的线程完全可能进行线程切换,但即便该线程被切走,其他线程也无法进入临界区进行资源访问,因为此时该线程是拿着锁被切走的,锁没有被释放也就意味着其他线程无法申请到锁,也就无法进入临界区进行资源访问了。
其他想进入该临界区进行资源访问的线程,必须等该线程执行完临界区的代码并释放锁之后,才能申请锁,申请到锁之后才能进入临界区。
锁是否需要被保护?
我们说被多个执行流共享的资源叫做临界资源,访问临界资源的代码叫做临界区。所有的线程在进入临界区之前都必须竞争式的申请锁,因此锁也是被多个执行流共享的资源,也就是说锁本身就是临界资源。
既然锁是临界资源,那么锁就必须被保护起来,但锁本身就是用来保护临界资源的,那锁又由谁来保护的呢?
锁实际上是自己保护自己的,我们只需要保证申请锁的过程是原子的,那么锁就是安全的。
如何保证申请锁的过程是原子的?
--
和++
操作不是原子操作,可能会导致数据不一致问题。下面我们来看看lock和unlock的伪代码:
我们可以认为mutex的初始值为1,al是计算机中的一个寄存器,当线程申请锁时,需要执行以下步骤:
接下来我们来详细解释一下:
我们就会发现,线程A进入判断步骤以后,al寄存器中的值此时不大于0,就会挂起等待:
注意:
线程安全: 多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现线程安全问题。
重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则是不可重入函数。
注意: 线程安全讨论的是线程执行代码时是否安全,重入讨论的是函数被重入进入。
如下图所示,线程A申请申请了锁1,并不会进行释放,线程B申请了锁2,也不会进行释放,此时线程A需要申请锁2,线程B需要申请锁1,显然是不可能成功的,这就叫做死锁。
单执行流可能产生死锁吗?
单执行流也有可能产生死锁,如果某一执行流连续申请了两次锁,那么此时该执行流就会被挂起。因为该执行流第一次申请锁的时候是申请成功的,但第二次申请锁时因为该锁已经被申请过了,于是申请失败导致被挂起直到该锁被释放时才会被唤醒,但是这个锁本来就在自己手上,自己现在处于被挂起的状态根本没有机会释放锁,所以该执行流将永远不会被唤醒,此时该执行流也就处于一种死锁的状态。
我们看下面这段代码:
#include
#include
#include
using namespace std;
pthread_mutex_t mutex;
void *Routine(void *args)
{
pthread_mutex_lock(&mutex);
while(true)
{
pthread_mutex_lock(&mutex);
cout << "new thread runing....." << endl;
}
pthread_exit((void*)0);
}
int main()
{
pthread_t tid;
pthread_mutex_init(&mutex, nullptr);
pthread_create(&tid, nullptr, Routine, nullptr);
pthread_join(tid, nullptr);
pthread_mutex_destroy(&mutex);
return 0;
}
运行代码,此时该程序实际就处于一种被挂起的状态:
用ps命令查看该进程时可以看到,该进程当前的状态是Sl+,其中的l实际上就是lock的意思,表示该进程当前处于一种死锁的状态。
什么叫做阻塞?
进程运行时是被CPU调度的,换句话说进程在调度时是需要用到CPU资源的,每个CPU都有一个运行等待队列(runqueue),CPU在运行时就是从该队列中获取进程进行调度的。
在运行等待队列中的进程本质上就是在等待CPU资源,实际上不止是等待CPU资源如此,等待其他资源也是如此,比如锁的资源、磁盘的资源、网卡的资源等等,它们都有各自对应的资源等待队列。
例如,当某一个进程在被CPU调度时,该进程需要用到锁的资源,但是此时锁的资源正在被其他进程使用:
总结下来就是:
注意: 这是死锁的四个必要条件,也就是说只有同时满足了这四个条件才可能产生死锁。
除此之外,还有一些避免死锁的算法,比如死锁检测算法和银行家算法。
首先,我们得理解,单纯的加锁并不存在什么什么问题,但是由于各个线程之间的竞争能力有所差异,可能就会存在某一个线程频繁地申请锁和释放锁,假设此时我们有一个线程A和B,线程A需要写入数据,线程B需要读取数据,此时线程A的能力强一点,他就会一直申请锁和释放锁。直到临界区数据被写满,此时线程B一直就处于阻塞状态,所以,单纯的加锁不存在什么问题,但是对于效率来说又比较影响。
那么如何解决这个问题呢?
我们增加一个规律,当一个线程释放锁后,这个线程不能立马再次申请锁,该线程必须排到这个锁的资源等待队列的最后;增加这个规则之后,下一个获取到锁的资源的线程就一定是在资源等待队列首部的线程,如果有十个线程,此时我们就能够让这十个线程按照某种次序进行临界资源的访问,这就叫做同步,引入同步后该问题就能很好的解决。
条件变量是利用线程间共享的全局变量进行同步的一种机制,条件变量是用来描述某种资源是否就绪的一种数据化描述。
条件变量主要包括两个动作:
条件变量通常需要配合互斥锁一起使用。
初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
参数说明:
返回值说明:
调用pthread_cond_init函数初始化条件变量叫做动态分配,除此之外,我们还可以用下面这种方式初始化条件变量,该方式叫做静态分配:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁
销毁条件变量的函数叫做pthread_cond_destroy,该函数的函数原型如下:
int pthread_cond_destroy(pthread_cond_t *cond);
参数说明:
返回值说明:
销毁条件变量需要注意:
使用PTHREAD_COND_INITIALIZER
初始化的条件变量不需要销毁。
等待条件满足
等待条件变量满足的函数叫做pthread_cond_wait,该函数的函数原型如下:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数说明:
返回值说明:
唤醒等待
唤醒等待的函数有以下两个:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
区别:
参数说明:
返回值说明:
下面我们用主线程创建四个新线程,让主线程控制这四个新线程活动。这四个新线程创建后都在条件变量下进行等待,直到主线程检测到键盘有输入时才唤醒一个等待线程,如此进行下去。
#include
#include
#include
#define TNUM 4
typedef void (*func_t)(const std::string &name, pthread_mutex_t *pmutex, pthread_cond_t *pcond);
volatile bool quit = false;
class ThreadData
{
public:
ThreadData(const std::string &name, func_t funcs, pthread_mutex_t *pmutex, pthread_cond_t *pcond)
: _name(name), _funcs(funcs), _mutex(pmutex), _cond(pcond)
{
}
public:
std::string _name;
func_t _funcs;
pthread_mutex_t *_mutex;
pthread_cond_t *_cond;
};
void func1(const std::string &name, pthread_mutex_t *pmutex, pthread_cond_t *pcond)
{
while (!quit)
{
//一定要在加锁和解锁之间进行wait
pthread_mutex_lock(pmutex);
// if(临界资源是否就绪-- 否)
pthread_cond_wait(pcond, pmutex);//默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " runing-- 下载" << std::endl;
pthread_mutex_unlock(pmutex);
}
}
void func2(const std::string &name, pthread_mutex_t *pmutex, pthread_cond_t *pcond)
{
while (!quit)
{
pthread_mutex_lock(pmutex);
pthread_cond_wait(pcond, pmutex);//默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " runing-- 播放" << std::endl;
pthread_mutex_unlock(pmutex);
}
}
void func3(const std::string &name, pthread_mutex_t *pmutex, pthread_cond_t *pcond)
{
while (!quit)
{
pthread_mutex_lock(pmutex);
pthread_cond_wait(pcond, pmutex);//默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " runing-- 刷新" << std::endl;
pthread_mutex_unlock(pmutex);
}
}
void func4(const std::string &name, pthread_mutex_t *pmutex, pthread_cond_t *pcond)
{
while (!quit)
{
pthread_mutex_lock(pmutex);
pthread_cond_wait(pcond, pmutex);//默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
std::cout << name << " runing-- 扫描用户信息" << std::endl;
pthread_mutex_unlock(pmutex);
}
}
void *Entry(void *args)
{
ThreadData *td = (ThreadData *)args;
td->_funcs(td->_name, td->_mutex, td->_cond);
delete td;
pthread_exit((void*)0);
}
int main()
{
// 创建锁
pthread_mutex_t mutex;
pthread_cond_t cond;
// 初始化锁
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&cond, nullptr);
// 创建线程
pthread_t tid[TNUM];
func_t funcs[TNUM] = {func1, func2, func3, func4};
for (int i = 0; i < TNUM; i++)
{
std::string name = "thread";
name += std::to_string(i + 1);
ThreadData *td = new ThreadData(name, funcs[i], &mutex, &cond);
pthread_create(tid + i, nullptr, Entry, (void *)td);
}
int cnt = 10;
while(cnt)
{
std::cout << "resume thread run code ...." << cnt-- << std::endl;
pthread_cond_signal(&cond);
sleep(1);
}
std::cout << "ctrl done" << std::endl;
quit = true;
pthread_cond_broadcast(&cond);
//主线程进程等待
for(int i = 0; i < TNUM; i++)
{
pthread_join(tid[i], nullptr);
std::cout << "thread[" << tid[i] << "] quit!!!" << std::endl;
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
}
此时我们会发现唤醒这4个线程时具有明显的顺序性,根本原因是当这若干个线程启动时默认都会在该条件变量下去等待,而我们每次都唤醒的是在当前条件变量下等待的头部线程,当该线程执行完打印操作后会继续排到等待队列的尾部进行wait,所以我们能够看到一个周转的现象。
如果我们想每次唤醒都将在该条件变量下等待的所有线程进行唤醒,可以将代码中的pthread_cond_signal函数改为pthread_cond_broadcast函数。
此时我们每一次唤醒都会将所有在该条件变量下等待的线程进行唤醒,也就是每次都将这4个线程唤醒。