在上一篇文章当中我们写过这么一段代码:
pthread_mutex_lock(&_mutex);
//申请成功之后就判断一下当前容器中的数据是否是满的
while(is_full())
{
//如果是满的就挂起等待
pthread_cond_wait(&_pcond,&_mutex);
}
_q.push(in);
//如果走到了这里就说明当前的存储空间一定是有数据的
//这个时候就得将将消费者线程唤醒
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
这是生产者消费者模型的一段代码,该代码先枷锁访问临界资源然后判断一下当前的资源是否满足操作的需求,如果不满足就将其挂起,如果满足了就执行对应的操作,等操作完成之后就讲锁资源释放,虽然上面的代码在运行的时候没有任何的问题但是存在这么一个现象:一个线程在操作临界资源的时候,临界资源必须是要满足条件的,可是公共资源是否满足生产或者消费条件我们是没有办法事先得知的,就好比买彩票的时候不知道中获号码一样,所以我们只能先对共享资源枷锁,然后检测资源是否满足操作的条件,然后执行对应操作,最后将锁资源释放,即是我们不对资源进行操作只是单纯的检测的话也得这么做因为检测的本质就是在访问临界资源,并且只要我们对资源整体进行枷锁就默认了我们对这个资源整体进行使用,但是实际上存在这么一种情况:一份资源是公共但是它并不是整体被使用而是分成了多个区域并且允许不同的线程访问公共资源的不同区域,那么面对上面的这两种情况,就可以使用信号量来进行解决和实现。
在之前的学习中我们知道信号量的本质就是一把计数器,在当时的学习中我们通过电影院的例子带着大家理解信号,我们说并不是因为我们人坐在了电影院的座椅上才完成了电影票的购买,而是在网上直接使用一些程序来完成电影票的购买,购买成功之后电影院中就一定存在一个位置属于我们,所以在访问一份公共资源的时候我们就可以先申请信号量,信号量申请成功之后程序员就可以通过编码的方式来保证公共资源中一定存在一份资源属于当前的线程,也可以通过编码的方式来保证不同的线程可以并发的访问公共资源的不同区域。信号量是一个计数器,他是用来衡量临界资源中资源数目多少的计数器,电影院中有100个座位所以在售卖电影票的时候不可能出现卖出去101张102张电影票的情况,这样就保证了消费者只要买到了电影票,电影院中就一定有对应的位置提供给消费者,那么信号量也是同样的道理,只要拥有信号量那么在未来就一定能够拥有临界资源的一部分,所以申请信号量的本质就是对临界资源中特定小块资源的预定机制。因为信号量的本质是一个计数器,计数器的大小表示共享资源的多少,在访问资源之前得先申请信号量所以我们就可以通过信号量的大小来得知共享资源的使用情况(就是是否还有资源可供使用),而不用使用枷锁解锁的方式来对共享资源的使用情况进行检查,那么这就是信号量的概念以及意义。
线程要访问临界资源中的某一区域就得先申请信号量,而线程申请信号量之前得先看到信号量,而程序在运行的过程中可能会存在多个线程,所以信号量就可能会被多个线程同时访问,所以信号量本身就得是公共资源,既然是计数器那么就一定存在递增和递减的操作,当我们向公共资源中申请资源时资源的数目就会减少所以信号量的大小也要减少,所以对信号量递减就是申请资源,同理当我们将共享资源归还时信号量的大小就得递增,我们把申请资源称为p操作,归还资源称为v操作,所以对信号量的操作就是PV操作,而我们知道信号量的递增和递减都不是原子性的,在翻译的过程中对应着好几个汇编代码,又因为信号量会被多个线程访问所以信号量就必须得保证其操作的原子性,所以信号量得拥有自己的类型不能是一个随便的整形,那么信号量的类型就是sem_t在使用信号量之前得用该类型创建一个信号量对象,然后就使用sem_init函数来对其进行初始化:
函数的第一个参数表示对哪个信号量初始化将其信号量的地址传递过去即可,第二个参数表示该信号量是线程间共享还是进程间共享,非0表示进程间共享0就是线程间共享,第三个参数表示信号量一开始的初始值,如果我们想要信号量的初始值为10就将10传递给第三个参数。当信号量用完之后就可以对其进行销毁,那么这时就可以用到sem_destory函数
想要销毁哪个对象就将该对象的地址传递过去,上面两个函数负责信号量对象的初始化和销毁那么下面的两个函数就是用来对信号量的值+1或者-1,sem_wait函数就是对信号量-1
我们知道信号量是一个计数器,该计数器描述的是资源数量的多少,资源要么就是有要么就是没有不存在为负数的情况,所以计数器的值不可能为负数,所以当信号量的值为0时线程依然使用sem_wait对其减一的话该函数就会将该线程挂起等待,等到信号量的值大于0时再将其唤醒,最后来看看该函数的返回值:
如果返回成功就返回0,如果失败就返回其对应的错误码。然后就是sem_post函数该函数的作用就是对信号量的值+1,
同样的道理如果函数调用成功就返回0,如果调用失败就返回对应的错误码:
那么这就是信号量对应的一些操作,接下来我们就可以使用这些函数来实现环形队列。
虽然名字叫做环形队列,但是他本质上其实是一个数组,我们将这个数组想象成一个圆形的公共资源
这就是我们想象出来的环形队列,这个队列里面可以存放11个数据,但实际上这个队列实际上是用11个元素的数组模拟出来的:
然后我们就可以创建一个消费者线程和一个生产者线程分别从这个数组里面获取数据和存储数据:
蓝色箭头表示生产者线程,红色箭头表示消费者线程,蓝色的格子就表示已经生产好的数据,所以此时待消费的数据个数为4待生产的空位为7,每生产一个数据就将蓝色箭头指向的位置涂成蓝色然后往后挪动一位,比如说当前再生产了4个数据那么蓝色的箭头就应该指向8:
同样的道理每当消费者消费一个数据就将红色的箭头指向的位置涂成白色然后往后挪动一个位置,比如说消费者消费了6个线程那么图片就会变成下面这样:
当生产者继续往后生产时生产者的下表可能会超过数组的最大下标,那么这个时候就得让其回到数组的来头重新向后移动,比如说当前生产者的下表为8再生产5个数据就应该来到了13,但是数组的最大下表为10,所以我们就得将13模上数组的元素个数11也就是13%11=2,所以生产者线程的下标变成2,那么这里的图片就变成了下面这样:
对于消费者线程也是同样的道理,这也是为什么能将数组看成圆形的原因,那么这里有几个问题需要大家思考一下,上图中的消费者线程能一次性的读取9个数据吗?答案是肯定不行的因为当前队列存储的数据没有9个所以是不行的,反应到图片上呈现的现象就是不能出现红色的格子
那生产者能一直生产吗?答案肯定也是不行的,因为环形队列存储的数据是有限,如果一直生产的话就会导致之前生产的数据还没有被消费就已经被覆盖了,反应到图片上的现象就是蓝色的箭头最多只能超过红色箭头一整圈
那么总结一下上面的内容就是红色的箭头不能超过蓝色的箭头,蓝色的箭头最多只能超过红色箭头一整圈,当蓝色的箭头或者红色的箭头计算得到的下表超出了数组的最大下表就得将其模上数组的元素个数让其从数组的头部重新往后挪动,有了上面的理解我们就可以模拟实现一下环形队列。
首先我们要创建一个类来描述这个环形队列,并且因为类对象会存储各种各样的数据,所以该类是一个模版类,模版的参数只有一个,表述存储数据的类型:
template<class T>
class RingQueue
{
public:
private:
};
因为要用数组来表示环形队列所以类中得有一个vector对象,然后再添加一个整形变量来表示该数组最多存储的数据个数,又因为消费者和生产者会位于数组中的任意位置,所以还得添加两个整形变量用来表示消费者和生产者所处于的位置:
template<class T>
class RingQueue
{
public:
private:
vector<T> _queue;//环形队列
int _cap;//队列中的数据容量
int _prodstep;//生产者在队列的位置
int _consumerstep;//消费者在队列的位置
};
在上面的讲解中我们知道消费者线程所处于的位置不能超过生产者线程,生产者线程所在的位置最多只能超过消费者线程的一整圈,那么为了不破坏者两个规则就得再添加两个信号量分别表示两个线程所拥有的资源多少:
template<class T>
class RingQueue
{
public:
private:
vector<T> _queue;//环形队列
int _cap;//队列中的数据容量
int _prodstep;//生产者在队列的位置
int _consumerstep;//消费者在队列的位置
sem_t _spacesem;//空余位置的信号量
sem_t _datasem;//已有数据的信号量
};
那么这就是环形队列的准备工作。
构造函数只需要一个参数表示队列最多容纳的数据就行,为了方便以后对象的创建我们可以给这个参数一个缺省值:
RingQueue(const int cap=5)
{}
因为一开始消费者线程和生产者线程所处的位置都是数组的开头所以将_prodstep变量和_consumerstep变量初始化为0,然后将_cap变量的值初始化为cap,将数组的大小也初始化为cap,这里的几个步骤都可以在构造函数的初始化列表中完成:
RingQueue(const int cap)
:_cap(cap)
,_prodstep(0)
,_consumerstep(0)
,_queue(cap)
{}
在函数体里面就完成对两个信号量变量的初始化,初始化这两个对象得调用函数sem_init
对于生产者线程他看重的是当前队列还能再容纳多少个数据,对于消费者线程他看重的是当前队列已经拥有了多少个数据,而一开始当前队列没有任何数据,所以将消费者线程的信号量初始化为0,将生产者线程的信号量初始化为cap,那么构造函数完整的代码如下:
RingQueue(const int cap)
:_cap(cap)
,_prodstep(0)
,_consumerstep(0)
,_queue(cap)
{
sem_init(&_spacesem,0,_cap);
sem_init(&_datasem,0,0);
}
构造函数里面初始化了两个信号量,所以析构函数里面就只用销毁这两个信号量即可:
~RingQueue()
{
sem_destroy(&_spacesem);
sem_destroy(&_datasem);
}
将队列的构造函数和析构函数实现完了接下来就要实现这个队列对外提供的操作接口,既然是一块空间那么肯定就得提供往这块空间插入数据的接口,该函数就只有一个参数用来表示插入空间中的数据,因为该函数不会被修改所以其参数的类型是const引用:
void push(T& out)
{}
在往空间中插入元素之前,首先得判断一下是否还有多余的空间可供使用,在没学习信号量之前我们的做法是先枷锁,再判断公共资源的数据个数是否等于变量_cap,如果不等于就表明当前还有空间,如果等于_cap就表明当前没有空间最后再解锁,但是现在有信号量了就不需要枷锁解锁直接调用函数sem_wait函数将信号量_spacesem的值减一即可,如果当前容器有空间该函数就直接返回,如果没有空间了该函数就会将当前的线程挂起等待,所以这里存在对信号量加减的操作,那么这里为了以后的方便我们可以将对信号量的加减操作的封装成两个函数,这两个函数的名字就叫P和V,P表示将传过来的信号量减一V表示将传过来的信号量+1:
void P(sem_t& sem)
{
//对信号量减一
}
void V(sem_t&sem)
{
//对信号量加一
}
在函数里面调用对应的函数即可,然后创建一个变量接收函数的返回值并判断函数的调用是否成功,那么这里就可以使用assert函数:
void P(sem_t& sem)
{
int n=sem_wait(&sem);//对信号量减一
assert(n==0);
}
void V(sem_t&sem)
{
int n=sem_post(&sem);//对信号量加一
assert(n==0);
}
有了这两个函数之后就可以接着实现push函数,首先使用P函数对_spacesem信号量减一,只要能够继续执行就表明当前的容器一定有空间,所以这个时候就可以将数组中下表为_prodstep的元素赋值为out然后对变量_prodstep加加
void push(const T& out)
{
P(_spacesem);
_queue[_prodstep]=out;
_prodstep++;
}
因为_prodstep的值加一之后可能会超过数组最大的下表,所以我们得将_prodstep的值模上数组的元素个数,又因为数组中多了一个元素,而信号量_datasem表示空间中已有的数据个数,所以这个时候还得调用函数V对信号量_datasem的值加一,所以push函数的完整代码如下:
void push(const T& out)
{
P(_spacesem);
_queue[_prodstep]=out;
_prodstep++;
_prodstep%=_cap;
V(_datasem);
}
pop函数的返回值为void,但是它需要一个指针类型的参数用来作为输入型参数,pop函数也是相同的道理先对_datasem的值减一用来判断一下当前的空间中是否还有数据,只要该函数返回了就表明是还有数据的所以这个时候可以将_consumerstep指向的元素赋值给指针指向的对象,然后将_consumerstep的值加一并模上数组的元素个数即可:
void pop(T*in)
{
P(_datasem);
*in=_queue[_consumerstep];
_consumerstep++;
_consumerstep%=_cap;
}
因为消费者消费了一个数据,所以当前队列又多了一个地方可以存放数据,而信号量_spacesem表示当前还可以存放数据的个数,所以在函数的最后还得调用函数V对信号量_spacesem加一
void pop(T*in)
{
P(_datasem);
*in=_queue[_consumerstep];
_consumerstep++;
_consumerstep%=_cap;
V(_spacesem);
}
//RingQueue.hpp
#include
#include
#include
#include
#include
using namespace std;
template<class T>
class RingQueue
{
void P(sem_t& sem)
{
int n=sem_wait(&sem);//对信号量减一
assert(n==0);
}
void V(sem_t&sem)
{
int n=sem_post(&sem);//对信号量加一
assert(n==0);
}
public:
RingQueue(const int cap)
:_cap(cap)
,_prodstep(0)
,_consumerstep(0)
,_queue(cap)
{
sem_init(&_spacesem,0,_cap);
sem_init(&_datasem,0,0);
}
~RingQueue()
{
sem_destroy(&_spacesem);
sem_destroy(&_datasem);
}
void push(const T& out)
{
P(_spacesem);
_queue[_prodstep]=out;
_prodstep++;
_prodstep%=_cap;
V(_datasem);
}
void pop(T*in)
{
P(_datasem);
*in=_queue[_consumerstep];
_consumerstep++;
_consumerstep%=_cap;
V(_spacesem);
}
private:
vector<T> _queue;//环形队列
int _cap;//队列中的数据容量
sem_t _spacesem;//空余位置的信号量
sem_t _datasem;//已有数据的信号量
int _prodstep;//生产者在队列的位置
int _consumerstep;//消费者在队列的位置
pthread_mutex_t _pmutex;//生产者的锁
pthread_mutex_t _cmutex;//消费者的锁
};
在main.cc文件里面我们就完成对这个类的测试,测试的方法就是创建一个消费者线程和一个生产者线程,然后消费者线程往循环队列里面放随机数据,然后生产者线程从队列里面获取随机数据,那么在main函数里面首先使用srand函数设计一个时间节点,然后创建两个pthread_t对象和一个环环形队列的对象,该环形队列就先保存整形类型的数据:
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p,c;
RingQueue<int>* rq=new RingQueue<int>(10);
return 0;
}
因为创建线程的时候得告诉他们要执行的函数,所以我们还得创建两个返回值和参数都为void*的函数:
void* product_func(void* args)
{}
void* consumer_func(void*args)
{}
这两个函数我们最后实现,有了这两个函数之后我们就可以使用pthread_create函数创建线程,因为在线程执行的函数里面要访问main中创建的环形队列,所以这里传递的参数就是环形队列的地址也就是rp,线程执行完之后就可以使用pthread_join函数回收线程,那么这里的代码如下:
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p,c;
RingQueue<int>* rq=new RingQueue<int>(10);
pthread_create(&p,nullptr,product_func,(void*)rq);
pthread_create(&c,nullptr,consumer_func,(void*)rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
消费者函数和生产者函数干的第一件事就是对参数进行转换,转换成为RingQueue
类型,这样就可以正常的访问对象里面内容:
void* product_func(void* args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
}
void* consumer_func(void*args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
}
在生产者线程里面我们可以先创建一个while循环然后在循环里面使用rand函数获取一个随机数,然后就可以调用rq中的push函数将数据插入,并像屏幕上打印的一句话用来标记以传入数据:
void* product_func(void* args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
int data=rand()%10;
rq->push(data);
cout<<"发送了一个数据: "<< data <<endl;
}
return nullptr;
}
对于消费者线程也是同样的道理创建一个循环,在循环里面创建一个变量data用来接收数据,然后调用rq中的pop函数将data的地址传入,最后打印一句话用来表示已经收到了数据即可:
void* consumer_func(void*args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
int data=0;
rq->pop(&data);
cout<<"收到了一个数据: "<< data <<endl;
}
}
main.cc完整的代码如下:
#include
#include
#include"RingQueue.hpp"
void* product_func(void* args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
int data=rand()%10;
cout<<"发送了一个数据: "<< data <<endl;
rq->push(data);
}
return nullptr;
}
void* consumer_func(void*args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
int data=0;
sleep(1);
rq->pop(&data);
cout<<"收到了一个数据: "<< data <<endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p,c;
RingQueue<int>* rq=new RingQueue<int>(10);
pthread_create(&p,nullptr,product_func,(void*)rq);
pthread_create(&c,nullptr,consumer_func,(void*)rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
函数实现完成之后就可以运行一下进行测试,当前的函数都没有休眠所以运行起来没有什么明显的现象,所以我们先让生产者函数没生产一个数据就休息一秒这样我们看到的现象就是生产者生产一个数据,消费者就消费一个数据:
符合我们的预期,将sleep函数放到消费者线程里面就可以看到这里的结果是生产者一开始生产了一堆数据,然后消费者1秒钟消费一个数据,并且生产者也跟着1秒钟生产一个数据:
符合我们的预期。在之前的文章中我们实现过一个名为Task.hpp
的文件,这个文件里面有一个类用来存储一个用于计算的函数和数据,还有一个实现计算的函数:
//main.cc
#include
#include
#include
#include
using namespace std;
const string oper="+-*/%";
class CalTask
{
using func_t=function<int(int,int,char)>;
public:
CalTask()
{}
CalTask(int x,int y,char op,func_t func)
:_x(x)
,_y(y)
,_op(op)
,_callback(func)
{}
string operator()()
{
int result=_callback(_x,_y,_op);
char buffer[64];
snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
return buffer;
}
string toTaskString()
{
char buffer[64];
snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
int mymath(int x,int y,char oper)
{
int result=0;
switch(oper)
{
case '+':
result=x+y;
break;
case '-':
result=x-y;
break;
case '*':
result=x*y;
break;
case '/':
if(y==0)
{
cerr << "div zero error!" << endl;
result = -1;
break;
}
result=x/y;
break;
case '%':
if(y==0)
{
cerr<<"div zero error!"<<endl;
result=-1;
break;
}
result=x%y;
break;
default:
break;
}
return result;
}
有了这个文件之后就可以对两个线程函数进行改造,生产者往环形队列中存放CalTask对象,消费者获取CalTask对象并执行这个对象的操作符重载函数,那么改造之后的函数如下:
#include
#include
#include"RingQueue.hpp"
#include"Task.hpp"
void* product_func(void* args)
{
RingQueue<CalTask>* rq=static_cast<RingQueue<CalTask>*>(args);
while(true)
{
int x=rand()%10;
int y=rand()%10;
int z=rand()%oper.size();
CalTask data(x,y,oper[z],mymath);
cout<<"发送了一个数据: "<< data.toTaskString() <<endl;
rq->push(data);
sleep(1);
}
return nullptr;
}
void* consumer_func(void*args)
{
RingQueue<CalTask>* rq=static_cast<RingQueue<CalTask>*>(args);
while(true)
{
CalTask data;
rq->pop(&data);
cout<<"数据执行的结果:" << data() <<endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p,c;
RingQueue<CalTask>* rq=new RingQueue<CalTask>(10);
pthread_create(&p,nullptr,product_func,(void*)rq);
pthread_create(&c,nullptr,consumer_func,(void*)rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
函数执行的结果如下:
符合预期。
上面的代码实现了单生产单消费的环形队列,因为两个信号量的存在让这两个线程保持了互斥和同步的关系,这个关系符合3 2 1原则,而我们知道多生产和多消费之间除了消费者和生产者之间的关系,还有消费者与消费者之间的互斥关系和生产者与生产者之间的互斥关系,这两个关系在上面的代码中是没有维护的,所以要想实现上面的这两个关系就得在RingQueue类中添加两个两个锁对象一个给生产者一个给消费者:
template<class T>
class RingQueue
{
public:
private:
vector<T> _queue;//环形队列
int _cap;//队列中的数据容量
sem_t _spacesem;//空余位置的信号量
sem_t _datasem;//已有数据的信号量
int _prodstep;//生产者在队列的位置
int _consumerstep;//消费者在队列的位置
pthread_mutex_t _pmutex;//生产者的锁
pthread_mutex_t _cmutex;//消费者的锁
};
生产者的锁用于push函数,因为对信号量的操作是原子性的,所以可以在P(_spacesem);之后就可以对其进行枷锁,完成一系列的操作之后就可以在V(_datasem)之前进行解锁,对于pop函数也是同样的道理,pop函数使用的是消费者的锁:
void push(const T& out)
{
P(_spacesem);
pthread_mutex_lock(&_pmutex);
_queue[_prodstep]=out;
_prodstep++;
_prodstep%=_cap;
pthread_mutex_unlock(&_pmutex);
V(_datasem);
}
void pop(T*in)
{
P(_datasem);
pthread_mutex_lock(&_cmutex);
*in=_queue[_consumerstep];
_consumerstep++;
_consumerstep%=_cap;
pthread_mutex_unlock(&_cmutex);
V(_spacesem);
}
有了锁之后在main函数里面就可以使用for函数一次性创建多个生产者和消费者的线程
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p[5],c[5];
RingQueue<CalTask>* rq=new RingQueue<CalTask>(10);
for(int i=0;i<5;i++)
{
pthread_create(&p[i],nullptr,product_func,(void*)rq);
pthread_create(&c[i],nullptr,consumer_func,(void*)rq);
}
for(int i=0;i<5;i++)
{
pthread_join(p[i],nullptr);
pthread_join(c[i],nullptr);
}
return 0;
}
那么代码运行的结果就如下:
没有任何的问题,那么这就是多生产多消费的模拟实现。