以往创建进程会创建 task_struct,mm_struct,页表等,但如果创建 “进程” 只能创建 task_struct,不能创建 mm_struct,页表等,且创建出来的 task_struct 和父进程共享进程地址空间,页表等,这些"进程"被称作线程
在Linux中,站在CPU的角度,不能识别 task_struct 是进程还是线程,因为CPU只关心一个一个的独立执行流
进程 : 站在内核角度,承担分配系统资源的基本实体
我们之前所认识的进程都是只有一个 task_struct,也就是说进程只有一个执行流
Linux下,并不存在真正的多线程,如果支持真的线程,当线程足够多的时候,OS一定是要管理线程的,这样会提高设计OS的复杂程度,所以线程直接复用了进程的 task_struct ,即线程是用进程模拟出来的,Linux下的所有执行流,都叫做轻量级进程
Linux并没有真正意义上的线程,所以Linux也没有线程相关的系统调用,但会提供创建线程的接口(vfork() : 创建进程,父子共享空间),在用户角度,可以使用原生线程库来解决
分页存储 :
将内存空间分为一个个大小相等的分区(例如每个分区4KB),每个分区为一个页框/页帧/内存块/物理块/物理页面,每个页框有一个编号,即页框号/页帧号/内存块号/物理块号/物理页号,页框号从0开始
进程地址空间也分为与页框大小相等的一个个部分,每一部分称为页/页面,每个页面也有一个编号,即页号,页号也从0开始
页表 : 由一个个页表项组成,每个页表项由"页号"和"块号"组成
单级页表将逻辑地址转换成物理地址的步骤 (页面大小为4KB,页表项为4B):
(1). 逻辑地址/4KB得到逻辑地址所处的页面
(2). 逻辑地址 % 4KB 得到页内偏移量
(3). 查询页表得到对应的内存块号,内存块号 * 4KB + 页内偏移量得到物理地址
单级页表存在的问题 :
某计算机系统按字节寻址,支持32位的逻辑地址,采用分页存储管理,因为页面的大小固定为 4KB = 2^12B,所以前12位表示页内偏移量,后20位为页号,系统中最多有2 ^20个页面,页表项为4B,则页表所需要的内存空间为 2 ^20 * 4B = 2 ^22B = 4MB
二级页表 :
系统最多有2^20个页面,对应页表项有2 ^20个,每个页表项为4B,我们可以将2 ^10个页表项为一组,这样一组页表项的大小为 2 ^10 * 4B = 2 ^12B = 4KB,正好可以放到一个页框中,将其分为2 ^
10组,维护一个页目录表,用来记录每组的内存块号,页目录表的大小 2 ^10 * 4B = 2^12B = 4KB
二级页表将逻辑地址转换成物理地址的步骤 (页面大小为4KB,页表项为4B):
(1). 将逻辑地址分为3部分,根据后10位找到对应内存块的二级页表
(2). 根据中10位找到逻辑地址对应物理地址的起始地址
(3). 根据前12位得到偏移量,由第二步得到的起始地址 + 偏移量得到最终的物理地址
// 为什么不能改?
const char* msg = "hello world\n";
*msg = 'H';
页表里还有是否命中,权限等选项,前面说过页表也分为内核级页表和用户级页表,但不是物理上分开,而是通过权限分开的,页表中有RWX(读写可执行)权限,访问字符串时,需要访问页表得到物理地址,但RWX权限是只R的,写入会触发MMU硬件错误,操作系统识别到该硬件错误向该进程发送信号使进程终止
缺页中断 : 如果在访问期间,目标资源不在内存中,则会触发缺页中断,进行内存分配,页面换入,建立映射,在调用malloc,new时,只是在堆区空间开辟出来,也就是只给了你虚拟地址,当访问时,发现物理内存还没分配,操作系统会申请一块内存,然后进行虚拟地址和物理地址的映射,这种现象就叫做由于缺页中断引起的内存分配的现象
线程的优点 :
(1). 创建一个新线程的代价要比创建一个新进程小得多
创建进程要创建task_struct,mm_struct,页表等数据结构(所以创建进程本质上是系统不断的分配资源),而创建线程只需要创建出一个task_struct以及少量的数据结构,只需要分配进程的资源即可
(2). 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
进程的切换需要涉及到切换task_struct,mm_struct,页表,上下文信息等等,而线程切换只需要切换上下文即可
(3). 线程占用的资源要比进程少很多
(4). 在等待慢速IO操作结束的同时,程序可执行其他的计算任务(迅雷边下边播的功能就是用多线程实现的)
(5). 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
计算密集型 : 执行流的大部分任务,主要以计算为主 : 加密解密,排序查找
(6). I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
I/O密集型 : 执行流的大部分任务,是以I/O为主的 : 刷磁盘,访问数据库,访问网络(百度网盘上传和下载)
线程的缺点 :
(1). 性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型
线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的
同步和调度开销,而可用的资源不变。
(2). 健壮性降低
因为进程运行是具有独立性的,所以一个进程挂了不会影响另一个进程,而多线程地址空间共享,也就意味着通过地址空间,多线程看到的资源大部分是相同的,一个线程崩溃了,其他线程也会崩溃,进程也就随之崩溃了
(3). 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
进程 vs 线程
(1). 进程是资源分配的基本单位,线程是调度的基本单位
(2). 线程共享进程数据,但也有自己的一部分数据
线程ID
一组寄存器(线程上下文数据)
栈
errno
信号屏蔽字
调度优先级
寄存器保存线程的上下文信息
线程运行时会产生各种临时数据,需要将数据进行压栈
进程的多个线程共享
同一地址空间,因此Text Segment,Data Segment 都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,各线程都可以访问到,除此之外,各线程还共享以下进程资源和环境
文件描述符表
每种信号的处理方式(SIG_IGN,SIG_DFL,或自定义处理函数)
当前工作目录
用户id和组id
功能:创建一个新的线程
原型 int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (*start_routine)(void*), void *arg);
参数:
pthread_t : 无符号长整型
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0,失败返回错误码
#include
#include
#include
#include
void* Routine(void* arg)
{
while(1)
{
printf(" %s : pid : %d,ppid : %d\n ",(char*)arg,getpid(),getppid());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid,NULL,Routine,(void*)"thread 1");
while(1)
{
printf("main thread , pid : %d,ppid : %d\n",getpid(),getppid());
sleep(1);
}
return 0;
}
主线程和新创建的线程进程pid和ppid都是一样的,可以看出虽然这是两个线程,但是这两个线程同属于一个进程
我们可以使用 ps -aL 命令来查看线程
ps -aL | head -1 && ps -aL | grep mythread
-L : 显示当前的轻量级进程
所以实际上,操作系统调度的时候,根本看的不是进程pid,而是LWP(light weight process)
pthread_t pthread_self(void);
功能 : 可以获得线程自身的ID
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
while(1)
{
printf(" %s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
}
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
while(1)
{
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
sleep(1);
}
return 0;
}
pthread_self()获得的是用户级原生线程库的线程 id,LWP是内核的轻量级进程id,这两个id是有对应关系的(1 : 1),一个用户级原生线程库的线程 id 对应 LWP 的某个值
原型 int pthread_join(pthread_t thread, void **value_ptr);
功能:等待线程结束(阻塞式等待)
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
int count = 0;
while(count < 5)
{
printf("%s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
count++;
}
return (void*)0; // (void*)10
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = (char*)malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
for(int i = 0;i < 5;i++)
{
void* ret = NULL;
pthread_join(tid[i],&ret);
printf("thread %d[%lu]....quit,code : %d\n",i,tid[i],(int)ret);
}
return 0;
}
线程终止 :
(1). return XXX 终止线程,但如果在main函数中return XXX,会导致进程退出,所有线程退出
(2). 在线程中使用 pthread_exit(void *value_ptr),用来终止某个线程,调用 exit() 会导致进程退出
(3). 使用 pthread_cancel(pthread_t thread)取消某个线程
原型 void pthread_exit(void *value_ptr);
功能:线程终止
参数 : value_ptr : 线程退出的退出码
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
原型 int pthread_cancel(pthread_t thread);
功能:取消一个执行中的线程
参数 thread:线程ID
返回值:成功返回0;失败返回错误码
exit : 终止进程,在线程中使用exit(),会使进程直接退出,从而不会再执行 printf(“thread %d[%lu]…quit,code : %d\n”,i,tid[i],(int)ret);
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
int count = 0;
while(count < 5)
{
printf("%s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
count++;
}
exit(0);
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = (char*)malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
for(int i = 0;i < 5;i++)
{
void* ret = NULL;
pthread_join(tid[i],&ret);
printf("thread %d[%lu]....quit,code : %d\n",i,tid[i],(int)ret);
}
return 0;
}
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
int count = 0;
while(count < 5)
{
printf("%s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
count++;
}
pthread_exit((void*)10);
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = (char*)malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
for(int i = 0;i < 5;i++)
{
void* ret = NULL;
pthread_join(tid[i],&ret);
printf("thread %d[%lu]....quit,code : %d\n",i,tid[i],(int)ret);
}
return 0;
}
(1). 自已取消自己的时候最好后面还有代码,取消线程成功,退出码为 -1 (如果取消线程后面没有代码,返回值为0)
(2). 推荐在主线程中取消其他线程
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
int count = 0;
while(count < 5)
{
printf("%s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
count++;
}
pthread_exit((void*)10);
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = (char*)malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
pthread_cancel(tid[0]);
pthread_cancel(tid[1]);
for(int i = 0;i < 5;i++)
{
void* ret = NULL;
pthread_join(tid[i],&ret);
printf("thread %d[%lu]....quit,code : %d\n",i,tid[i],(int)ret);
}
return 0;
}
一般情况下,线程必须被等待,就如同子进程必须被等待一般,但是线程也可以不被等待,不过需要将线程进行分离
线程分离 : 一个线程被分离了,这个线程依旧会使用进程的资源,只不过这个线程是不需要主线程去等待的,如果这个线程退出了,系统会自动回收该线程的资源,不需要通过主线程去等待了(建议自己分离自己)
#include
int pthread_detach(pthread_t thread);
功能 : 进行线程分离
返回值 : 成功返回0,失败返回错误码
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
pthread_detach(pthread_self());
int count = 0;
while(count < 5)
{
printf("%s : pid : %d,ppid : %d,tid : %lu\n ",(char*)arg,getpid(),getppid(),pthread_self());
sleep(1);
count++;
}
pthread_exit((void*)10);
}
int main()
{
pthread_t tid[5];
for(int i = 0;i < 5;i++)
{
char* buffer = (char*)malloc(64);
sprintf(buffer,"thread %d",i);
pthread_create(&tid[i],NULL,Routine,(void*)buffer);
printf("%s tid : %lu\n",buffer,tid[i]);
}
printf("main thread , pid : %d,ppid : %d,tid : %lu\n",getpid(),getppid(),pthread_self());
while(1)
{}
return 0;
}
while : ;do ps -aL |head -1 && ps -aL | grep mythread; echo "##################"; sleep 1; done
#include
#include
#include
#include
#include
void* Routine(void* arg)
{
while(1)
{
printf("new thread : %p\n",pthread_self());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid,NULL,Routine,NULL);
while(1)
{
printf("main thread id : %p\n",pthread_self());
sleep(2);
}
pthread_join(tid,NULL);
}
线程有很多,所以线程是需要被管理的,OS管理资源采用先描述再组织的方式,而Linux不存在真正意义上的多线程,只提供了LWP,OS只需要对LWP内核执行流进行管理,给用户使用的接口等其他数据,是由线程库pthread库来管理
pthread库是一个普通文件,运行起mythread程序,程序所依赖的pthread库被加载到内存中,映射到进程地址空间中的共享区,这样每一个线程都能看到pthread库
主线程使用进程的栈,而线程所私有的栈,寄存器等是在动态库中,如果要找到一个线程,只需要找到描述该线程的内存块的起始地址,线程的相关属性就可以被找到了
所谓的线程id本质上是一个地址,一个映射到pthread库中特定内存块的虚拟共享区地址
线程切换并不需要去执行内核代码来完成切换,只需要去动态库中把当前线程的临时数据保存到栈中,上下文信息保存到寄存器中,切换到下一个线程控制块,把下一个线程的相关信息交给CPU执行即可
有缺陷的抢票系统
#include
#include
#include
#include
#include
int tickets = 2000;
void* TicketGrabbing(void* name)
{
while(1)
{
if(tickets > 0)
{
usleep(100);
printf("[%s] got a tickets : %d\n",name,tickets--);
}
else
{
break;
}
}
printf("%s quit\n",name);
pthread_exit((void*)0);
}
int main()
{
pthread_t t1,t2,t3,t4;
pthread_create(&t1,NULL,TicketGrabbing,"thread 1");
pthread_create(&t2,NULL,TicketGrabbing,"thread 2");
pthread_create(&t3,NULL,TicketGrabbing,"thread 3");
pthread_create(&t4,NULL,TicketGrabbing,"thread 4");
pthread_join(t1,NULL);
pthread_join(t2,NULL);
pthread_join(t3,NULL);
pthread_join(t4,NULL);
return 0;
}
上述代码的4个线程同时访问的 tickets 就是临界资源,访问 tickets 的代码称为临界区
tickets- - 至少需要进行三步操作
(1). 将 tickets 的值读到CPU的寄存器中
(2). 将寄存器中的值 - -
(3). 最后将寄存器中的值写回 tickets 中
tickets- - 不是原子操作
这就会导致问题的出现 : 线程1刚把 tickets 的值 1000 读到寄存器中,有可能就被切换下去了(切换下去之前要先把上下文信息保存到私有寄存器中,临时数据保存到私有栈中),线程2开始执行,对 tickets 1000 进行- -,线程2优先级较高,直接将 tickets 减到了 900 ,线程1再次被执行,将寄存器中的1000减到999,写回 tickets 中,相当于无缘无故多出来 99 张票
tickets > 0 不是原子操作
线程1检测到 tickets 1000 > 0 进入循环后,被切换走,线程2检测到 tickets 1000 > 0 进入循环后,将 tickets 减到 0,线程1再次被执行,将 tickets 减到 -1
出现问题的本质是多个线程共同访问临界区导致的,所以我们需要保证临界区的安全,我们就需要用互斥来完成,当一个线程进入临界区执行的时候,不允许其他的线程进入,要做到这点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量,申请锁的时候如果锁已经被别人占了,那么当前线程必须等待,直到别人把锁释放才能申请到锁来访问临界区
在大部分情况下,加锁本身是有损于性能的,原来多线程可以并发或并行的去执行某种任务,现在因为多线程访问了临界资源,每一个线程在访问临界资源时都需要申请锁,申请锁成功后才能进入临界区,所以我们要尽可能地减少加锁带来的性能开销成本
加锁后的原子性体现在两方面
(1). 如果线程1申请了锁,线程2,线程3,线程4就会阻塞等待线程1释放锁,所以对于线程2,线程3,线程4来说,只有线程1没有申请锁或线程1释放锁后才有意义,站在线程2,线程3,线程4的视角来看是原子的
(2). 线程1在临界区内可以进行线程切换,但其他线程也无法进入临界区,因为线程1并没有释放锁,所以其他线程无法进入临界区
锁本身也是临界资源,因此申请锁的过程必须是原子的,即一个线程申请到锁,其他线程不可能再申请到锁
#include
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数 :
mutex:要初始化的互斥量
attr : 互斥锁的属性,不关心设置为NULL(默认属性)
返回值 : 成功返回0,失败设置错误码
#include
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
#include
int pthread_mutex_destroy(pthread_mutex_t *mutex);
#include
#include
#include
#include
#include
#define NUM 10000
int tickets = NUM;
pthread_mutex_t lock;
void* GetTickets(void* arg)
{
while(1)
{
pthread_mutex_lock(&lock);
if(tickets > 0)
{
usleep(100);
printf("thread %d got a ticket: %d\n",(int)arg,tickets--);
pthread_mutex_unlock(&lock);
}
else
{
pthread_mutex_unlock(&lock);
break;
}
}
}
int main()
{
pthread_t tid[5];
pthread_mutex_init(&lock,NULL);
for(int i = 0;i < 5;i++) pthread_create(&tid[i],NULL,GetTickets,(void*)i);
for(int i = 0;i < 5;i++) pthread_join(tid[i],NULL);
pthread_mutex_destroy(&lock);
return 0;
}
锁的原子性,是如何实现的? lock 和 ulock 的具体过程是怎样的呢?
(1). 经过上面的例子,我们已经知道 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
(2). 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。lock和unlock的
伪代码如下
(1). 在内存中申请一个全局变量 mutex,将 mutex 的值置为1,线程A先将CPU的 al 寄存器值置为0,再将 al 寄存器和内存中的mutex的值进行交换(al寄存器中的值为1,mutex中的值为0),此时A就完成了加锁
(2). 如果线程A在执行交换指令后被切换(切换时会把CPU al 寄存器的内容保存到线程A私有的寄存器中,数据保存到线程A私有的栈中),线程B开始执行,将 al 寄存器值置为0,然后交换 al 寄存器和内存中 mutex 的值,但交换后 al 的寄存器仍然为0,因此B就会挂起等待,直到A解锁后将mutex值置为1,唤醒线程B,再次交换 al 寄存器和内存中 mutex 的值,B就完成了加锁
时钟中断是操作系统能够进行线程调度的基础,操作系统会在每次时钟中断时被唤醒,执行中断向量表,将程序流程转向中断服务程序的入口地址
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会导致线程不安全
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
(1). 线程安全描述的是线程执行代码时是否安全的情况,重入重点是函数被重复进入
常见线程不安全的情况
(1). 不保护共享变量的函数(最开始抢票代码中的抢票函数没有对 tickets 进行保护)
(2). 函数状态随着被调用,状态发生变化的函数
(3). 返回指向静态变量指针的函数
(4). 调用线程不安全函数的函数
可重入函数是线程安全函数的一种
线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的(并一定多线程才会导致重入,在单执行流的情况下也可能会导致重入,如信号捕捉,如果在主线程中申请了锁,然后在执行信号处理函数中再次申请锁,就会导致死锁)
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
什么叫做阻塞?
系统中有一个运行等待队列,CPU会去执行在运行等待队列中的线程
线程在等待锁或其他资源(磁盘,网卡,显示器,打印机)时,会把该线程从运行等待队列链接到资源等待队列中(由R状态设置为S状态,这种情况可以称之为该线程被挂起等待了),当锁或其他资源就绪之后,将该线程由资源等待队列中链接到运行等待队列中(由S状态设置为R状态),等待被调度
进程线程等待某种资源,在OS层面就是将进程线程的task_struct放入对应的等待队列里(R->S),这种情况就被称为进程线程被挂起等待了
死锁四个必要条件
互斥条件:一个资源每次只能被一个执行流使用
请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
为什么要存在同步?
单纯的加锁有没有问题?
个别线程竞争力很强,每次都能申请到锁,但是不办事情,有可能导致其他线程长时间竞争不到锁,从而引起饥饿问题
举例说明 : 一间教室只允许一个学生上自习,一个学生拿着锁进了教室,把教室锁上,开始在教室上自习,其他学生想要上自习只能等待该学生上完自习后把锁拿出来,该学生上完自习了,把锁拿了出来,但突然又想再学习一会,因为该学生离锁最近,于是该学生又把锁抢走,去上自习,其他学生又只能等待,但该学生没上几分钟又坐不住出去了,然后又想再学习一会回来,就这样循环往复,导致其他学生无法竞争到锁,为了让其他学生可以竞争到锁,就规定让该学生把锁拿出来后去后面排队。
上面例子中的教室就是临界资源,学生就是线程
条件变量
条件变量用来描述某种临界资源是否就绪的一种数据化描述
条件变量是Linux中提供的一种同步的机制,用来表示某种条件是否成立
struct pthread_cond_t
{
int flag;
task_struct* queue;
}
例子 : 一个学生拿着锁进了教室,其他学生来了只能等待锁,这时可以设置一个条件变量(可以看作一个结构体,flag == 0表示无钥匙,flag == 1表示有钥匙,task_struct* queue指向等待队列的队头线程),用来描述锁是否被拿出来了,其他学生就可以在条件变量下等待,flag == 1时就可以唤醒排在队列头的线程,因此条件变量就是用来描述某种临界资源状态是否就绪
条件变量需要配合mutex互斥锁一起使用
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 参数:
cond:要在这个条件变量上等待
mutex:释放互斥锁
调用该接口默认条件不满足,会开始阻塞
int pthread_cond_signal(pthread_cond_t *cond);
功能 : 唤醒在条件变量上等待的第一个线程
int pthread_cond_broadcast(pthread_cond_t *cond);
功能 : 唤醒在条件变量上等待的所有线程
1 #include<iostream>
2 #include<cstdio>
3 #include<pthread.h>
4 using namespace std;
5 pthread_mutex_t lock;
6 pthread_cond_t cond;
7 void* Run(void* arg)
8 {
9 pthread_detach(pthread_self());
10 cout<<(char*)arg<<" : "<<pthread_self()<<" run...."<<endl;
11 while(1)
12 {
13 pthread_cond_wait(&cond,&lock);
14 cout<<"thread : "<<pthread_self()<<" activity"<<endl;
15 }
16 }
17 int main()
18 {
19 pthread_mutex_init(&lock,nullptr);
20 pthread_cond_init(&cond,nullptr);
21
22 pthread_t t1,t2,t3;
23 pthread_create(&t1,nullptr,Run,(void*)"thread 1");
24 pthread_create(&t2,nullptr,Run,(void*)"thread 2");
25 pthread_create(&t3,nullptr,Run,(void*)"thread 3");
26
27 while(1)
28 {
29 getchar();
30 pthread_cond_signal(&cond);
// pthread_cond_broadcast(&cond); 唤醒所有等待线程
31 }
32 pthread_mutex_destroy(&lock);
33 pthread_cond_destroy(&cond);
34 return 0;
35 }
生产者消费者模型 : 是经典的多线程同步与互斥的一个场景
拿去超市买东西为例,超市本身不生产商品,会从生产者拿对应的商品,超市里的柜台中放着商品
假如某一天超市的柜台里没有商品了,消费者去超市里买商品,发现没有商品就回去了,消费者只能轮询的去超市查看有没有商品
假如某一天超市的柜台已经满了,生产者只能轮询的去查看超市是否缺货了
消费者,生产者每次轮询的去询问超市是否有货/缺货,太浪费时间了,所以消费者和生产者都加了超市工作人员的微信,等超市有货/缺货给消费者/生产者发送消息来告知他们,超市有货了给消费者发微信让消费者来消费,超市缺货了给生产者发微信让生产者来供货
超市在供货时是不能消费的,在消费时是不能供货的,所以需要使用mutex互斥锁来实现,消费者申请锁来超市消费,消费期间生产者不能进来,消费者消费完释放锁,生产者申请到锁来供货,供货期间消费者不能来消费,供货完生产者释放锁
三种关系 : 消费者和消费者,生产者和生产者,生产者和消费者
消费者和消费者之间是互斥关系(竞争关系)
生产者和生产者之间是互斥关系(竞争关系)
生产者和消费者之间是同步(多线程协同)和互斥关系(竞争关系)
两种角色 : 生产者和消费者(通常由特定的线程和进程来承担)
一个交易场所 : 通常是内存中的一段缓冲区
为什么存在生产者和消费者模型?
(1). 在函数调用中,main 函数 调用了 fun(int a,int b) 函数,main 函数必须等 fun 函数跑完才能回到 main 函数中继续执行,函数调用的本质就是把数据给一个函数,然后通过该函数拿到数据处理后的结果。所以函数调用是一种紧耦合
生产者把数据交给缓冲区,消费者从缓冲区中拿数据进行计算,对于生产者来讲,生产者把数据交给缓冲区,生产者就可以往后执行了,并不会因为计算没有出现结果而阻塞,双方就可以并发或并行的执行了,所以生产者和消费者模型实现了解耦
(2). 支持并发
(3). 如果生产数据过程特别慢,消费数据过程特别快,如果再把它们紧耦合起来,最终系统的效率会被拖慢,所以我们可以让生产者多一些,消费者少一些,即支持忙闲不均
为什么 pthread_cond_wait 中第二个参数要传入 mutex 互斥锁?
等待的时候往往是在临界区等待的,此时线程申请了锁,如果不把锁释放,会导致死锁问题,因此传入第二个参数mutex互斥锁,在进行等待时,会自动释放互斥锁,如果当前等待线程被唤醒,会自动获得互斥锁
BlockQueue.hpp
1 #pragma once
2 #include<iostream>
3 #include<queue>
4 #include<pthread.h>
5 #include<cstdlib>
6 #include<ctime>
7 #include<unistd.h>
8 #define NUM 5
9 using namespace std;
10 template<class T>
11 class BlockQueue
12 {
13 private:
14 bool IsFull()
15 {
16 return _q.size() == _cap;
17 }
18 bool IsEmpty()
19 {
20 return _q.empty();
21 }
22 public:
23 BlockQueue(int cap = NUM)
24 :_cap(cap)
25 {
26 pthread_mutex_init(&lock,nullptr);
27 pthread_cond_init(&full,nullptr);
28 pthread_cond_init(&empty,nullptr);
29 }
30 ~BlockQueue()
31 {
32 pthread_mutex_destroy(&lock);
33 pthread_cond_destroy(&full);
34 pthread_cond_destroy(&empty);
35 }
36 void Push(const T& val)
37 {
38 pthread_mutex_lock(&lock);
39 while(IsFull())
40 {
41 pthread_cond_wait(&full,&lock);
42 }
43 _q.push(val);
44 pthread_mutex_unlock(&lock);
45 pthread_cond_signal(&empty);
46 }
47 void Pop(T& val)
48 {
49 pthread_mutex_lock(&lock);
50 while(IsEmpty())
51 {
52 pthread_cond_wait(&empty,&lock);
53 }
54 val = _q.front();
55 _q.pop();
56 pthread_mutex_unlock(&lock);
57 pthread_cond_signal(&full);
58 }
59 private:
60 queue<T> _q;
61 int _cap;
62 pthread_mutex_t lock;
63 pthread_cond_t full;
64 pthread_cond_t empty;
65 };
main.cc
1 #include"BlockQueue.hpp"
2 void* Consumer(void* arg)
3 {
4 auto bq = (BlockQueue<int>*)arg;
5 while(1)
6 {
7 sleep(1); // 消费者消费慢
// 消费者消费快
8 int data = 0;
9 bq->Pop(data);
10 cout<<"Consumer : "<<data<<endl;
11 }
12 }
13 void* Producter(void* arg)
14 {
15 auto bq = (BlockQueue<int>*)arg;
16 while(1)
17 {
18 生产者生产快
// sleep(1); 生产者生产慢
19 int data = rand() % 100 + 1;
20 bq->Push(data);
21 cout<<"Producter : "<<data<<endl;
22 }
23 }
24 int main()
25 {
26 srand((unsigned long)time(nullptr)); // 产生随机数
27 BlockQueue<int>* bq = new BlockQueue<int>;
28 pthread_t c,p;
29 pthread_create(&c,nullptr,Consumer,bq);
30 pthread_create(&p,nullptr,Producter,bq);
31 pthread_join(c,nullptr);
32 pthread_join(p,nullptr);
33 return 0;
34 }
BlockQueue.hpp
1 #pragma once
2 #include<iostream>
3 #include<queue>
4 #include<pthread.h>
5 #include<cstdlib>
6 #include<ctime>
7 #include<unistd.h>
8 #define NUM 32
9 using namespace std;
10 template<class T>
11 class BlockQueue
12 {
13 private:
14 bool IsFull()
15 {
16 return _q.size() == _cap;
17 }
18 bool IsEmpty()
19 {
20 return _q.empty();
21 }
22 public:
23 BlockQueue(int cap = NUM)
24 :_cap(cap)
25 {
26 pthread_mutex_init(&lock,nullptr);
27 pthread_cond_init(&full,nullptr);
28 pthread_cond_init(&empty,nullptr);
29 }
30 ~BlockQueue()
31 {
32 pthread_mutex_destroy(&lock);
33 pthread_cond_destroy(&full);
34 pthread_cond_destroy(&empty);
35 }
36 void Push(const T& val)
37 {
38 pthread_mutex_lock(&lock);
39 while(IsFull())
40 {
41 pthread_cond_wait(&full,&lock);
42 }
43 _q.push(val);
if(_q.size() >= _cap / 2)
{
cout<<"数据很多了,消费者快来消费吧"<<endl;
pthread_cond_signal(&empty);
}
44 pthread_mutex_unlock(&lock);
46 }
47 void Pop(T& val)
48 {
49 pthread_mutex_lock(&lock);
50 while(IsEmpty())
51 {
52 pthread_cond_wait(&empty,&lock);
53 }
54 val = _q.front();
55 _q.pop();
if(_q.size() <= _cap / 2)
{
cout<<"空间很多了,快来生产吧"<<endl;
pthread_cond_signal(&full);
}
56 pthread_mutex_unlock(&lock);
58 }
59 private:
60 queue<T> _q;
61 int _cap;
62 pthread_mutex_t lock;
63 pthread_cond_t full;
64 pthread_cond_t empty;
65 };
main.cc
1 #include"BlockQueue.hpp"
2 void* Consumer(void* arg)
3 {
4 auto bq = (BlockQueue<int>*)arg;
5 while(1)
6 {
7 sleep(1);
8 int data = 0;
9 bq->Pop(data);
10 cout<<"Consumer : "<<data<<endl;
11 }
12 }
13 void* Producter(void* arg)
14 {
15 auto bq = (BlockQueue<int>*)arg;
16 while(1)
17 {
19 int data = rand() % 100 + 1;
20 bq->Push(data);
21 cout<<"Producter : "<<data<<endl;
22 }
23 }
24 int main()
25 {
26 srand((unsigned long)time(nullptr)); // 产生随机数
27 BlockQueue<int>* bq = new BlockQueue<int>;
28 pthread_t c,p;
29 pthread_create(&c,nullptr,Consumer,bq);
30 pthread_create(&p,nullptr,Producter,bq);
31 pthread_join(c,nullptr);
32 pthread_join(p,nullptr);
33 return 0;
34 }
为什么使用 while(IsFull()) 而不是 if(IsFull()) ?
pthread_cond_wait 是让当前执行流进行等待的函数,该函数有可能调用失败,或者被伪唤醒
对于生产者如果队列满了,调用pthread_cond_wait函数使生产者进行等待,但函数调用失败并没有成功让生产者等待,当前执行流还会继续向下执行_q.push(val)导致出错
对于消费者如果队列空了,调用pthread_cond_wait函数使消费者进行等待,但函数调用失败并没有成功让消费者等待,当前执行流还会继续向下执行_q.front(),_q.pop() 导致出错
单生产者多消费者情况下,没有数据时多消费者处于阻塞状态,生产者生产一个数据后如果调用 pthread_cond_broadcast(&empty) 唤醒所有消费者去消费,就会出现问题
所以使用while循环让线程被唤醒时并不着急向后执行,而是再重新检测是否还满足条件
生产者和消费者协同完成任务
Task.hpp
1 #pragma once
2 #include<iostream>
3 using namespace std;
4 class Task
5 {
6 public:
7 Task(int x,int y,char op)
8 :_x(x)
9 ,_y(y)
10 ,_op(op)
11 {}
12 Task()
13 {}
14 void Run()
15 {
16 int result = 0;
17 switch(_op)
18 {
19 case '+':
20 result = _x + _y;
21 break;
22 case '-':
23 result = _x - _y;
24 break;
25 case '*':
26 result = _x * _y;
27 break;
28 case '/':
29 if(_y == 0)
30 {
31 cout<<"Warning : div zero"<<endl;
32 result = -1;
33 }
34 else
35 {
36 result = _x / _y;
37 }
38 break;
39 default:
40 break;
41 }
42 cout<<_x<<_op<<_y<<"="<<result<<endl;
43 }
44 ~Task()
45 {}
46
47 private:
48 int _x;
49 int _y;
50 char _op;
51 };
POSIX信号量
什么是信号量(信号灯)?
信号量本质上是一个计数器,描述临界资源中资源数目的计数器,相比于互斥锁可以更细粒度的管理临界资源
申请到信号量的本质 : 并不是已经开始使用临界资源中申请的某个区域,而是拥有了申请的某个区域的权限(好比在网上买电影票预定座位,预定座位以后并不一定要坐到座位上,而是拥有了这个座位的权限)
申请信号量(P操作)的本质 : 让计数器- -
释放信号量(V操作)的本质 : 让计数器++
每个线程或执行流在进入临界资源的时候,都必须先申请信号量,操作特定的临界资源,最后释放信号量
每个线程在进入临界资源时都要申请信号量,所以信号量也是临界资源,信号量的P,V操作必须是原子的
进行PV操作时,一定有一种情况,P操作有可能申请不到,当前线程会在信号量的等待队列中阻塞等待
多个线程去申请信号量时,(信号量为0的话将当前线程的task_struct链入到等待队列中),申请到信号量的线程拿到锁,计数器- -,当别的线程进行了V操作,信号量就可以在等待队列中唤醒一个线程
struct
{
mutex_t lock;
int count;
task_struct* queue;
}
二元信号量实现互斥
1 #include<iostream>
2 #include<unistd.h>
3 #include<cstdio>
4 #include<pthread.h>
5 #include<semaphore.h>
6 using namespace std;
7 class Sem
8 {
9 public:
10 Sem(int num)
11 {
12 sem_init(&_sem,0,num);
13 }
14 void P()
15 {
16 sem_wait(&_sem);
17 }
18 void V()
19 {
20 sem_post(&_sem);
21 }
22 ~Sem()
23 {
24 sem_destroy(&_sem);
25 }
26 private:
27 sem_t _sem;
28 };
29 Sem sem(1);
30 int tickets = 10000;
31 void* Gettickets(void* arg)
32 {
33 while(1)
34 {
35 sem.P();
36 if(tickets > 0)
37 {
38 usleep(10000);
39 printf("%s got a ticket : %d\n",(char*)arg,tickets--);
40 sem.V();
41 }
42 else
43 {
44 sem.V();
45 break;
46 }
47 }
48 }
49 int main()
50 {
51 pthread_t tid1,tid2,tid3;
52 pthread_create(&tid1,nullptr,Gettickets,(void*)"thread 1");
53 pthread_create(&tid2,nullptr,Gettickets,(void*)"thread 2");
54 pthread_create(&tid3,nullptr,Gettickets,(void*)"thread 3");
55
56 pthread_join(tid1,nullptr);
57 pthread_join(tid2,nullptr);
58 pthread_join(tid3,nullptr);
59
60 return 0;
61 }
%s/ / /g
基于环形队列的生产消费模型
(1). 生产者和消费者同时执行,一定是生产者先运行,因为最开始没有数据资源让消费者消费,但是有空间资源让生产者生产
(2). 不会存在数据不一致的问题,因为只有生产者和消费者指向同一位置,才会有临界资源竞争的问题,而生产者和消费者指向同一位置只有两种情况,1. 环形队列为空的时候,一定是生产者先生产,消费者再消费 2. 环形队列满的时候,一定是消费者先消费,生产者再生产
RingQueue.hpp
#pragma once
2
3 #include<iostream>
4 #include<pthread.h>
5 #include<queue>
6 #include<cstdlib>
7 #include<unistd.h>
8 #include<vector>
9
10 template<class T>
11 class RingQueue{
12 private:
13 std::vector<T> q;
14 int cap;
15
16 sem_t blank_sem;
17 sem_t data_sem;
18
19 int p_pos;
20 int c_pos;
21
22 void P(sem_t& s){
23 sem_wait(&s);
24 }
25 void V(sem_t& s){
sem_post(&s);
27 }
28 public:
29 RingQueue(int _cap){
30
31 cap = _cap;
32 q.resize(cap);
33 sem_init(&blank_sem,0,cap);
34 sem_init(&data_sem,0,0);
35 p_pos = 0;
36 c_pos = 0;
37 }
38 void Push(const T& in){
39 P(blank_sem);
40 q[p_pos] = in;
41 V(data_sem);
42
43 p_pos++;
44 p_pos %= cap;
45 }
void Pop(T& out){
48
49 P(data_sem);
50 out = q[c_pos];
51 V(blank_sem);
52
53 c_pos++;
54 c_pos %= cap;
55 }
56 };
main.cc
#include"RingQueue.hpp"
2
3 void* Consumer(void* arg)
4 {
5 auto bq = (RingQueue<int>*)arg;
6 while(1)
7 {
8 sleep(3);
9 int data;
10 bq->Pop(data);
11 std::cout<<"Consumer : "<<data<<std::endl;
12 }
13 }
14
15 void* Producter(void* arg)
16 {
17 auto bq = (RingQueue<int>*)arg;
18 while(1)
19 {
20 int data = rand() % 100 + 1;
21 bq->Push(data);
22 std::cout<<"Producter : "<<data<<std::endl;
23 }
24 }
int main()
27 {
28 srand((unsigned long)time(nullptr)); // 产生随机数
29 RingQueue<int>* bq = new RingQueue<int>;
30 pthread_t c,p;
31 pthread_create(&c,nullptr,Consumer,bq);
32 pthread_create(&p,nullptr,Producter,bq);
33 pthread_join(c,nullptr);
34 pthread_join(p,nullptr);
35
36 return 0;
37 }
ThreadPool.hpp
#pragma once
2
3 #include<iostream>
4 #include<pthread.h>
5 #include<queue>
6 #include<cstdlib>
7 #include<unistd.h>
8 #include<vector>
9 #include<semaphore.h>
10 #include"Task.hpp"
11
12 #define NUM 5
13
14 template<class T>
15 class ThreadPool{
16 private:
17 int num;
18 std::queue<T> q;
19 pthread_mutex_t lock;
20 pthread_cond_t cond;
21 public:
22 ThreadPool(int _num = NUM):num(_num)
23 {
24 pthread_mutex_init(&lock,nullptr);
25 pthread_cond_init(&cond,nullptr);
}
27
28 void Lock(){
29 pthread_mutex_lock(&lock);
30 }
31
32 void UnLock(){
33 pthread_mutex_unlock(&lock);
34 }
35
36 void Wait(){
37 pthread_cond_wait(&cond,&lock);
38 }
39
40 void WakeUp(){
41 pthread_cond_signal(&cond);
42 }
43
44 bool Empty(){
45 return q.empty();
46 }
47
48 static void* Routine(void* arg){
49
50 pthread_detach(pthread_self());
ThreadPool<T>* self = (ThreadPool<T>*)arg;
52 while(1){
53 self->Lock();
54 while(self->Empty()){
55 self->Wait();
56 }
57 Task t;
58 self->Pop(t);
59 self->UnLock();
60
61 t.Run();
62 }
63 }
64
65 void InitThreadPool(){
66 pthread_t tid;
67 for(int i = 0;i < num;i++){
68 pthread_create(&tid,nullptr,Routine,this);
69 }
70 }
71
72
73 void Push(const T& in){
74 Lock();
75 q.push(in);
UnLock();
77 WakeUp();
78 }
79
80
81 void Pop(T& out){
82 out = q.front();
83 q.pop();
84 }
85
86 ~ThreadPool(){
87 pthread_mutex_destroy(&lock);
88 pthread_cond_destroy(&cond);
89 }
90
91 };
Task.hpp
#pragma once
2
3 class Task{
4 private:
5 int x;
6 int y;
7 char op;
8 public:
9 Task(){
10
11 }
12
13 Task(int _x,int _y,char _op)
14 :x(_x)
15 ,y(_y)
16 ,op(_op)
17 {}
18
19 void Run(){
20
21 int result = 0;
22 switch(op){
23 case '+' :
24 result = x + y;
25 break;
case '-' :
27 result = x - y;
28 break;
29 case '*' :
30 result = x * y;
31 break;
32 case '/' :
33 {
34 if(!y){
35 std::cout<<"除 0"<<std::endl;
36 return;
37 }
38 result = x / y;
39 }
40 default:
41 break;
42 }
43 std::cout<<x<<op<<y<<" = "<<result<<std::endl;
44 }
45 };
main.cc
#include"ThreadPool.hpp"
2
3 int main()
4 {
5 srand((unsigned long)time(nullptr)); // 产生随机数
6 ThreadPool<Task>* tp = new ThreadPool<Task>;
7 tp->InitThreadPool();
8 const char* arr = "+-*/";
9 while(1){
10 int x = rand() % 100 + 1;
11 int y = rand() % 100 + 1;
12 char op = arr[rand() % 4];
13
14 Task t(x,y,op);
15 tp->Push(t);
16 }
17
18 return 0;
19 }
挂起等待锁 : 申请资源失败,线程会被阻塞挂起
自旋锁 : 申请资源失败,线程会一直循环检测资源是否准备就绪
读写锁 :
#include
int pthread_spin_init(pthread_spinlock_t *lock,int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
// 自旋1次,如果申请锁不成功,返回让用户决定该怎么做
int pthread_spin_unlock(pthread_spinlock_t *lock);
读者写者问题(如同班级策划黑板报)
3种关系 :
读者和读者 : 没有关系(读者只是进行数据拷贝(只是对黑板报的内容进行读取),因此没有关系)
读者和写者 : 互斥(读黑板报的人不能在写黑板报的人还没写完之前就去读,因为会导致读取的内容错误)和同步关系(写者写完黑板报,需要让读者读)
写者和写者 : 互斥关系(两人不能同时在黑板的同一区域内进行写入)
应用场景 : 数据写入之后,剩下的操作就是读取(如写黑板报/写日记后让其他人来读),写入数据少,读取数据多
如注册(写入数据)和登录(读取数据)网站
读者和写者问题是由角色强决定的,读者按照读的方式进行加锁,写者按照写的方式进行加锁
多线程情况下,读写同时到来,如果还有人在读,接下来读者不要进入临界区读,等写者写入,然后读者再读(写者优先),反之就叫做读者优先