• Linux:线程互斥与同步 | 生产者消费者模型 | 线程伪唤醒、唤醒丢失 | 死锁



    全文约 7578 字,预计阅读时长: 22分钟


    线程互斥

    • 多线程切换的情况下,极有可能出现因为数据交叉操作,而导致临界资源数据不一致问题,因此需要保护临界资源。
    • 进程线程间的互斥相关背景概念:
      • 临界资源:多线程执行流共享的资源就叫做临界资源。
      • 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
      • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
      • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
    • 互斥量mutex
      • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
      • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。多个线程并发的操作共享变量,会带来一些问题。
      • --ticket 操作本身就不是一个原子操作:
        • load :将共享变量ticket从内存加载到寄存器中
        • update : 更新寄存器里面的值,执行-1操作
        • store :将新值,从寄存器写回共享变量ticket的内存地址
    • 要解决以上问题,需要做到三点:
      • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
      • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
      • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
    • 要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量
      在这里插入图片描述
    • 初始化互斥量有两种方法:
      • pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
      • int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
        • 参数:mutex:要初始化的互斥量;attr:NULL
    • 销毁互斥量需要注意:
      • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁;不要销毁一个已经加锁的互斥量。
      • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
      • int pthread_mutex_destroy(pthread_mutex_t *mutex);
    • 互斥量加锁和解锁
      • int pthread_mutex_lock(pthread_mutex_t *mutex);
      • int pthread_mutex_unlock(pthread_mutex_t *mutex);
      • 返回值:成功返回0,失败返回错误号
      • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功;
      • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_ lock 调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    int ticket = 100;
    pthread_mutex_t mutex;//创建一个全局的互斥量。
    void *route(void *arg)
    {
    	char *id = (char*)arg;
    	while ( 1 ) 
    	{
    		pthread_mutex_lock(&mutex);
    		if ( ticket > 0 )
    		{
    			usleep(1000);
    			printf("%s sells ticket:%d\n", id, ticket);
    			ticket--;
    			pthread_mutex_unlock(&mutex);
    	    } 
    		else
    	   {
    			pthread_mutex_unlock(&mutex);
    			break;
    	   }
       }
    }
    int main( void )
    {
    	pthread_t t1, t2, t3, t4;
    	pthread_mutex_init(&mutex, NULL);
    	pthread_create(&t1, NULL, route, "thread 1");
    	pthread_create(&t2, NULL, route, "thread 2");
    	pthread_create(&t3, NULL, route, "thread 3");
    	pthread_create(&t4, NULL, route, "thread 4");
    	pthread_join(t1, NULL);
    	pthread_join(t2, NULL);
    	pthread_join(t3, NULL);
    	pthread_join(t4, NULL);
    	pthread_mutex_destroy(&mutex);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    互斥锁原理

      为了实现互斥锁操作,大多数体系结构都提供了swapexchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性。即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

      cpu里寄存器的数据,是线程独有的上下文数据。mutex 互斥锁本质是内存中的一个内存空间,空间里面的值是1,是可以被所有线程读取并访问的。


    可重入、线程安全

    • 线程安全:描述的是线程之间的关系;以及访问某些函数、数据、区域是否会引起线程问题。
      • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
    • 可重入:函数的状态。比如,是否可以被多个执行流同时进入,且不会出现问题。
      • 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
    • 常见的线程安全与不安全的情况:
      • 不安全的情况:不保护共享变量的函数;函数状态随着被调用,状态发生变化的函数;返回指向静态变量指针的函数;调用线程不安全函数的函数。
      • 安全的情况:每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的;类或者接口对于线程来说都是原子操作;多个线程之间的切换不会导致该接口的执行结果存在二义性。
      • 二义性:一个线程正在写入,另一个线程过来取数据,取没取到的结果是不确定的。
    • 常见不可重入和可重入的情况:
      • 不可重入的情况:调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的;调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构;可重入函数体内使用了静态的数据结构。
      • 可重入的情况:不使用全局变量或静态变量;不使用用malloc或者new开辟出的空间;不调用不可重入函数;不返回静态或全局数据,所有数据都有函数的调用者提供;使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
    • 可重入与线程安全联系和区别:
      • 联系:函数是可重入的,那就是线程安全的;函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题;如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
      • 区别:可重入函数是线程安全函数的一种;线程安全不一定是可重入的,而可重入函数则一定是线程安全的。如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

    死锁

    • 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
    • 死锁四个必要条件:
      • 互斥条件:一个资源每次只能被一个执行流使用
      • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
      • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
      • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系,a,b,c,d
    • 避免死锁:加锁顺序一致;避免锁未释放的场景;资源一次性分配;破坏死锁的四个必要条件。死锁检测算法,银行家算法。

    线程同步

      饥饿问题:一个线程,它的竞争锁的能力特别强,每次申请,都是它又申请到锁。该线程一直在进行申请锁,检测,释放锁;其他线程没有机会拥有锁。因此,当此线程释放锁以后,不能再立即申请,必须排到队列的尾部;在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

    • 条件变量:当一个线程互斥地访问某个变量时,条件阻塞等待,直到有人唤醒它。
      • 例如一个线程访问队列时,发现队列为空,它只能等待;直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
      • 是一个线程库提供的一个描述临界资源状态的变量对象,不用频繁地通过申请或释放锁的方式,也能达到检测临界资源的目的。
    • 条件变量函数初始化
      • int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);
      • 参数:cond:要初始化的条件变量;attr:NULL
    • 销毁int pthread_cond_destroy(pthread_cond_t *cond)
    • 等待条件满足int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
      • 参数:cond:要在这个条件变量上等待;
      • mutex:互斥量
        • 陷入阻塞等待时,自动释放锁;唤醒时,自动拥有锁。
    • 唤醒等待
      • 广播式唤醒,唤醒一堆因此条件陷入阻塞的线程:int pthread_cond_broadcast(pthread_cond_t *cond);
      • 唤醒一个:int pthread_cond_signal(pthread_cond_t *cond);
    #include 
    #include 
    #include 
    #include 
    #include 
    
    pthread_cond_t cond;//创建
    pthread_mutex_t mutex;
    void *r1( void *arg )
    {
    	while ( 1 )
    	{
    		pthread_cond_wait(&cond, &mutex);//陷入阻塞等待,自动释放锁
    		printf("活动\n");
    	}
    }
    
    void *r2(void *arg )
    {
    	while ( 1 ) 
    	{
    		pthread_cond_signal(&cond);
    		sleep(1);
    	}
    }
    int main( void )
    {
    	pthread_t t1, t2;
    	pthread_cond_init(&cond, NULL);
    	pthread_mutex_init(&mutex, NULL);
    	
    	pthread_create(&t1, NULL, r1, NULL);
    	pthread_create(&t2, NULL, r2, NULL);
    
    	pthread_join(t1, NULL);
    	pthread_join(t2, NULL);
    
    	pthread_mutex_destroy(&mutex);
    	pthread_cond_destroy(&cond);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 条件变量使用规范
    ---//等待条件代码:
    pthread_mutex_lock(&mutex);
    while (条件为假)
    {
    	pthread_cond_wait(cond, mutex);
    	//唤醒后再检测条件是否满足。
    }
    pthread_mutex_unlock(&mutex);
    
    ---//唤醒代码:
    pthread_mutex_lock(&mutex);
    	.....
    pthread_cond_signal(cond);
    pthread_mutex_unlock(&mutex);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    生产者消费者模型

    • 段共享的内存空间。
    • 种角色:生产者,消费者。进程 或 线程。
    • 种关系:生产者与消费者(同步与互斥的关系),生产者与生产者(互斥关系),消费者与消费者(互斥关系)。
    • 强耦合:消费者来消费,等待生产者生产以后,才能消费;生产者,在没有消费者的同时,也在等待。

      生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。容器的容量应该有上限,当容器不满足生产或消费条件时,对应的线程应该阻塞。

      生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列;消费者不找生产者要数据,而是直接从阻塞队列里取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

    • 生产者消费者模型优点:解耦、支持并发、支持忙闲不均
      在这里插入图片描述
    • 开源代码用的都是 hpp ,只要把头文件包含进来,头文件里的方法就可以直接用。
    • hpp :- 输入行参数:const type & ;输出型参数:Type *;输入输出型参数:type &
    #pragma once
    
    #include 
    #include 
    #include 
    
    template <class T>
    class BlockQueue{
        private:
            int cap;
            pthread_mutex_t lock;
          //  pthread_mutex_t p_lock;  //多个生产者
          //  pthread_mutex_t c_lock;	//多个消费者
            pthread_cond_t have_space;  //生产者
            pthread_cond_t have_data;   //消费者
            std::queue<T> bq;  //临界资源
        private:
            bool IsFull()
            {
                return bq.size() == cap ? true : false;
            }
            bool IsEmpty()
            {
                return bq.size() == 0 ? true : false;
            }
        public:
            BlockQueue(int _cap):cap(_cap)
            {
                pthread_mutex_init(&lock, nullptr);
                pthread_cond_init(&have_space, nullptr);
                pthread_cond_init(&have_data, nullptr);
            }
            void Put(const T &in)//生产
            {
            	//p_lock;
                //lock(&p_lock);
                pthread_mutex_lock(&lock);
                while(IsFull())
                {
                    pthread_cond_wait(&have_space, &lock); 
                }
                bq.push(in);
                if(bq.size() >= cap/2)//生产策略
                {
                    pthread_cond_signal(&have_data);
                }
                pthread_mutex_unlock(&lock);
                //unlock(&p_lock);
            }
            void Get(T *out) //消费
            {
    			//c_lock;
                //lock(&c_lock)
                pthread_mutex_lock(&lock);
                while(IsEmpty())
                {
                    pthread_cond_wait(&have_data, &lock);
                }
                *out = bq.front();
                bq.pop();
                pthread_cond_signal(&have_space);
                pthread_mutex_unlock(&lock);
                //unlock(&c_lock);
            }
            ~BlockQueue()
            {
                pthread_mutex_destroy(&lock);
                pthread_cond_destroy(&have_space);
                pthread_cond_destroy(&have_data);
            }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 生产、消费的使用:
    #include "block_queue.hpp"
    #include 
    
    #define NUM 10
    
    void *consumer(void *c)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)c;
    
        int out = 0;
        while(true){
            bq->Get(&out);
            std::cout << "consumer: " << out << std::endl;
        }
        return nullptr;
    }
    
    void *producter(void *p)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)p;
    
        int in = 1;
        while(true){
            sleep(1); //消费的慢!
            bq->Put(in);
            in++;
            in %= 100;
            std::cout << "producter: " << in << std::endl;
        }
    
        return nullptr;
    }
    
    int main()
    {
        BlockQueue<int> *bq = new BlockQueue<int>(NUM);
        
        pthread_t c,p;
        pthread_create(&c, nullptr, consumer, bq);
        pthread_create(&p, nullptr, producter, bq);
    
        pthread_join(c, nullptr);
        pthread_join(p, nullptr);
    
        delete bq;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    伪唤醒

      什么是线程伪唤醒:假设有两个消费者线程C1、C2和一个生产者线程P,线程通信开始前,产品数量为0,当用wait/notify进行线程通信时,C1执行到wait方法,释放锁等待,此时P生产产品唤醒线程C1,但被C2获取锁消费此产品,C2执行完成后释放锁,此时C1获取锁后进行产品的消费。以上过程出现了只有一个产品,但是被两次消费的现象。

      造成伪唤醒的根本原因:是唤醒线程和被唤醒的线程获取锁不是原子操作。在线程被唤醒过程中,如果锁被其他线程抢占执行,等持锁线程执行完后,被唤醒线程获得锁执行,就有可能造成临界资源为0的情况下被过度消费为负数的现象(在生产者消费者模式中)。

      多核CPU的情况下,每个CPU都有自己的缓存信息,条件变量缓存到CPU内部。当通知某个线程条件变量已经就绪,就会把每个CPU内部的数据进行更新,唤醒多个线程;而此之前假如某个线程调用等待函数失败,拥有锁的情况下继续往下走,导致其他被唤醒的进程过度消费或者生产。

    • 唤醒丢失:我们在主线程中开启了两个线程,分别是:唤醒线程和阻塞线程。唤醒线程先启动并调用通知函数,但是阻塞线程还没有开始执行等待函数,如果不再次调用函数通知,等待会一直持续下去。

    寄语

    • 互斥锁全局创建,主线程初始化,销毁;上锁,解锁。
    • 条件变量全局创建,主线程初始化,条件阻塞,唤醒。
  • 相关阅读:
    Hadoop在启动yarn时报错:Cannot set priority of resourcemanager process xxxxx
    yoloV5环境搭建与运行(windows+pytorch+kaggle)
    如何在表格里面添加表单,并且进行表单验证
    鲁棒局部均值分解 (RLMD)附Matlab代码
    (附源码)ssm天天超市购物网站 毕业设计 022101
    egg-jwt的使用
    Django--models.py
    html+css 带图片的搜索框
    什么是 X.509 证书以及它是如何工作的?
    被问到可重入锁条件队列,看这一篇就够了!|原创
  • 原文地址:https://blog.csdn.net/WTFamer/article/details/126173106