• Linux线程同步(上)


    线程同步概念

    在这里插入图片描述

    线程同步指的是协调多个线程之间的执行顺序和访问共享资源的方式,以确保数据的一致性和程序的正确性。同步的主要目的是防止数据竞争、不一致性和并发冲突。在多线程或多进程环境中,多个执行单元可能并发地访问共享数据,而同步机制可以用来协调它们的行为,例如使用互斥访问来确保在任何给定时间只有一个线程或进程可以访问共享资源,以避免竞争条件和数据污染。通过协作来让多个线程相互通信和等待特定条件的发生,例如,等待其他线程完成某个任务或等待某个事件的触发。

    竞态条件

    竞态条件是多线程或多进程编程中的一种并发问题,指的是程序的行为受到线程或进程执行顺序的影响,从而导致不一致的结果。竞态条件通常发生在多个执行单元试图同时访问共享资源时,导致不可预测的结果。竞态条件的特征包括:

    • 共享资源:多个线程或进程共享某种资源,例如变量、文件、内存区域等。

    • 并发访问:这些线程或进程同时试图对共享资源进行读取或写入操作,而不经过适当的同步。

    • 不确定性:由于线程执行顺序不确定,因此无法预测哪个线程将首先访问共享资源,导致不一致的结果。

    竞态条件可能导致各种问题,包括数据不一致、程序崩溃、内存泄漏以及其他未定义的行为。为避免竞态条件,需要采取适当的同步措施,如使用互斥锁、条件变量、信号量等,以确保在任何给定时间只有一个线程可以访问共享资源,这样可以保证数据的一致性和程序的正确性。而理解竞态条件和采取适当的同步措施对于多线程和多进程编程至关重要,以避免潜在的并发问题和确保程序的可靠性。

    条件变量

    条件变量是多线程编程中的同步工具,用于实现线程之间的协作和等待特定条件的发生,而不是忙等待。(忙等待是一种并发编程中的同步技术,其中线程在等待某个条件变为真之前不断轮询检查条件,而不是进入休眠状态。)条件变量允许一个或多个线程等待某个条件的满足,当条件满足时,等待的线程可以被唤醒。

    条件变量的使用通常与互斥锁一起使用,以确保在等待条件和发出信号之间的线程安全。条件变量的使用可以提高多线程应用程序的效率,因为它允许线程在等待条件时不浪费CPU时间。然而,使用条件变量需要小心设计,以避免死锁和竞态条件。如下是条件变量的基本操作和特性:

    • 初始化条件变量:条件变量通常通过pthread_cond_t类型的变量进行初始化。这可以使用pthread_cond_init函数来完成。

    • 等待条件:线程可以使用pthread_cond_wait函数来等待条件的满足。在等待时,线程会释放与之关联的互斥锁,并陷入等待状态。

    • 通知条件:某个线程或多个线程可以通过pthread_cond_signal或pthread_cond_broadcast函数来通知等待条件的线程。前者唤醒一个等待线程,而后者唤醒所有等待线程。

    • 释放条件变量:使用pthread_cond_destroy函数来释放条件变量资源。

    条件变量初始化和销毁

    条件变量的初始化可以使用POSIX 线程库提供的函数 pthread_cond_init 来初始化,而条件变量的销毁可以使用同样是POSIX 线程库下提供的函数 pthread_cond_destroy 函数来进行销毁。函数原型如下:

    在这里插入图片描述
    pthread_cond_init 函数的参数 cond是指向要初始化的条件变量的指针。attr 参数是条件变量的属性,通常可以将其设置为 NULL,表示使用默认属性。如果需要自定义属性,可以创建一个 pthread_condattr_t 类型的属性对象,然后将其传递给 attr 参数。

    条件变量的初始化除了使用 pthread_cond_init 函数进行动态初始化外,也可以进行静态初始化,静态初始化通过使用宏 PTHREAD_COND_INITIALIZER 来实现。这个宏用于创建并初始化一个条件变量,以默认属性开始。在静态初始化后无需调用 pthread_cond_init 函数来手动初始化条件变量,而是可以在声明时进行初始化。静态初始化后可以直接在程序中使用,但要注意,静态初始化的条件变量只能在声明时进行初始化,不能在之后的代码中重新初始化或销毁。

    pthread_cond_destroy 函数的参数 cond 是指向要销毁的条件变量的指针。在销毁条件变量之后会释放其占用的系统资源。通常情况下,在不再需要条件变量时,应该调用这个函数来确保资源被正确释放,一旦条件变量被销毁,应该避免再次初始化或使用它。

    条件变量等待

    pthread_cond_wait 是 POSIX 线程库提供的函数之一,用于在线程中等待条件的满足,同时释放互斥锁,以避免忙等待。这个函数通常与条件变量一起使用,以实现线程之间的协作和等待特定条件的发生。该函数原型如下:

    在这里插入图片描述
    参数 cond 是指向条件变量的指针,等待线程将在这个条件变量上等待条件的发生。参数 mutex 是指向互斥锁的指针,等待线程在等待之前会释放该互斥锁,允许其他线程访问共享资源。pthread_cond_wait 的典型用法如下:

    • 线程获取互斥锁以保护共享资源。
    • 线程检查某个条件是否满足,如果条件不满足,它会调用 pthread_cond_wait 来等待条件的发生,并在等待期间释放互斥锁。
    • 当其他线程满足条件时,它们可以调用 pthread_cond_signal 或 pthread_cond_broadcast 来通知等待的线程。
    • 被通知的线程被唤醒,重新获取互斥锁,然后重新检查条件。

    这个机制允许线程有效地等待特定条件的发生,在等待期间会释放互斥锁,而不会占用 CPU 资源,这有助于减少忙等待和提高多线程程序的效率。

    条件变量唤醒

    对线程可以使用 pthread_cond_signal 函数来唤醒等待在特定条件变量上的一个线程。具体来说,pthread_cond_signal 的作用是通知等待在条件变量上的一个线程,使其从等待状态唤醒,以便它可以继续执行。除了使用pthread_cond_signal 函数外,还可以使用 pthread_cond_broadcast 函数进行等待线程唤醒,但是需要注意的是,pthread_cond_signal 只唤醒等待在条件变量上的一个线程。而 pthread_cond_broadcast 函数是唤醒等待在特定条件变量上的所有线程,以便它们可以继续执行。另外,pthread_cond_signal 必须在互斥锁的保护下调用,以确保线程安全。

    pthread_cond_signal 和 pthread_cond_broadcast 函数都是 POSIX 线程库中的函数,函数原型如下:
    在这里插入图片描述

    pthread_cond_signal 的参数 cond为指向条件变量的指针,即要进行操作的条件变量。该函数如果成功,函数返回0;否则,返回一个非零的错误码。这个函数不会阻塞,它执行很快,仅用于唤醒等待的线程。通常需要在获得互斥锁的情况下调用 pthread_cond_signal 以确保线程安全。这是因为条件变量的操作通常伴随着对共享资源的操作,需要在互斥锁的保护下执行。

    pthread_cond_broadcast 的参数 cond 为指向条件变量的指针,即要进行操作的条件变量。该函数如果成功,函数返回0;否则,返回一个非零的错误码。与 pthread_cond_signal 类似,这个函数也是不会阻塞,也需要在获得互斥锁的情况下调用 pthread_cond_broadcast 以确保线程安全。pthread_cond_broadcast 适用于那些需要一次性通知多个等待线程的情况,例如,多个线程等待某个全局状态的变化。pthread_cond_broadcast 会唤醒所有等待在条件变量上的线程,如果没有线程在等待,它不会有任何效果,也不会引发错误。

    示例代码

    我们可以设计这样一个示例代码来测试条件变量,先创建两个线程,分别执行threadRun1和threadRun2方法,threadRun1方法为循环3次,每次都是进行等待,而threadRun2方法也是循环3次,每次都是唤醒。如下代码:

    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    
    void *threadRun1(void *args)
    {
        char *name = static_cast<char *>(args);
        for(int i = 0; i < 3; i++)
        {
            pthread_cond_wait(&cond, &mtx); // 等待时自动释放锁,结束时自动申请锁
            cout << name << " run....." << endl;
        }
    }
    
    void *threadRun2(void *args)
    {
        char *name = static_cast<char *>(args);
        for(int i = 0; i < 3; i++)
        {
            cout << name << " wakeup" << endl;
            pthread_cond_signal(&cond);
            sleep(1);
        }
    }
    
    int main()
    {
        pthread_t t1, t2;
        pthread_create(&t1, nullptr, threadRun1, (void *)"Threadt1-->");
        pthread_create(&t2, nullptr, threadRun2, (void *)"Threadt2-->");
    
        pthread_join(t1, nullptr);
        pthread_join(t2, nullptr);
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这里插入图片描述

    条件变量与互斥锁

    按照上面所说,条件变量的使用是需要与互斥锁来搭配使用,可是为什么需要与互斥锁来搭配使用呢?难道互斥锁会唱、跳、rap、篮球???

    在这里插入图片描述
    其实不然,与互斥锁一起使用是因为条件变量通常与共享数据相关联。在多线程编程中,互斥锁的作用是保护共享数据,防止多个线程同时访问,从而避免竞争条件。为了避免竞争条件,通常需要使用互斥锁来保护共享资源的访问。当一个线程获得了互斥锁,其他线程就不能同时获得该锁,从而确保了共享资源的独占性。如以下是使用条件变量的典型模式:

    1. 线程在互斥锁的保护下检查条件,如果条件不满足,则调用 pthread_cond_wait 进入等待状态,同时会释放互斥锁,允许其他线程访问共享资源。

    2. 当另一个线程修改了共享资源,使得条件满足时,它会发送信号(使用 pthread_cond_signal 或 pthread_cond_broadcast),唤醒一个或多个等待的线程。

    3. 被唤醒的线程在重新获取互斥锁后,继续执行,并检查条件,如果条件满足,则继续执行相应的操作。

    使用互斥锁保护共享资源的访问,而条件变量提供了一种等待和唤醒的机制,确保在合适的时机等待线程被唤醒,从而避免了竞争条件。因此,pthread_cond_wait 函数需要与互斥锁一起使用,以确保线程安全和正确性。

    在这里插入图片描述

    由此可以知道,在使用条件变量的时候应该先加锁,然后再进行判断,如果不满足则解锁挂起等待。另一个线程就应该先加锁,然后修改条件为满足再唤醒等待线程,最后解锁。因此有如下代码:

    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    int tickets = 0;
    void *threadRun1(void *args)
    {
        char *name = static_cast<char *>(args);
        while(1)
        {
            pthread_mutex_lock(&mtx);
            if(tickets <= 0)
            {
                pthread_cond_wait(&cond, &mtx);
            }
            else
            {
                cout << name << " get tickets : " << tickets-- << endl;
            }
            pthread_mutex_unlock(&mtx);
        }
    }
    
    void *threadRun2(void *args)
    {
        char *name = static_cast<char *>(args);
        while(1)
        {
            pthread_mutex_lock(&mtx);
            cout << name << " push a tickets : " << tickets++ << endl;
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&mtx);
            sleep(1);
        }
    }
    
    int main()
    {
        pthread_t t1, t2;
        pthread_create(&t1, nullptr, threadRun1, (void *)"Threadt1-->");
        pthread_create(&t2, nullptr, threadRun2, (void *)"Threadt2-->");
    
        pthread_join(t1, nullptr);
        pthread_join(t2, nullptr);
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    在这里插入图片描述

    生产者消费者模型

    概念

    生产者消费者模型是多线程编程中的一个经典同步问题,通常涉及一个生产者线程生产数据并将其放入共享缓冲区,以及一个消费者线程从共享缓冲区中取出数据进行处理。这个模型的目标是确保生产者和消费者之间的协同工作,以避免竞争条件并确保数据的正确处理。

    在这里插入图片描述

    优点

    生产者-消费者模型在多线程编程中有多个优点,包括:

    • 解耦生产者和消费者:该模型允许生产者和消费者线程分离开来,它们不需要直接协调或了解彼此的存在。这提供了更好的模块化,允许并行开发和维护。

    • 平衡生产和消费:生产者和消费者的速度通常不一致。使用合适的缓冲区大小和条件变量,可以平衡它们之间的速度,以避免资源浪费和性能下降。

    • 避免竞争条件:通过使用互斥锁和条件变量,可以有效地避免竞争条件。这确保了多个线程能够安全地访问共享资源,而不会导致数据损坏或不一致。

    • 提高并发性:允许多个生产者和消费者同时运行,以充分利用多核处理器和多线程能力,提高程序的并发性和性能。

    • 提高可维护性:使用明确的同步机制(如互斥锁和条件变量)来实现生产者-消费者模型,使代码更易于理解和维护。这种模块化的设计有助于减少错误和简化调试。

    总之,生产者-消费者模型是一种有助于有效同步多线程的经典模式,它提供了一种可维护、可扩展且高效的方法来处理共享资源的访问,从而提高了多线程应用的正确性和性能。

    基于BlockingQueue的生产者消费者模型

    在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

    #pragma once
    
    #include 
    #include 
    #include 
    
    template <class T>
    class BlockQueue
    {
    public:
        BlockQueue(const size_t cap = 5) : _cap(cap)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
            pthread_cond_init(&_productor_cond, nullptr);
        }
        void push(const T &in)
        {
            pthread_mutex_lock(&_mutex);
            while (isFull())
            {
                pthread_cond_wait(&_productor_cond, &_mutex);//等待自动释放锁,出来自动申请锁
            }
            _q.push(in);
            pthread_cond_signal(&_consumer_cond);
            pthread_mutex_unlock(&_mutex);
        }
        void pop(T *out)
        {
            pthread_mutex_lock(&_mutex);
            while (isEmpty())
            {
                pthread_cond_wait(&_consumer_cond, &_mutex);
            }
            *out = _q.front();
            _q.pop();
            pthread_cond_signal(&_productor_cond);
            pthread_mutex_unlock(&_mutex);
        }
        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_consumer_cond);
            pthread_cond_destroy(&_productor_cond);
        }
        bool isFull()
        {
            return _cap == _q.size();
        }
        bool isEmpty()
        {
            return _q.size() == 0;
        }
    private:
        std::queue<T> _q;
        int _cap;
        pthread_mutex_t _mutex;
        pthread_cond_t _consumer_cond;
        pthread_cond_t _productor_cond;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    #include "blockQueue.hpp"
    #include "Task.hpp"
    #include 
    #include 
    #include 
    
    void* consumer(void* args)
    {
        BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
        while(1)
        {
            //usleep(120000);
            int data = 0;
            bq->pop(&data);
            std::cout << "comsumer data : " << data << std::endl;
        }
    }
    
    void* productor(void* args)
    {
        BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
        while(1)
        {
            int data = rand() % 10 + 1;
            bq->push(data);
            std::cout << "productor data : " << data << std::endl;  
            sleep(1);     
        }
    }
    
    int main()
    {
        srand(time(nullptr));
        BlockQueue<int> *bq = new BlockQueue<int>();
        pthread_t c, p;
        pthread_create(&c, nullptr, consumer, bq);
        pthread_create(&p, nullptr, productor, bq);
    
        pthread_join(c, nullptr);
        pthread_join(p, nullptr);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    消费者快,生产者慢:

    在这里插入图片描述

    生产者快,消费者慢:

    在这里插入图片描述

    总结

    文章介绍了线程同步的一些基本概念,讲述了维持线程同步的条件变量,并对条件变量的初始化,销毁,等待和唤醒函数进行介绍,以简易的代码查看现象,还分析了条件变量和互斥锁的关系,最后用条件变量与互斥锁实现了一个单生产单消费的生产者消费者模型。总的来说,正确的线程同步可以确保多线程程序的正确性和可靠性,防止竞态条件和数据不一致问题的发生。码文不易,如果文章对你有帮助的话,就请点一个👍支持一下作者呗。

    在这里插入图片描述

  • 相关阅读:
    【CVPR 2021】Cylinder3D:用于LiDAR点云分割的圆柱体非对称3D卷积网络
    推荐计划为什么有效?需要推荐计划的10个原因
    GitLab数据迁移后出现500错误
    Git入门到精通(大全)
    为什么短时傅里叶变换无法实现小波等优秀时频方法对时频分布的提取效果?
    排序:堆排序算法分析以及插入删除操作
    Windows又又又更新?Win 12开发将于下个月开始
    2、k8s 集群安装
    go 流程控制之switch 语句介绍
    python脚本(渗透测试)
  • 原文地址:https://blog.csdn.net/wzh18907434168/article/details/133777484