• Linux —— 线程


    目录

    一,线程概念

    二,Linux进程与线程

    三,Linux线程控制

    线程创建

    线程终止

    线程等待

    线程分离

    四,Linux线程互斥

    互斥量mutex

    可重入和线程安全

    死锁

    五,Linux线程同步

    条件变量函数

    pthread_cond_wait需要互斥量的原因

    条件变量使用规范

    六,生产者消费者模型

    优点

    基于BlockingQuene的生产者消费者模型

    七,信号量

    基于环形队列的生产者消费者模型

    八,线程池

    九,线程安全单例模式

    十,STL、智能指针和线程安全

    十一,读者写者问题


    一,线程概念

            在一程序内,一个执行路线称为线程thread,即线程是一个进程内部的控制序列;

    • 一切进程至少都有一个执行线程;
    • 线程在进程内部运行,本质是在进程地址空间内运行;
    • 在Linux系统中,CPU看到的PCB都要比传统的进程更加轻量化;
    • 透过进程虚拟地址空间,可看到进程的大部分资源;将进程资源合理分配给每个执行流,就形成线程执行流;

    线程优点

    • 创建新线程的代价要比创建新进程小的多;
    • 与进程间的切换相比,线程间的切换需要操作系统所做的工作要少很多;
    • 线程占用资源比进程少很多;
    • 能充分利用多处理器的可并行数量;
    • 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务;
    • 计算密集型应用,为了提高性能,将I/O操作重叠;线程可同时等待不同的I/O操作;

    线程缺点

    • 性能损失,一个很少被外部事件阻塞的计算密集型线程往往无法与其他线程共享一个处理器;如计算密集型线程的数量比可用的处理器多,可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变;
    • 健壮性降低,编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话即线程间缺乏保护;
    • 缺乏访问控制,进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响;
    • 编程难度提高,编写与调试一个多线程程序比单线程困难的多;

    线程异常

    • 单个线程如出现除零,野指针等问题导致线程崩溃,进程也会随之崩溃;
    • 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程;进程终止,该进程内的所有线程也会退出;

    线程用途

    • 合理的使用多线程,能提高CPU密集型程序的执行效率;
    • 合理的使用多线程,能提高I/O密集型程序的用户体验;

    二,Linux进程与线程

    • 进程是资源分配的基本单位;
    • 线程是调度的基本单位;
    • 线程共享进程数据,但也拥有自己的一部分数据;
      • 线程ID;
      • 一组寄存器(上下文数据);
      • 栈;
      • errno;
      • 信号屏蔽字;
      • 调度优先级;

            进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的;如定义一个函数,在各个线程中都可调用,如定义一个全局变量,在各线程都可访问到,除此之外,各线程还共享以下进程资源和环境:

    • 文件描述符;
    • 每种信号的处理方式(SIG_IGN、SIG_DFL或自定义信号处理函数);
    • 当前工作目录;
    • 用户id和组id;

    三,Linux线程控制

    POSIX线程库

    • 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以”pthread_“开头的;
    • 要使用这些函数库,引用头文件
    • 链接这些线程函数库,使用编译器命令”-lpthread“选项;

    线程创建

    1. //创建新线程
    2. int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
    3. (*start_routine)(void*), void *arg);
    • thread,返回线程的ID;
    • attr,设置线程的属性,如为NULL表示使用默认属性;
    • start_routine,函数地址,线程启动后执行的函数;
    • arg,传给线程启动函数的参数;

    返回值

    • 成功返回0,失败返回错误码;
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. void* rout(void* arg){
    7. for( ; ; ){
    8. printf("I am thread1\n");
    9. sleep(1);
    10. }
    11. }
    12. int main(){
    13. pthread_t tid;
    14. int ret = pthread_create(&tid, NULL, rout, NULL);
    15. if(ret != 0){
    16. fprintf(stderr, "pthread_create: %s\n", strerror(ret));
    17. exit(EXIT_FAILURE);
    18. }
    19. for( ; ; ){
    20. printf("I am main thread\n");
    21. sleep(1);
    22. }
    23. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ldd test
    3. linux-vdso.so.1 => (0x00007ffce0bb2000)
    4. libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f9ab6dd8000)
    5. libc.so.6 => /lib64/libc.so.6 (0x00007f9ab6a0a000)
    6. /lib64/ld-linux-x86-64.so.2 (0x00007f9ab6ff4000)
    7. [wz@192 Desktop]$ ./test
    8. I am main thread
    9. I am thread1
    10. I am main thread
    11. I am thread1
    12. I am main thread
    13. I am thread1
    1. //一个进程,两个线程(轻量级进程)
    2. [wz@192 ~]$ ps axj | head -1 && ps axj | grep test
    3. PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
    4. 2976 53194 53194 2976 pts/0 53194 Sl+ 1000 0:00 ./test
    5. 3351 53324 53323 3351 pts/1 53323 S+ 1000 0:00 grep --color=auto test
    6. [wz@192 ~]$ ps -aL | head -1 && ps -aL | grep test
    7. PID LWP TTY TIME CMD
    8. 53194 53194 pts/0 00:00:00 test
    9. 53194 53195 pts/0 00:00:00 test

    线程ID及进程地址空间布局

    • pthread_create函数会产生一个线程ID,存放在第一参数所指向的地址;此线程ID与前面所说的线程ID不是一回事;此前所说的线程ID属于进程调度范畴,因线程是轻量级进程,是OS调度的最小单位,所以需一个数值来唯一标识该线程;
    • pthread_create函数第一个参数指向一个虚拟内存单元,该内存单元地址即为新创建线程的线程ID,属于NPTL线程库范畴;线程库的后续操作,就是根据该线程ID来操作线程的;
    • 线程库NPTL提供了pthread_self函数,可获得线程自身ID;

            pthread_t类型是什么,取决于实现;对于Linux目前实现的NPTL,pthread_t类型的线程ID,本质上是一个进程地址空间的一个地址;

    • Linux没有真正意义上的线程,是用进程模拟的(轻量级进程);
    • Linux本身不会直接提供类似线程创建、终止、等待、分离等相关system call接口,但会提供创建轻量级进程的接口vfork;
    • 但用户需要所谓的线程创建、终止、等待、分离等相关接口,所以系统基于轻量级进程接口模拟封装了用户原生线程库pthread;
    • 进程由PCB管理的,用户层也需进行用户级线程管理(由用户空间维护);
    • 用户层线程ID,本质是一个地址(共享区,pthread库中某个起始位置);

    线程终止

    如需终止某个线程而不是整个进程,有三种方法:

    • 从线程函数return,此方法对主线程不适用,从main函数return相当于调用exit;
    • 线程可调用pthread_exit终止自己;
    • 线程可调用pthread_cancel终止同一进程中的另一个线程;
    void pthread_exit(void* value_ptr);
    • vaule_ptr,不要指向一个局部变量;
    • pthread_exit或return返回的指针指向的内存单元必须是全局或是用malloc分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回指针时线程函数已经退出;
    int pthread_cancel(pthread_t thread);
    • 成功返回0,失败返回错误码;

    线程等待

    为何需要线程等待:

    • 已退出的线程,其空间没有被释放,仍然在进程的地址空间内;
    • 创建新的线程不会复用刚才退出线程的地址空间;
    int pthread_join(pthread_t thread, void** value_ptr);
    • value_ptr,指向一个指针,然后在指向线程的返回值;
    • 成功返回0,失败返回错误码;

            调用该函数的线程将挂起等待,直到id为thread的线程终止;thread的线程以不同的方法终止,通过pthread_join得到的终止状态也是不同的:

    • 如thread线程通过return返回,value_ptr所指向的单元存放的时thread线程函数的返回值;
    • 如thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数;
    • 如thread线程被别的线程调用pthread_cancel异常终止的,value_ptr所指向的单元存放的是常数PTHREAD_CANCELED((void*)-1);
    • 如对thread线程的终止状态不感兴趣,可对value_ptr传NULL;

    1. #include
    2. #include
    3. #include
    4. #include
    5. void* thread1(void* arg){
    6. printf("thread1 returning ...\n");
    7. int *p = (int*)malloc(sizeof(int));
    8. *p = 1;
    9. return (void*)p;
    10. }
    11. void* thread2(void* arg){
    12. printf("thread1 exiting ...\n");
    13. int *p = (int*)malloc(sizeof(int));
    14. *p = 2;
    15. pthread_exit((void*)p);
    16. }
    17. void* thread3(void* arg){
    18. while(1){
    19. printf("thread3 running ...\n");
    20. sleep(1);
    21. }
    22. return NULL;
    23. }
    24. int main(){
    25. pthread_t tid;
    26. void* ret;
    27. //线程1,return
    28. pthread_create(&tid, NULL, thread1, NULL);
    29. pthread_join(tid, &ret);
    30. printf("thread1 return, thread id %x, return code: %d\n", tid, *(int*)ret);
    31. free(ret);
    32. //线程2,exit
    33. pthread_create(&tid, NULL, thread2, NULL);
    34. pthread_join(tid, &ret);
    35. printf("thread2 return, thread id %x, return code: %d\n", tid, *(int*)ret);
    36. free(ret);
    37. //线程3,cancel by other
    38. pthread_create(&tid, NULL, thread3, NULL);
    39. sleep(3);
    40. pthread_cancel(tid);
    41. pthread_join(tid, &ret);
    42. if(ret == PTHREAD_CANCELED)
    43. printf("thread3 return, thread id %x, return code: PTHREAD_CANCELED\n", tid);
    44. else
    45. printf("thread3 return, thread id %x, return code: NULL\n", tid);
    46. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. thread1 returning ...
    4. thread1 return, thread id 2d3f6700, return code: 1
    5. thread1 exiting ...
    6. thread2 return, thread id 2d3f6700, return code: 2
    7. thread3 running ...
    8. thread3 running ...
    9. thread3 running ...
    10. thread3 return, thread id 2d3f6700, return code: PTHREAD_CANCELED

    线程分离

    • 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄露;
    • 如不关心线程的返回值,join是一种负担,此时可告诉系统,当线程退出时,自动释放线程资源;
    • joinable与detach是冲突的,线程不可既是joinable又是detach;
    1. int pthread_detach(pthread_t thread);
    2. //可是线程组内其他线程对目标线程进行分离,也可是线程自己分离
    3. pthread_detach(pthread_self());
    1. #include
    2. #include
    3. #include
    4. #include
    5. void* thread_run(void* arg){
    6. pthread_detach(pthread_self());
    7. printf("%s\n", (char*)arg);
    8. return NULL;
    9. }
    10. int main(){
    11. pthread_t tid;
    12. if(pthread_create(&tid, NULL, thread_run, "thread run ...\n") != 0){
    13. printf("create thread error\n");
    14. return 1;
    15. }
    16. int ret = 0;
    17. sleep(1); //很重要,要让线程先分离,在等待
    18. if(pthread_join(tid, NULL) == 0){
    19. printf("pthread wait success\n");
    20. ret = 0;
    21. } else{
    22. printf("pthread wait failed\n");
    23. ret = 1;
    24. }
    25. return ret;
    26. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. thread run ...
    4. pthread wait failed

    四,Linux线程互斥

    • 临界资源,多线程执行流共享的资源叫做临界资源;
    • 临界区,每个线程内部,访问临界资源的代码叫做临界区;
    • 互斥,任何时候,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用;
    • 原子性,不会被任何调度机制打断的操作,该操作只有两态,要么完成要么未完成;

    互斥量mutex

    • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间,这种情况,变量归属单个线程,其他线程无法获得这种变量;
    • 但有时候,很多变量都需要在线程间共享,称为共享变量,可通过数据的共享,完成线程间的交互;
    • 多个线程并发的操作共享变量,会带来一些问题;
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. int ticket = 100;
    7. void* route(void* arg){
    8. char* id = (char*)arg;
    9. while(1){
    10. if(ticket > 0){
    11. usleep(1000);
    12. printf("%s sells ticket: %d\n", id, ticket);
    13. ticket--;
    14. }
    15. else break;
    16. }
    17. }
    18. int main(){
    19. pthread_t t1, t2, t3, t4;
    20. pthread_create(&t1, NULL, route, "thread1");
    21. pthread_create(&t2, NULL, route, "thread2");
    22. pthread_create(&t3, NULL, route, "thread3");
    23. pthread_create(&t4, NULL, route, "thread4");
    24. pthread_join(t1, NULL);
    25. pthread_join(t2, NULL);
    26. pthread_join(t3, NULL);
    27. pthread_join(t4, NULL);
    28. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. thread1 sells ticket: 100
    4. thread3 sells ticket: 100
    5. thread4 sells ticket: 98
    6. thread2 sells ticket: 100
    7. thread1 sells ticket: 96
    8. thread4 sells ticket: 95
    9. thread2 sells ticket: 94
    10. ...
    11. thread3 sells ticket: 4
    12. thread4 sells ticket: 2
    13. thread1 sells ticket: 2
    14. thread3 sells ticket: 0
    15. thread2 sells ticket: 0
    16. thread4 sells ticket: -2

    ticket--语句通常有三个步骤,绝对不是原子的:

    • ticket从内存到CPU相关寄存器;
    • CPU对ticket--操作;
    • 操作完将ticket值在写回内存;
    • 代码必须要有互斥行为,当代码进入临界区执行时,不允许其他线程进入该临界区;
    • 如多个线程同时要求执行临界区的代码,并且临界区没有线程执行,那么只能允许一个线程进入临界区;
    • 如线程不再临界区中执行,那么该线程不能阻止其他线程进入临界区;

    互斥量接口

    初始互斥量

    • 方法一,静态分配
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
    • 方法二,动态分配
    int pthread_mutex_init(pthread_mutex_t* restrict mutex, const pthread_mutexattr_t* restrict attr);

    销毁互斥量

    • 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁;
    • 不要销毁一个已经加锁的互斥量;
    • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁;
    int pthread_mutex_destroy(pthread_mutex_t* mutex);

    互斥量加锁/解锁

    1. int pthread_mutex_lock(pthread_mutex_t* mutex);
    2. int pthread_mutex_unlock(pthread_mutex_t* mutex);

    调用pthread_lock时,可能会遇到以下问题:

    • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功;
    • 发起函数调用时,其他线程已锁定互斥量,或存在线程同时申请互斥量,但没有竞争到互斥量,那么调用会阻塞(执行流被挂起),等待互斥量解锁;
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. int ticket = 100;
    7. pthread_mutex_t mutex;
    8. void* route(void* arg){
    9. char* id = (char*)arg;
    10. while(1){
    11. pthread_mutex_lock(&mutex);
    12. if(ticket > 0){
    13. usleep(1000);
    14. printf("%s sells ticket: %d\n", id, ticket);
    15. ticket--;
    16. pthread_mutex_unlock(&mutex);
    17. }
    18. else{
    19. pthread_mutex_unlock(&mutex);
    20. break;
    21. }
    22. }
    23. }
    24. int main(){
    25. pthread_t t1, t2, t3, t4;
    26. pthread_mutex_init(&mutex, NULL);
    27. pthread_create(&t1, NULL, route, "thread1");
    28. pthread_create(&t2, NULL, route, "thread2");
    29. pthread_create(&t3, NULL, route, "thread3");
    30. pthread_create(&t4, NULL, route, "thread4");
    31. pthread_join(t1, NULL);
    32. pthread_join(t2, NULL);
    33. pthread_join(t3, NULL);
    34. pthread_join(t4, NULL);
    35. pthread_mutex_destroy(&mutex);
    36. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. thread1 sells ticket: 100
    4. thread1 sells ticket: 99
    5. thread1 sells ticket: 98
    6. thread1 sells ticket: 97
    7. thread1 sells ticket: 96
    8. ...
    9. thread1 sells ticket: 5
    10. thread1 sells ticket: 4
    11. thread1 sells ticket: 3
    12. thread1 sells ticket: 2
    13. thread1 sells ticket: 1

    互斥量实现原理

    • 单纯的i++或++i都不是原子的,有可能会有数据一致性问题;
    • 为实现互斥锁操作,大多数体系结构都提供了swap、exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期;

    可重入和线程安全

    • 线程安全,多个线程并发同一段代码,不会出现不同的结果;常见对全局变量或静态变量进行操作,并且没有锁保护的情况下,会出现该问题;
    • 重入,同一函数被不同的执行流调用,当前一个流程还没执行完,就有其他的执行流再次进入,称为重入;一个函数在重入情况下,运行结果不会出现任何不同或任何问题,则该函数称为可重入函数,否则为不可重入函数;

    常见线程不安全情况

    • 不保护共享变量的函数;
    • 函数状态随着被调用,状态发生变化的函数;
    • 返回指向静态变量指针的函数;
    • 调用线程不安全函数的函数;

    常见线程安全的情况

    • 每个线程对全局变量或静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的;
    • 类或接口对于线程来说都是原子操作;
    • 多个线程之间的切换不会导致该接口的执行结果存在二义性;

    常见不可重入情况

    • 调用了malloc/free函数,因malloc函数是全局链表来管理堆的;
    • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构;
    • 可重入函数体内使用了静态的数据结构;

    常见可重入情况

    • 不使用全局变量或静态变量;
    • 不使用malloc/new开辟的空间;
    • 不调用不可重入函数;
    • 不返回静态或全局数据,所有数据都由函数的调用者提供;
    • 使用本地数据,或通过制作全局数据的本地拷贝来保护全局数据;

    可重入与线程安全联系

    • 函数是可重入的,那就是线程安全的;
    • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题;
    • 如一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入;

    可重入与线程安全区别

    • 可重入函数是线程安全函数的一种;
    • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的;
    • 如将临界资源访问加锁,则这个函数是线程安全的,如这个重入函数锁还未释放则会产生死锁,因此是不可重入的;

    死锁

            死锁是指在一组进程中各个进程均占有不会释放的资源,但因相互申请被其他进程所占有不会释放的资源而处于一种永久等待的状态;

    四个必要条件

    • 互斥条件,一个资源每次只能被一个执行流使用;
    • 请求与保持条件,一个执行流因请求资源而阻塞时,对已获取的资源保持不放;
    • 不剥夺条件,一个执行流已获得的资源,在未使用完之前,不能强行剥夺;
    • 循环等待条件,若干执行流之间形成一种头尾相接的循环等待资源的关系;

    避免死锁

    • 破坏死锁的四个必要条件;
    • 加锁顺序一致;
    • 避免锁未释放的场景;
    • 资源一次性分配;

    避免死锁算法

    • 死锁检测算法;
    • 银行家算法;

    五,Linux线程同步

            条件变量,描述临界资源的状态,当一个线程互斥地访问每个变量时,可能发现在其他线程改变状态之前,其什么也做不了,如一个线程访问队列时,发现队列为空,只能等待,只到其他线程将一个节点添加到队列中,此时就需要用到条件变量;

            同步,在保证数据安全的情况下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步;

            竞态条件,因为时序问题,而导致程序异常,称为竞态条件;

    条件变量函数

    1. //初始化
    2. int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
    3. attr);
    1. //销毁
    2. int pthread_cond_destroy(pthread_cond_t *cond)
    1. //等待条件满足
    2. int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
    1. //唤醒等待
    2. int pthread_cond_broadcast(pthread_cond_t *cond);
    3. int pthread_cond_signal(pthread_cond_t *cond);
    1. #include
    2. #include
    3. #include
    4. pthread_cond_t cond;
    5. pthread_mutex_t mutex;
    6. void* r1(void* arg){
    7. while(1){
    8. pthread_cond_wait(&cond, &mutex);
    9. printf("active\n");
    10. }
    11. }
    12. void* r2(void* arg){
    13. while(1){
    14. pthread_cond_signal(&cond);
    15. sleep(1);
    16. }
    17. }
    18. int main(){
    19. pthread_t t1, t2;
    20. pthread_cond_init(&cond, NULL);
    21. pthread_mutex_init(&mutex, NULL);
    22. pthread_create(&t1, NULL, r1, NULL);
    23. pthread_create(&t2, NULL, r2, NULL);
    24. pthread_join(t1, NULL);
    25. pthread_join(t2, NULL);
    26. pthread_mutex_destroy(&mutex);
    27. pthread_cond_destroy(&cond);
    28. }
    1. [wz@192 Desktop]$ gcc -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. active
    4. active
    5. active
    6. ...

    pthread_cond_wait需要互斥量的原因

    • 条件等待是线程间同步的一种手段,如只有一个线程,条件不满足,一直等下去都不会满足,所有必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程;
    • 条件不会无缘无故的突然变得的满足了,必然牵扯到共享数据的变化,所以一定要用互斥锁来保护,没有互斥锁就无法安全的获取和修改共享数据;

    1. // 错误的设计
    2. pthread_mutex_lock(&mutex);
    3. while (condition_is_false) {
    4. pthread_mutex_unlock(&mutex);
    5. //解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
    6. pthread_cond_wait(&cond);
    7. pthread_mutex_lock(&mutex);
    8. }
    9. pthread_mutex_unlock(&mutex);
    • 由于解锁和等待不是原子操作,调用解锁后,pthread_cond_wait之前,如已经有其他线程获取到互斥量,摈弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能导致线程永远阻塞在这个pthread_cond_wait,所以解锁和等待必须是一个原子操作;
    • 进入函数pthread_cond_wait后,会根据条件量是否大于0,等于0把互斥量变为1,直到cond_wait返回,在把条件量改为1,把互斥量恢复成原样;

    条件变量使用规范

    1. //等待条件代码
    2. pthread_mutex_lock(&mutex);
    3. while(条件为假)
    4. pthread_cond_wait(cond, mutex);
    5. 修改条件
    6. pthread_mutex_unlock(&mutex);
    1. //给条件发送信号代码
    2. pthread_mutex_lock(&mutex);
    3. 设置条件为真
    4. pthread_cond_signal(cond);
    5. pthread_mutex_unlock(&mutex);

    六,生产者消费者模型

            此模型是通过一个容器来解决生产者和消费者的强耦合问题;生产者和消费者之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生成完数据后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡生产者和消费者的处理能力;此阻塞队列就是用来给生产者和消费者解耦的;

    • 三种关系
      • 生产者与生产者关系,互斥关系;
      • 消费者与消费者关系,互斥关系;
      • 生产者与消费者关系,互斥与同步关系;
    • 两种角色(线程或进程)
      • 生产者;
      • 消费者;
    • 一种交易场所
      • 通常指的是内存空间,如数组、队列、set、list等;

    优点

    • 解耦;
    • 支持并发;
    • 支持忙闲不均;

    基于BlockingQuene的生产者消费者模型

            在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构;其与普通队列的区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出;以上操作均是基于不同的线程来说的,线程在对阻塞队列操作时会被阻塞;

    1. //单生产者、单消费者
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. using namespace std;
    8. #define NUM 8
    9. class BlockQueue{
    10. private:
    11. queue<int> q;
    12. int cap;
    13. pthread_mutex_t lock;
    14. pthread_cond_t full;
    15. pthread_cond_t empty;
    16. private:
    17. void LockQueue(){
    18. pthread_mutex_lock(&lock);
    19. }
    20. void UnlockQueue(){
    21. pthread_mutex_unlock(&lock);
    22. }
    23. void Productwait(){
    24. pthread_cond_wait(&full, &lock);
    25. }
    26. void Consumewait(){
    27. pthread_cond_wait(&empty, &lock);
    28. }
    29. void NotifyProduct(){
    30. pthread_cond_signal(&full);
    31. }
    32. void NotifyConsume(){
    33. pthread_cond_signal(&empty);
    34. }
    35. bool IsEmpty(){
    36. return (q.size() == 0 ? true : false);
    37. }
    38. bool IsFull(){
    39. return (q.size() == cap ? true : false);
    40. }
    41. public:
    42. BlockQueue(int _cap = NUM)
    43. :cap(_cap)
    44. {
    45. pthread_mutex_init(&lock, NULL);
    46. pthread_cond_init(&full, NULL);
    47. pthread_cond_init(&empty, NULL);
    48. }
    49. void PushData(const int& data){
    50. LockQueue();
    51. while(IsFull()){
    52. NotifyConsume();
    53. cout<<"queue full, notify consume data, product stop;"<
    54. Productwait();
    55. }
    56. q.push(data);
    57. UnlockQueue();
    58. }
    59. void PopData(int& data){
    60. LockQueue();
    61. while(IsEmpty()){
    62. NotifyProduct();
    63. cout<<"queue empty, notify product data, consume stop;"<
    64. Consumewait();
    65. }
    66. data = q.front();
    67. q.pop();
    68. UnlockQueue();
    69. }
    70. ~BlockQueue(){
    71. pthread_mutex_destroy(&lock);
    72. pthread_cond_destroy(&full);
    73. pthread_cond_destroy(&empty);
    74. }
    75. };
    76. void* consumer(void* arg){
    77. BlockQueue* bqp = (BlockQueue*)arg;
    78. int data;
    79. for( ; ; ){
    80. bqp->PopData(data);
    81. cout<<"consume data done: "<
    82. }
    83. }
    84. void* producter(void* arg){
    85. BlockQueue* bqp = (BlockQueue*)arg;
    86. srand((unsigned long)time(NULL));
    87. for( ; ; ){
    88. int data = rand()%1024;
    89. bqp->PushData(data);
    90. cout<<"product data done: "<
    91. sleep(1);
    92. }
    93. }
    94. int main(){
    95. BlockQueue bq;
    96. pthread_t c, p;
    97. pthread_create(&c, NULL, consumer, (void*)&bq);
    98. pthread_create(&p, NULL, producter, (void*)&bq);
    99. pthread_join(c, NULL);
    100. pthread_join(p, NULL);
    101. return 0;
    102. }
    1. [wz@192 Desktop]$ g++ -o test test.c -lpthread
    2. [wz@192 Desktop]$ ./test
    3. queue empty, notify product data, consume stop;
    4. product data done: 763
    5. product data done: 391
    6. product data done: 116
    7. product data done: 519
    8. product data done: 611
    9. product data done: 106
    10. product data done: 825
    11. product data done: 58
    12. queue full, notify consume data, product stop;
    13. consume data done: 763
    14. consume data done: 391
    15. consume data done: 116
    16. consume data done: 519
    17. consume data done: 611
    18. consume data done: 106
    19. consume data done: 825
    20. consume data done: 58
    21. queue empty, notify product data, consume stop;

    七,信号量

            POSIX信号量和SystemV信号量作用相同,都是用于同步操作,以到达无冲突的访问共享资源目的;但POSIX可用于线程间同步;

            信号量本质是一个计数器,用来描述临界资源数目的计数器;互斥锁能保护临界资源,条件变量能知道临界资源的状态;如临界资源可看作多份情况下,只要访问的区域不是同一个,即可让多个线程同时访问;任何线程如想访问临界资源中的某一个,一定必须先申请信号量,使用完毕,必须释放信号量;如需先申请信号量资源,前提是所有的线程,必须先看到信号量;信号量本身也是临界资源,其PV操作必须是原子的;

    1. //初始化信号量
    2. #include
    3. int sem_init(sem_t* sem, int pshared, unsigned int value);
    4. //pshared,0表示线程间共享,非零表示进程间共享;
    5. //value,信号量初始值;
    1. //销毁信号量
    2. int sem_destroy(sem_t* sem)
    1. //等待信号量,P()
    2. int sem_wait(sem_t* sem);
    3. //等待信号量会将信号量值减1
    1. //发布信号量,V()
    2. int sem_post(sem_t* sem);
    3. //发布信号量表示资源使用完毕,可以归还资源了,将信号量值加1;

    基于环形队列的生产者消费者模型

    • 环形队列采用数组模拟;
    • 环形结构起始状态和结束状态都是一样的,不好判断空或满,所以可通过加计数器或标记位来判断空或满;另外也可预留一个空的位置,作为满的状态;

     

    1. //单生产单消费,没有直接加锁
    2. //多生产多消费,需加锁
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. using namespace std;
    10. #define NUM 10
    11. class RingQueue{
    12. private:
    13. vector<int> q;
    14. int cap;
    15. int consume_step;
    16. int product_step;
    17. sem_t data_sem;
    18. sem_t space_sem;
    19. public:
    20. RingQueue(int _cap = NUM)
    21. :q(_cap), cap(_cap), consume_step(0), product_step(0)
    22. {
    23. sem_init(&data_sem, 0, 0);
    24. sem_init(&space_sem, 0, cap);
    25. }
    26. void PutData(const int& data){
    27. sem_wait(&space_sem);
    28. q[product_step] = data;
    29. sem_post(&data_sem);
    30. product_step++;
    31. product_step %= cap;
    32. }
    33. void GetData(int& data){
    34. sem_wait(&data_sem);
    35. data = q[consume_step];
    36. sem_post(&space_sem);
    37. consume_step++;
    38. consume_step %= cap;
    39. }
    40. ~RingQueue(){
    41. sem_destroy(&data_sem);
    42. sem_destroy(&space_sem);
    43. }
    44. };
    45. void* consumer(void* arg){
    46. RingQueue* rqp = (RingQueue*)arg;
    47. int data;
    48. for( ; ; ){
    49. rqp->GetData(data);
    50. cout<<"consume data done: "<
    51. sleep(1);
    52. }
    53. }
    54. void* producter(void* arg){
    55. RingQueue* rqp = (RingQueue*)arg;
    56. srand((unsigned long)time(NULL));
    57. for( ; ; ){
    58. int data = rand()%1024;
    59. rqp->PutData(data);
    60. cout<<"product data done: "<
    61. }
    62. }
    63. int main(){
    64. RingQueue* rq = new RingQueue(10);
    65. pthread_t c, p;
    66. pthread_create(&c, NULL, consumer, (void*)rq);
    67. pthread_create(&p, NULL, producter, (void*)rq);
    68. pthread_join(c, NULL);
    69. pthread_join(p, NULL);
    70. }

    八,线程池

            线程池一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能;而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务;这避免了在处理短时间任务时创建与销毁的代价;线程池不仅能够保证内核的充分利用,还能防止过分调度;可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络socket等的数量;

    应用场景:

    • 需要大量的线程来完成任务,且完成任务的时间较短;WEB服务器完成网页请求这样的任务,使用线程池技术就非常合适;因为单个任务小、任务数量巨大(可想象网站点击次数);但对于长时间的任务,如一个Telnet连接请求,线程池的优点就不明显了;因为Telnet会话时间比线程的创建时间大得多;
    • 对性能要求苛刻的应用,如要求服务器迅速响应客户请求;
    • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用;突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可使内存达到极限,出现错误;
    1. //threadpool.hpp
    2. #ifndef __M_TP_H__
    3. #define __M_TP_H__
    4. #include
    5. #include
    6. #include
    7. #include
    8. #define MAX_THREAD 5
    9. typedef bool (* handler_t)(int);
    10. class ThreadTask{
    11. private:
    12. int _data;
    13. handler_t _handler;
    14. public:
    15. ThreadTask():_data(-1), _handler(NULL){}
    16. ThreadTask(int data, handler_t handler){
    17. _data = data;
    18. _handler = handler;
    19. }
    20. void SetTask(int data, handler_t handler){
    21. _data = data;
    22. _handler = handler;
    23. }
    24. void Run(){
    25. _handler(_data);
    26. }
    27. };
    28. class ThreadPool{
    29. private:
    30. int _thread_max;
    31. int _thread_cur;
    32. bool _tp_quit;
    33. std::queue _task_queue;
    34. pthread_mutex_t _lock;
    35. pthread_cond_t _cond;
    36. private:
    37. void LockQueue(){
    38. pthread_mutex_lock(&_lock);
    39. }
    40. void UnLockQueue(){
    41. pthread_mutex_unlock(&_lock);
    42. }
    43. void WakeUpOne(){
    44. pthread_cond_signal(&_cond);
    45. }
    46. void WakeUpAll(){
    47. pthread_cond_broadcast(&_cond);
    48. }
    49. void ThreadQuit(){
    50. _thread_cur--;
    51. UnLockQueue();
    52. pthread_exit(NULL);
    53. }
    54. void ThreadWait(){
    55. if(_tp_quit)
    56. ThreadQuit();
    57. pthread_cond_wait(&_cond, &_lock);
    58. }
    59. bool IsEmpty(){
    60. return _task_queue.empty();
    61. }
    62. static void* thr_start(void* arg){
    63. ThreadPool* tp = (ThreadPool*) arg;
    64. while(1){
    65. tp->LockQueue();
    66. while(tp->IsEmpty()){
    67. tp->ThreadWait();
    68. }
    69. ThreadTask* tt;
    70. tp->PopTask(&tt);
    71. tp->UnLockQueue();
    72. tt->Run();
    73. delete tt;
    74. }
    75. return NULL;
    76. }
    77. public:
    78. ThreadPool(int max = MAX_THREAD)
    79. :_thread_max(max), _thread_cur(max),_tp_quit(false){
    80. pthread_mutex_init(&_lock, NULL);
    81. pthread_cond_init(&_cond, NULL);
    82. }
    83. ~ThreadPool(){
    84. pthread_mutex_destroy(&_lock);
    85. pthread_cond_destroy(&_cond);
    86. }
    87. bool PoolInit(){
    88. pthread_t tid;
    89. for(int i=0; i<_thread_max; i++){
    90. int ret = pthread_create(&tid, NULL, thr_start, this);
    91. if(ret !=0 ){
    92. std::cout<<"create pool thread error\n";
    93. return false;
    94. }
    95. }
    96. return true;
    97. }
    98. bool PushTask(ThreadTask* tt){
    99. LockQueue();
    100. if(_tp_quit){
    101. UnLockQueue();
    102. return false;
    103. }
    104. _task_queue.push(tt);
    105. WakeUpOne();
    106. UnLockQueue();
    107. return true;
    108. }
    109. bool PopTask(ThreadTask** tt){
    110. *tt = _task_queue.front();
    111. _task_queue.pop();
    112. return true;
    113. }
    114. bool PoolQuit(){
    115. LockQueue();
    116. _tp_quit = true;
    117. UnLockQueue();
    118. while(_thread_cur > 0){
    119. WakeUpAll();
    120. usleep(1000);
    121. }
    122. return true;
    123. }
    124. };
    125. #endif
    1. //main.cpp
    2. #include "threadpool.hpp"
    3. bool handler(int data){
    4. srand(time(NULL));
    5. int n = rand()%5;
    6. printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
    7. sleep(n);
    8. return true;
    9. }
    10. int main(){
    11. ThreadPool pool;
    12. pool.PoolInit();
    13. int i;
    14. for(i=0; i<10; i++){
    15. ThreadTask* tt = new ThreadTask(i, handler);
    16. pool.PushTask(tt);
    17. }
    18. pool.PoolQuit();
    19. return 0;
    20. }
    1. [wz@192 Desktop]$ g++ -std=c++0x main.cpp -o test -pthread -lrt
    2. [wz@192 Desktop]$ ./test
    3. Thread: 0x7f6e23c8f700 Run Tast: 0--sleep 4 sec
    4. Thread: 0x7f6e25c93700 Run Tast: 4--sleep 4 sec
    5. Thread: 0x7f6e24490700 Run Tast: 1--sleep 0 sec
    6. Thread: 0x7f6e24490700 Run Tast: 5--sleep 4 sec
    7. Thread: 0x7f6e24c91700 Run Tast: 3--sleep 4 sec
    8. Thread: 0x7f6e25492700 Run Tast: 2--sleep 4 sec
    9. Thread: 0x7f6e25c93700 Run Tast: 6--sleep 2 sec
    10. Thread: 0x7f6e24490700 Run Tast: 8--sleep 2 sec
    11. Thread: 0x7f6e23c8f700 Run Tast: 9--sleep 2 sec
    12. Thread: 0x7f6e24c91700 Run Tast: 7--sleep 2 sec

    九,线程安全单例模式

            单例模式是经典常用的设计模式,其特点是某些类只应该具有一个对象(实例)称为单例;

    饿汉方式实现单例模式

    1. //只要通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例;
    2. template <typename T>
    3. class Singleton{
    4. static T data;
    5. public:
    6. static T* GetInstance(){
    7. return &data;
    8. }
    9. };

    懒汉方式实现单例模式,核心思想是“延时加载”,从而优化服务器启动速度;

    1. //存在一个严重的问题,线程不安全;
    2. //第一次调用GetInstance时,如两个线程同时调用,可能会创建两份T的对象的实例;但后续再次调用,没问题;
    3. template <typename T>
    4. class Singleton{
    5. static T* inst;
    6. public:
    7. static T* GetInstance(){
    8. if(inst == NULL)
    9. inst = new T();
    10. return inst;
    11. }
    12. };

    懒汉方式实现单例模式(线程安全版本)

    1. template <typename T>
    2. class Singleton{
    3. volatile static T* inst; //设置volatile,否则可能会被编译器优化
    4. static std::mutex lock;
    5. public:
    6. static T* GetInstance(){
    7. //双重判定空指针,降低锁冲突的概率,提高性能
    8. //使用互斥锁,保证多线程只能调用异常new
    9. if(inst == NULL){
    10. lock.lock();
    11. if(inst == NULL)
    12. inst = new T();
    13. lock.unlock();
    14. }
    15. return inst;
    16. }
    17. };

    十,STL、智能指针和线程安全

            STL中的容器不是线程安全的,STL设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响;而且对于不同的容器,加锁方式的不同,性能可能也不同(如hash表的锁表和锁桶);因此STL默认不是线程安全的,如需在多线程下使用,往往要调用者自行保证线程安全;

            智能指针,对于unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题;对于shared_ptr,多个对象需共用一个引用计数变量,所以会存在线程安全问题,但标准库实现时考虑到了这个问题,基于原子操作的方式保证shared_ptr能够高效、原子的操作引用计数;

    • 悲观锁,在每次取数据时,总是担心数据会被其他线程修改,所以取数据前先加锁(读锁、写锁、行锁等),当其他线程想要范围数据时,被阻塞挂起;
    • 乐观锁,每次取数据时,总是乐观的认为数据不会被其他线程修改,因此不上锁,但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改,只要采用两种方式:版本号机制和CAS操作;
    • CAS操作,当需要更新数据时,判断当前内存值和之前取得的值是否相等,如相等则用新值更新,如不相等则失败,失败则重试,一般是一个自旋的过程,即不断重试;
    • 自旋锁,公平锁、非公平锁;

    十一,读者写者问题

            读写锁,在编写多线程的时候,有一种情况是十分常见的,那就是有些公共数据修改的机会比较少,相比较改写,它们读的机会反而高的多;通常在读的过程中,往往伴随着查找的操作,中间耗时很长;给这种代码段加锁,会极大地降低程序的效率;读写锁就是专门处理这种多读少写的情况;

    读写锁接口

    1. //设置读写优先
    2. int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t* attr, int pref);
    • pref 有三种选择
      • PTHREAD_RWLOCK_PREFER_READER_NP,默认设置,读者优先,可能会导致写者饥饿情况;
      • PTHREAD_RWLOCK_PREFER_WRITER_NP,写者优先,目前有BUG,导致表现行为和上一种选择一致;
      • PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,写者优先,但写者不能递归加锁;
    1. //初始化
    2. int pthread_rwlock_init(pthread_rwlock_t* restrict rwlock, const pthread_rwlockattr_t* restrict attr);
    1. //销毁
    2. int pthread_rwlock_destroy(pthread_rwlock_t* rwlock);
    1. //加锁和解锁
    2. int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
    3. int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
    4. int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. volatile int ticket = 1000;
    9. pthread_rwlock_t rwlock;
    10. void* reader(void* arg){
    11. char* id = (char*) arg;
    12. while(1){
    13. pthread_rwlock_rdlock(&rwlock);
    14. if(ticket <= 0){
    15. pthread_rwlock_unlock(&rwlock);
    16. break;
    17. }
    18. printf("%s: %d\n", id, ticket);
    19. pthread_rwlock_unlock(&rwlock);
    20. usleep(1);
    21. }
    22. return nullptr;
    23. }
    24. void* writer(void* arg){
    25. char* id = (char*)arg;
    26. while(1){
    27. pthread_rwlock_wrlock(&rwlock);
    28. if(ticket <= 0){
    29. pthread_rwlock_unlock(&rwlock);
    30. break;
    31. }
    32. printf("%s: %d\n", id, --ticket);
    33. pthread_rwlock_unlock(&rwlock);
    34. usleep(1);
    35. }
    36. return nullptr;
    37. }
    38. struct ThreadAttr{
    39. pthread_t tid;
    40. std::string id;
    41. };
    42. std::string create_reader_id(std::size_t i){
    43. std::ostringstream oss("thread reader ", std::ios_base::ate);
    44. oss<
    45. return oss.str();
    46. }
    47. std::string create_writer_id(std::size_t i){
    48. std::ostringstream oss("thread writer ", std::ios_base::ate);
    49. oss<
    50. return oss.str();
    51. }
    52. void init_readers(std::vector& vec){
    53. for(std::size_t i=0; isize(); i++){
    54. vec[i].id = create_reader_id(i);
    55. pthread_create(&vec[i].tid, nullptr, reader, (void*)vec[i].id.c_str());
    56. }
    57. }
    58. void init_writers(std::vector& vec){
    59. for(std::size_t i=0; isize(); i++){
    60. vec[i].id = create_writer_id(i);
    61. pthread_create(&vec[i].tid, nullptr, writer, (void*)vec[i].id.c_str());
    62. }
    63. }
    64. void join_threads(std::vector const& vec){
    65. for(std::vector::const_reverse_iterator it = vec.rbegin(); it!=vec.rend(); it++){
    66. pthread_t const& tid = it->tid;
    67. pthread_join(tid, nullptr);
    68. }
    69. }
    70. void init_rwlock(){
    71. #if 0
    72. pthread_rwlockattr_t attr;
    73. pthread_rwlockattr_init(&attr);
    74. pthread_rwlockattr_setkind_np(&attr, PHTREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
    75. pthread_rwlock_init(&rwlock, &attr);
    76. pthread_rwlockattr_destroy(&attr);
    77. #else
    78. pthread_rwlock_init(&rwlock, nullptr);
    79. #endif
    80. }
    81. int main(){
    82. const std::size_t reader_nr = 1000;
    83. const std::size_t writer_nr = 2;
    84. std::vector readers(reader_nr);
    85. std::vector writers(writer_nr);
    86. init_rwlock();
    87. init_readers(readers);
    88. init_writers(writers);
    89. join_threads(writers);
    90. join_threads(readers);
    91. pthread_rwlock_destroy(&rwlock);
    92. }
    1. [wz@192 Desktop]$ g++ -std=c++11 -Wall -Werror main.cpp -o test -lpthread
    2. [wz@192 Desktop]$ ./test
    3. thread reader 259: 1000
    4. thread reader 707: 1000
    5. thread reader 978: 1000
    6. thread reader 498: 1000
    7. thread reader 980: 1000
    8. thread reader 499: 1000
    9. ...

  • 相关阅读:
    Swift data范围截取问题
    如何理解单例模式?
    SpringBoot定时任务 - Spring自带的定时任务是如何实现的?有何注意点?
    沙利文&头豹研报|腾讯安全威胁情报中心TIX位居领导者,创新指数排名第一!
    IDEA Gradle Lombok错误:找不到符号 setter getter方法没有
    “策略+模型“评级流程详析(含数据字典)
    【navicat 密码查看】小技巧navicat 如何查看密码
    第3周学习:ResNet+ResNeXt
    stm32之1602+DHT11+继电器
    react antd 一些问题和要注意的地方
  • 原文地址:https://blog.csdn.net/NapoleonCoder/article/details/132866853