概念:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
下面我们来解释一下什么叫做“饥饿问题”
还是举我们之前用过的例子。假如有一个 自习室,每次只允许一个人进去自习,自习室门外面的墙上挂着钥匙。ABCD几个人都想来上自习,A第一个来的,他拿着钥匙进入自习室并将门反锁了,后来的几个人BCD就只能在外面等着了。学了一会呢,A出来了,将钥匙挂在墙上了,可就在这时A突然又想继续学,他就又拿着钥匙进去学了,然后学了几分钟,A又坐不住了要出去,可是他一出去就又想再进入自习室。由于A离钥匙最近,别人争不过他,那么BCD就只能等,而A一直在重复这个过程,也没有好好地学习。这样就导致了BCD的“饥饿”。显然,这种做饭虽然保证了“互斥”,但是效率实在是低下。为了改善这种状况,我们修改规则,任何一个从自习室出来的人如果想要再进入 自习室,要重新排队,这样其他人就有效避免了“饥饿问题”。
条件变量,是用来描述某种临界资源是否就绪的一种数据化描述。通常,条件变量需要与mutex互斥锁一起使用
我们先来认识一组条件变量的接口
初始化
#include
//方式1
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
//cond:要初始化的条件变量 attr:通常设置为NULL
//方式2
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
销毁
#include
int pthread_cond_destroy(pthread_cond_t *cond);
//cond:要销毁的条件变量
等待条件满足
#include
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
// cond 在cond条件下等待
//mutex:配合使用的互斥锁.通常我们在使用条件等待接口时,已经加锁了,如果等待,这个接口就可以自动解锁,让其他线程可以访问临界资源。如果被唤醒,这个接口又会重新进行加锁
为什么该接口需要互斥锁?
唤醒等待
#include
int pthread_cond_signal(pthread_cond_t *cond);
//cond:要唤醒的条件变量
接下来我们来一个测试用例,使用一下这些接口
#include
#include
using namespace std;
pthread_mutex_t lock;
pthread_cond_t cond;
void* Run(void* arg)
{
pthread_detach(pthread_self());
const char* msg = (char*)arg;
while(1)
{
pthread_cond_wait(&cond, &lock);
cout << msg << " run..." << endl;
}
return (void*)0;
}
int main()
{
//初始化锁与条件变量
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
//创建三个线程
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, Run, (void*)"thread 1");
pthread_create(&t2, nullptr, Run, (void*)"thread 2");
pthread_create(&t3, nullptr, Run, (void*)"thread 3");
//用主线程控制三个新线程,按一下回车就唤醒一个线程
while(1)
{
getchar();
pthread_cond_signal(&cond);
}
//最后销毁锁和条件变量
pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond);
return 0;
}
我们首先来讲一个生活中常见的小例子。
那么为什么要使用生产者消费者模型呢?
下面我们来一段基于生产者消费者模型的代码,来帮助大家更好的理解
main.cc
#include "BlockQueue.hpp"
#include "task.hpp"
using namespace std;
void* Producter(void* arg)
{
auto bq = (BlockQueue<Task>*)arg;
const char* arr = "+-*/";
while(1)
{
//随机生产数字和操作符
int x = rand()%100+1;
int y = rand()%50;
char op = arr[rand()%4];
//生产任务并将任务放进阻塞队列
Task t(x, y, op);
bq->push(t);
cout << "product task done" << endl;
}
}
void* Consumer(void* arg)
{
auto bq = (BlockQueue<Task>*)arg;
while(1)
{
sleep(1);
Task t;
bq->pop(t);
t.Run();
}
}
int main()
{
srand((unsigned)time(nullptr));
BlockQueue<Task>* bq = new BlockQueue<Task>();
//创建两个线程,一个是生产者p一个是消费者c
pthread_t p, c;
pthread_create(&p, nullptr, Producter, (void*)bq);
pthread_create(&c, nullptr, Consumer, (void*)bq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
return 0;
}
BlcokQueue.hpp
#pragma once
#include
#include
#include
#include
#include
#include
template<typename T>
#define NUM 32 //默认阻塞队列的容量设置为32
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int capacity = NUM)
:_capacity(capacity)
{
//在构造函数里先将锁与条件变量初始化
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
}
~BlockQueue()
{
//在析构函数将锁与条件变量清除
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
//数据插入
void push(const T& in)
{
//插入之前先加锁
pthread_mutex_lock(&lock);
//先判断是不是满了
while(IsFull())//这里坚决不能使用if判断。原因有2。1:pthread_cond_wait调用失败,代码向下执行,本来就满了,还继续插入。2.如果是一个生产者三个消费者,生产者生产了一个数据就把三个等待的消费者唤醒了,可是数据只有一个啊,如果三个线 程都向下执行那就出错了,所以我们使用while,在进程被唤醒的时候多判断一次,而不是直接向下执行,这样避免了这种错误
{
//满了就等
pthread_cond_wait(&full, &lock);
}
_q.push(in);
if(_q.size() >= _capacity/2)
{
//阻塞队列里面数据数量大于一半,唤醒消费者来消费
std::cout << "数据很多了,快来消费吧" << std::endl;
pthread_cond_signal(&empty);
}
pthread_mutex_unlock(&lock);
}
//删除数据
void pop(T& out)
{
pthread_mutex_lock(&lock);
while(IsEmpty())
{
//如果是空的就等待
pthread_cond_wait(&empty, &lock);
}
out = _q.front();
_q.pop();
if(_q.size() < _capacity/10)
{//数据量小于十分之一,唤醒生产
std::cout << "数据很少了,快来生产把" << std::endl;
pthread_cond_signal(&full);
}
pthread_mutex_unlock(&lock);
}
private:
std::queue<T> _q;//用来保存临界资源
int _capacity;//容量
pthread_mutex_t lock;
pthread_cond_t empty;
pthread_cond_t full;
};
task.hpp
#pragma once
#include
class Task
{
public:
Task(int x, int y, char op)
:_x(x)
,_y(y)
,_op(op)
{}
Task()
{}
void Run()
{
int result = 0;
switch(_op)
{
case '+':
result = _x +_y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if(_y == 0)
{
std::cout<< "除数为0" << std::endl;
result = -1;
break;
}
else
{
result = _x / _y;
break;
}
default:
break;
}
std::cout << _x << _op << _y << "=" << result << std::endl;
}
~Task()
{}
Task& operator=(const Task& t)
{
_x = t._x;
_y = t._y;
_op = t._op;
return *this;
}
private:
int _x;
int _y;
char _op;
};
运行结果
POSIX信号量可以用于线程间同步,达到无冲突的访问共享资源的目的
下面我们来认识一下POSIX信号量的几个接口
初始化信号量
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
//第一个参数sem:我们要初始化的信号量
//第二个参数pshared,如果为0,这个信号量被多线程共享,非0,被多个进程共享
//第三个参数,信号量的初始值
销毁信号量
#include
int sem_destroy(sem_t *sem);
等待信号量
#include
int sem_wait(sem_t *sem);
//等待信号量,会将信号量的值减1
发布信号量
#include
int sem_post(sem_t *sem);
//发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
信号量的本质就是一个计数器,是用于描述临界资源中资源数目个数的计数器
申请到信号量的本质,并不是你已经开始使用临界资源中你所申请的那个区域,而是有了使用特定资源的权限
申请信号量:count–,P操作
释放信号量:count++, V操作
信号量也是临界资源,所以信号量的申请和释放也必须是原子的。在信号量的个数为1时,基本等价于互斥锁
接下来,我们还是以抢票的例子,对以上几个接口进行一下简单的运用
#include
#include
#include
#include
#include
class Sem
{
public:
Sem(int num)
{
sem_init(&_sem, 0, num);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);//count--
}
void V()
{
sem_post(&_sem);//count++
}
private:
sem_t _sem;
};
Sem sem(1);
int tickets = 2000;
void* GetTickets(void* arg)
{
std::string name = (char*)arg;
while(1)
{
sem.P();
if(tickets > 0)
{
usleep(10000);
tickets--;
std::cout << name << "抢到了一张票,剩余票数 " << tickets << std::endl;
sem.V();
}
else
{
sem.V();
break;
}
}
return (void*)0;
}
int main()
{
pthread_t tid1, tid2, tid3, tid4, tid5;
pthread_create(&tid1, nullptr, GetTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, GetTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, GetTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, GetTickets, (void*)"thread 4");
pthread_create(&tid5, nullptr, GetTickets, (void*)"thread 5");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
pthread_join(tid5, nullptr);
sem.~Sem();
return 0;
}
下面我们写一段环形队列的代码
Ring.hpp
#pragma once
#include
#include
#include
#include
#define NUM 5
template<typename T>
class RingQueue
{
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
public:
RingQueue(int cap = NUM)
:_cap(cap)
,c_pos(0)
,p_pos(0)
{
_q.resize(_cap);
sem_init(&blank_sem, 0, _cap);
sem_init(&data_sem, 0, 0);
}
~RingQueue()
{
sem_destroy(&blank_sem);
sem_destroy(&data_sem);
}
//生产数据,由生产者调用
void Push(const T& in)
{
//插入数据,格子减一,数据加一
P(blank_sem);
_q[p_pos] = in;
V(data_sem);
p_pos++;
p_pos %= _cap;
}
//消费数据,由消费者调用
void Pop(T& out)
{
//删除数据,数据减一,格子加一
P(data_sem);
out = _q[c_pos];
V(blank_sem);
c_pos++;
c_pos %= _cap;
}
private:
std::vector<T> _q;
int _cap;//环形队列的空间大小
int c_pos;//消费者指向的位置
int p_pos;//生产者指向的位置
sem_t blank_sem;
sem_t data_sem;
};
main.cc
#include "Ring.hpp"
#include
void* Product(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
while(1)
{
int in = rand() % 100 + 1;
rq->Push(in);
std::cout<< "product done!" << in << std::endl;
}
}
void* Consume(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
int out;
while(1)
{
rq->Pop(out);
std::cout<< "consume done!" << out << std::endl;
}
}
int main()
{
srand((unsigned long)time(nullptr));
RingQueue<int>* rq = new RingQueue<int>();
pthread_t p, c;
pthread_create(&p, nullptr, Product, (void*)rq);
pthread_create(&c, nullptr, Consume, (void*)rq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
return 0;
}
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
下面我们来一段测试代码。设计一个线程池。
PthreadPool.hpp
#include
#include
#include
#include "Task.hpp"
#define NUM 5
template<typename T>
class PthreadPool
{
public:
PthreadPool(int num = NUM)
:thread_num(num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}
void Push(const T& in)
{
//插入任务先锁上
pthread_mutex_lock(&lock);
task_queue.push(in);
pthread_mutex_unlock(&lock);
WakeUp();
}
void Pop(T& out)
{
out = task_queue.front();
task_queue.pop();
}
bool IsEmpty()
{
return task_queue.size() == 0 ? true : false;
}
void Wait()
{
pthread_cond_wait(&cond, &lock);
}
void WakeUp()
{
pthread_cond_signal(&cond);
}
static void* Rountie(void* arg)
{
pthread_detach(pthread_self());
PthreadPool* self = (PthreadPool*)arg;
while(1)
{
//到任务队列取任务,先加锁,在判断任务队列是否为空
pthread_mutex_lock(&self->lock);
if(self->IsEmpty())
{
self->Wait();
}
//有任务
T t;
self->Pop(t);
//处理任务
t.Run();
pthread_mutex_unlock(&self->lock);
}
}
void InitThreadPool()
{
pthread_t tid;
for(int i = 0; i < thread_num; i++)
{
pthread_create(&tid, nullptr, Rountie, this);
}
}
~PthreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
int thread_num;
std::queue<T> task_queue;
pthread_mutex_t lock;
pthread_cond_t cond;
};
Task.hpp
#pragma once
#include
#include
class Task
{
public:
Task(int x, int y, char op)
:_x(x)
,_y(y)
,_op(op)
{}
Task()
{}
void Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
if(_y == 0)
{
std::cout << "div zero!" << std::endl;
break;
}
ret = _x / _y;
break;
case '%':
if(_y == 0)
{
std::cout << "mod zero!" << std::endl;
break;
}
ret = _x % _y;
break;
default:
break;
}
std::cout << "thread:" << pthread_self() << " " << _x << _op << _y << "=" << ret << std::endl;
}
~Task()
{}
private:
int _x;
int _y;
char _op;
};
main.cc
#include "Task.hpp"
#include "PthreadPool.hpp"
#include
#include
int main()
{
PthreadPool<Task>* tp = new PthreadPool<Task>();
tp->InitThreadPool();
srand((unsigned long)time(nullptr));
const char* op = "+-*/%";
while(1)
{
int x = rand()%100 + 1;
int y = rand()%100 + 1;
int i = rand()%5;
Task t(x, y, op[i]);
tp->Push(t);
// sleep(1);
}
return 0;
}
default:
break;
}
std::cout << "thread:" << pthread_self() << " " << _x << _op << _y << "=" << ret << std::endl;
}
~Task()
{}
private:
int _x;
int _y;
char _op;
};
main.cc
```cpp
#include "Task.hpp"
#include "PthreadPool.hpp"
#include
#include
int main()
{
PthreadPool* tp = new PthreadPool();
tp->InitThreadPool();
srand((unsigned long)time(nullptr));
const char* op = "+-*/%";
while(1)
{
int x = rand()%100 + 1;
int y = rand()%100 + 1;
int i = rand()%5;
Task t(x, y, op[i]);
tp->Push(t);
// sleep(1);
}
return 0;
}