线程饥饿是指一个或多个线程无法获取到所需的资源或执行时间片,从而长时间处于阻塞或等待状态,无法继续执行的情况。
举一个简单的示例来理解线程饥饿问题:多个线程通过加锁的方式完成了线程互斥控制,但是由于第一个申请到锁的线程在释放锁后,立刻又申请锁,导致其他的线程一直无法申请到锁。
线程同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
以前面提到的线程饥饿问题中所举的例子理解线程同步:同样通过加锁的方式让多个线程互斥,第一个申请到锁的线程在释放锁后,按照顺序,下一个申请锁的是其他线程,从而完成对线程饥饿问题的解决。
条件变量是Linux操作系统原生线程库中提供的pthread_cond_t
数据类型,通过对条件变量的使用能够完成线程的同步控制。
条件变量内部维护着一个循环队列,将线程交给条件变量后,条件变量就可以通过自身的循环队列结构让线程按照顺序运行。
Linux操作系统下提供了pthread_cond_init
函数用于初始化条件变量。
//pthread_cond_init所在的头文件和函数声明
#include
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
的方式进行初始化。Linux操作系统下提供了pthread_cond_destroy
函数用于销毁锁。
//pthread_mutex_destroy所在的头文件和函数声明
#include
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_destroy
函数用于局部条件变量的销毁。//pthread_cond_wait所在的头文件和函数声明
#include
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
pthread_cond_wait
函数用于让线程等待条件变量准备就绪,当线程开始等待条件变量后,只有条件变量准备就绪线程才能继续运行。pthread_cond_wait
函数首先会对传入的锁进行释放,以便于其他线程访问临界区,然后阻塞等待条件变量就绪,条件变量就绪后,会再次对传入的锁申请,保证线程安全。//pthread_cond_signal所在的头文件和函数声明
#include
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_signal
函数用于唤醒等待条件变量的线程中处于第一个顺位。该函数就是告诉等待条件变量的线程条件变量已经准备就绪,线程被唤醒后就会继续运行。//pthread_cond_broadcast所在的头文件和函数声明
#include
int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_broadcast
函数用于唤醒等待条件变量的所有线程。该函数就是告诉所有等待条件变量的线程条件变量已经准备就绪,线程都被唤醒后会按照顺序继续运行。为了验证条件变量能让线程按顺序执行,编写代码验证,具体代码如下:
#include
#include
#include
#define NUM 5
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//锁的初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量的初始化
void *active(void *args)
{
const char* tname = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);//等待函数内会自动释放锁
cout << tname << " active" << endl;
pthread_mutex_unlock(&mutex);
}
return nullptr;
}
int main()
{
pthread_t tids[NUM];
for (int i = 0; i < NUM; i++)//线程创建
{
char* name = new char[64];
snprintf(name, 64, "thread-%d", i+1);
pthread_create(tids+i, nullptr, active, name);
}
sleep(2);
while(true)//唤醒线程
{
pthread_cond_signal(&cond);
sleep(1);
}
for (int i = 0; i < NUM; i++)//线程回收
{
pthread_join(tids[i], nullptr);
}
return 0;
}
编译代码运行并查看结果:
在上面的验证代码中,线程加锁进入临界区后首先会等待条件变量就绪,并且根据线程的调用顺序在该条件变量的循环队列中等待,条件变量会根据循环队列中的线程顺序进行唤醒,因此线程会按照一定的顺序进行运行。
为了理解生产者消费者模型,我们举一个生活中的例子,以超市作为载体形成的生产者消费者模型:
供货商作为生产者生产商品并派送到超市,顾客作为消费者从超市消费商品。供货商作为生产者大量生产商品交给超市销售,顾客作为消费者集中到超市购买商品,既提高了生产效率,也提高了消费效率。供货商作为生产者可以大量生产商品交给超市,顾客作为消费者只需要关心超市的商品,因此即使消费者暂时不消费,供货商也可以继续生产,即使供货商暂时不生产商品了,消费者也可以继续消费,这也允许了生产和消费的步调不一致,生产工作和消费消费能够忙闲不均,以至于生产和消费双方不耽误。
对应到线程概念中:
利用缓冲区作为交易场所,让生产者和消费者不必关心对方,因此生产者消费者模型优点:
解耦
支持并发
支持忙闲不均
生产者消费者模型也可以说是一种线程线程通信,它就类似于进程间通信所使用的管道。该模型中的缓冲区作为作为线程通信场所,要被生产者线程和消费者线程看到,因此该缓冲区是一种共享资源,作为共享资源就要对其进行保护,避免线程安全问题,要对生产者消费者模型进行保护。
生产者消费者模型特点总结:
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
编写代码体会基于BlockingQueue的生产者消费者模型和条件变量的运用:
blockqueue.hpp
:实现阻塞队列的文件
#include
#include
const int gcap = 5;
template <class T>
class blockqueue
{
private:
bool isFull() { return _q.size() == _cap; }
bool isEmpty() { return _q.empty(); }
public:
blockqueue(int cap = gcap) : _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumercond, nullptr);
pthread_cond_init(&_productorcond, nullptr);
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
while (isFull()) // 队列已满,采用循环判断是为了避免多个生产者都被消费者唤醒后造成并发问题,比如队列只剩一个空间,但多个生产者线程被唤醒后进行数据操作
{
pthread_cond_wait(&_productorcond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_consumercond);
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty())
{
pthread_cond_wait(&_consumercond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorcond);
pthread_mutex_unlock(&_mutex);
}
~blockqueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumercond);
pthread_cond_destroy(&_productorcond);
}
private:
std::queue<T> _q;
int _cap; // 记录容量
pthread_mutex_t _mutex; // 控制线程互斥
pthread_cond_t _consumercond; // 控制消费者线程
pthread_cond_t _productorcond; // 控制生产者线程
};
Task.hpp
:实现生产者和消费者处理的数据的类。
#include
#include
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitcode(0)
{}
void operator()()//对传入数据进行操作
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
if (_y == 0) _exitcode = -1;
else
_result = _x / _y;
break;
case '%':
if (_y == 0) _exitcode = -2;
else
_result = _x % _y;
break;
default:
break;
}
}
const std::string operatorArgs()//打印要处理的数据
{
return "(" + std::to_string(_x) + _op + std::to_string(_y) + ")" + "(" + std::to_string(_exitcode) + ")";
}
const std::string operatorRes()//打印数据处理结果
{
return std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_result);
}
private:
int _x;//左操作数
int _y;//右操作数
char _op;//操作符
int _result;//算数结果
int _exitcode;//退出码
};
main.cc
:实现生产者消费者模型的文件。
#include "blockqueue.hpp"
#include "Task.hpp"
#include
#include
#include
void *consumer(void *args)//消费者方法
{
blockqueue<Task> *bq = static_cast<blockqueue<Task>*>(args);
while(true)
{
Task t;
bq->pop(&t);//从阻塞队列中取出数据
t();//对获取的数据进行处理
std::cout << "consumer: " << t.operatorRes() << std::endl;
}
}
void *productor(void* args)//生产者方法
{
blockqueue<Task> *bq = static_cast<blockqueue<Task>*>(args);
std::string ops = "+-*/%";
while(true)
{
int x = rand() % 20;//生成数据
int y = rand() % 10;
char op = ops[rand() % ops.size()];
Task t(x, y, op);
bq->push(t);//将数据交给阻塞队列
std::cout << "productor: " << t.operatorArgs() << "?" << std::endl;
}
}
int main()
{
srand((uint32_t)time(nullptr) ^ getpid());//设置随机数
blockqueue<Task>* q = new blockqueue<Task>;//创建阻塞队列
pthread_t c[2];
pthread_t p[3];
pthread_create(&c[0], nullptr, consumer, q);
pthread_create(&c[1], nullptr, consumer, q);
pthread_create(&p[0], nullptr, productor, q);
pthread_create(&p[1], nullptr, productor, q);
pthread_create(&p[2], nullptr, productor, q);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
return 0;
}
编译代码运行并查看结果:
上述代码的整体逻辑是生产者线程产生数据交给阻塞队列,消费者线程从阻塞队列中获取数据并且其进行处理,生产者产生算数表达式,消费者对表达式进行运算。其中所有线程因为只有一个队列共用一把锁,所有线程都是互斥的,通过条件变量的等待和唤醒完了生产者和消费者的同步。
说明: 生产者消费者模型提高效率的原理是生产者在获取数据时,消费者可以从缓冲区中获取数据,消费者在处理数据时,生产者可以将数据传入缓冲区;一个消费者处理数据,另外的消费者可以从缓冲区中获取数据,一个生产者获取数据,另外的消费者可以将数据传入缓冲区,所有线程都可以并发执行,不会是串行化的消费者等待数据来,生产者等待消费者处理完数据。
POSIX信号量用于线程间同步操作,达到无冲突的访问共享资源目的。
POSIX信号量是Linux操作系统中提供的sem_t
数据类型,通过对POSIX信号量的使用能够完成线程的同步控制。
信号量的本质就是一个计数器,记录临界资源的数目。 线程要访问临界资源需要进行P操作(申请资源),使用完了临界资源后要进行V操作(释放资源),信号量就是通过线程访问临界资源需要申请的操作来控制线程的同步的。PV操作是原子的。
信号量控制线程同步的机制: 将申请锁然后判断临界资源转换成了申请信号量。 信号量申请操作都在申请锁和临界资源操作前,当多个线程要访问到同一个资源时,只有申请到了信号量的资源可以进行申请锁和临界资源操作,其他线程只能等待信号量,保证了线程按照顺序执行,不会导致申请不到锁和申请到锁没有临界资源的饥饿问题。
//sem_init函数所在的头文件和函数声明
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_init
函数用于初始化信号量。//sem_wait函数所在的头文件和函数声明
#include
int sem_wait(sem_t *sem);
sem_wait
函数用于信号量的等待申请,也就是P操作。//sem_post函数所在的头文件和函数声明
#include
int sem_post(sem_t *sem);
sem_post
函数用于信号量的释放,也就是V操作。//sem_destroy函数所在的头文件和函数声明
#include
int sem_destroy(sem_t *sem);
sem_destroy
函数用于信号量销毁。利用信号量做同步控制,并以环形队列作为缓冲区形成的生产者消费者模型中有如下特点:
编写代码体会基于环形队列的生产消费模型和信号量的运用:
ringqueue.hpp:
实现循环队列文件:
#include
#include
#include
using namespace std;
const int N = 5;
template<class T>
class ringqueue
{
private:
void P(sem_t& sem) { sem_wait(&sem); }//P操作
void V(sem_t& sem) { sem_post(&sem); }//V操作
void Lock(pthread_mutex_t& mutex) { pthread_mutex_lock(&mutex); }//申请锁操作
void UnLock(pthread_mutex_t& mutex) { pthread_mutex_unlock(&mutex); }//释放锁操作
public:
ringqueue(int num = N):_cap(num), _ring(num), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
P(_space_sem);//申请空间信号量
Lock(_p_mutex);//对生产者锁申请锁
_ring[_c_step++] = in;
_c_step %= _cap;
UnLock(_p_mutex);//对生产者锁释放锁
V(_data_sem);//释放数据信号量
}
void pop(T* out)
{
P(_data_sem);//申请数据信号量
Lock(_c_mutex);//对消费者锁申请锁
*out = _ring[_c_step++];
_c_step %= _cap;
UnLock(_c_mutex);//对消费者锁释放锁
V(_space_sem);//释放空间信号量
}
~ringqueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
vector<T> _ring;
int _cap;//队列容量
sem_t _data_sem;//数据信号量--只有消费者关心
sem_t _space_sem;//空间信号量--只有生产者关系
int _c_step;//消费者访问位置
int _p_step;//生产者访问位置
pthread_mutex_t _c_mutex;//消费者互斥控制锁
pthread_mutex_t _p_mutex;//生产者互斥控制锁
};
说明∶ 在生产者或消费者进入临界区前,必须先申请信号量,后申请锁,否则可能会造成死锁。比如缓冲区为满时,如果生产者先申请锁,后申请空间信号量,会进入阻塞状态,等待空间信号量,但是由于锁被申请走了,消费者无法释放空间信号量,造成死锁问题。
Task.hpp
:实现生产者和消费者处理的数据的类。
#include
#include
#include
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitcode(0)
{}
void operator()()//对传入数据进行操作
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
if (_y == 0) _exitcode = -1;
else
_result = _x / _y;
break;
case '%':
if (_y == 0) _exitcode = -2;
else
_result = _x % _y;
break;
default:
break;
}
usleep(100000);
}
const std::string operatorArgs()//打印要处理的数据
{
return "(" + std::to_string(_x) + _op + std::to_string(_y) + ")" + "(" + std::to_string(_exitcode) + ")";
}
const std::string operatorRes()//打印数据处理结果
{
return std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_result);
}
private:
int _x;//左操作数
int _y;//右操作数
char _op;//操作符
int _result;//算数结果
int _exitcode;//退出码
};
main.cc:
实现生产者消费者模型的文件。
#include "ringqueue.hpp"
#include "Task.hpp"
#include
#include
const char* ops = "+-*/%";
void *consumerRoutine(void *args)//消费者方法
{
ringqueue<Task>* rq = static_cast<ringqueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.operatorRes() << endl;
}
}
void *productorRoutine(void *args)//生产者方法
{
ringqueue<Task>* rq = static_cast<ringqueue<Task>*>(args);
while(true)
{
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.operatorArgs() << endl;
}
}
int main()
{
ringqueue<Task>* rq= new ringqueue<Task>();
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for (int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
return 0;
}
编译代码运行并查看结果:
利用信号量实现生产者消费者模型的意义: 多生产,多消费的时候,向队列放数据前并发构建Task,获取数据后多线程可以并发处理task,因为这些操作没有加锁!