• 【Linux】多线程_7



    九、多线程

    8. POSIX信号量

    POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
    创建多线程的信号量:
    在这里插入图片描述
    销毁多线程之间的信号量:
    在这里插入图片描述
    对信号量做P操作(申请资源):
    在这里插入图片描述
    对信号量做V操作(释放资源):
    在这里插入图片描述

    根据信号量+环形队列的生产者消费者模型代码

    Makefile

    cp_ring:Main.cc
    	g++ -o $@ $^ -std=c++11 -lpthread
    .PHONY:clean
    clean:
    	rm -f cp_ring
    

    Thread.hpp

    #ifndef __THREAD_HPP__
    #define __THREAD_HPP__
    
    #include 
    #include 
    #include 
    #include 
    #include 
    
    namespace ThreadModule
    {
        template<typename T>
        using func_t = std::function<void(T&, const std::string& name)>;
    
        template<typename T>
        class Thread
        {
        public:
            void Excute()
            {
                _func(_data, _threadname);
            }
        public:
            Thread(func_t<T> func, T& data, const std::string &name="none-name")
                : _func(func)
                , _data(data)
                , _threadname(name)
                , _stop(true)
            {}
    
            static void *threadroutine(void *args)
            {
                Thread<T> *self = static_cast<Thread<T> *>(args);
                self->Excute();
                return nullptr;
            }
    
            bool Start()
            {
                int n = pthread_create(&_tid, nullptr, threadroutine, this);
                if(!n)
                {
                    _stop = false;
                    return true;
                }
                else
                {
                    return false;
                }
            }
    
            void Detach()
            {
                if(!_stop)
                {
                    pthread_detach(_tid);
                }
            }
    
            void Join()
            {
                if(!_stop)
                {
                    pthread_join(_tid, nullptr);
                }
            }
    
            std::string name()
            {
                return _threadname;
            }
    
            void Stop()
            {
                _stop = true;
            }
    
            ~Thread() {}
        private:
            pthread_t _tid;
            std::string _threadname;
            T& _data;
            func_t<T> _func;
            bool _stop;
        };
    }
    
    #endif
    

    RingQueue.hpp

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    
    // 环形队列类模板
    template<class T>
    class RingQueue
    {
    private:
        // 申请资源
        void P(sem_t& sem)
        {
            sem_wait(&sem);
        }
    
        // 释放资源
        void V(sem_t& sem)
        {
            sem_post(&sem);
        }
    
        // 加锁
        void Lock(pthread_mutex_t& mutex)
        {
            pthread_mutex_lock(&mutex);
        }
    
        // 解锁
        void Unlock(pthread_mutex_t& mutex)
        {
            pthread_mutex_unlock(&mutex);
        }
    public:
        RingQueue(int cap)
            :_cap(cap)
            ,_ring_queue(cap)
            ,_prodeucer_step(0)
            ,_consumer_step(0)
        {
            sem_init(&_room_sem, 0, _cap);
            sem_init(&_data_sem, 0, 0);
            pthread_mutex_init(&_prodeucter_mutex, nullptr);
            pthread_mutex_init(&_consumer_mutex, nullptr);
        }
    
        // 生产者的入队列函数
        void Enqueue(const T& in)
        {
            // 申请空间资源
            P(_room_sem);
            // 加锁
            Lock(_prodeucter_mutex);
            // 入队列
            _ring_queue[_prodeucer_step++] = in;
            // 环形,绕一圈
            _prodeucer_step %= _cap;
            // 解锁
            Unlock(_prodeucter_mutex);
            // 释放数据资源
            V(_data_sem);
        }
    
        // 消费者的出队列函数
        void Pop(T* out)
        {
            // 申请数据资源
            P(_data_sem);
            // 加锁
            Lock(_consumer_mutex);
            // 出队列
            *out = _ring_queue[_consumer_step++];
            _consumer_step %= _cap;
            // 解锁
            Unlock(_consumer_mutex);
            // 释放空间资源
            V(_room_sem);
        }
    
        ~RingQueue()
        {
            sem_destroy(&_room_sem);
            sem_destroy(&_data_sem);
            pthread_mutex_destroy(&_prodeucter_mutex);
            pthread_mutex_destroy(&_consumer_mutex);
        }
    private:
        // 数组模拟环形队列
        std::vector<T> _ring_queue;
        // 容量
        int _cap;
        // 生产者和消费者的位置指针
        int _prodeucer_step;
        int _consumer_step;
        // 信号量
        sem_t _room_sem;
        sem_t _data_sem;
        // 互斥锁
        pthread_mutex_t _prodeucter_mutex;
        pthread_mutex_t _consumer_mutex;
    };
    

    Task.hpp

    #pragma once
    
    #include 
    #include 
    
    using Task = std::function<void()>;
    
    void Download()
    {
        std::cout << "Downloading..." << std::endl;
    }
    

    Main.cc

    #include "RingQueue.hpp"
    #include "Thread.hpp"
    #include "Task.hpp"
    #include 
    #include 
    #include 
    
    using namespace ThreadModule;
    // 创建类型别名
    using ringqueue_t = RingQueue<Task>;
    
    // 消费者线程
    void Consumer(ringqueue_t& rq, const std::string& name)
    {
        while (true)
        {
            // 获取任务
            Task t;
            rq.Pop(&t);
            std::cout << "Consumer " << name << " : ";
            // 执行任务
            t();
        }
    }
    
    // 生产者线程
    void Productor(ringqueue_t& rq, const std::string& name)
    {
        while (true)
        {
            // 发布任务
            rq.Enqueue(Download);
            std::cout << "Productor " << name << " : " << "Download task" << std::endl;
            sleep(1);
        }
    }
    
    // 启动线程
    void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func)
    {
        for (int i = 0; i < num; i++)
        {
            // 创建一批线程
            std::string name = "thread-" + std::to_string(i + 1);
            threads->emplace_back(func, rq, name);
        }
    }
    
    // 创建消费者线程
    void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
    {
        InitComm(threads, num, rq, Consumer);
    }
    
    // 创建生产者线程
    void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
    {
        InitComm(threads, num, rq, Productor);
    }
    
    // 等待所有线程结束
    void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
    {
        for (auto& thread : threads)
        {
            thread.Join();
        }
    }
    
    // 启动所有线程
    void StartAll(std::vector<Thread<ringqueue_t>>& threads)
    {
        for (auto& thread : threads)
        {
            thread.Start();
        }
    }
    
    int main()
    {
        // 创建阻塞队列,容量为5
        ringqueue_t* rq = new ringqueue_t(10);
        // 创建线程
        std::vector<Thread<ringqueue_t>> threads;
        // 创建 1个消费者线程
        InitConsumer(&threads, 1, *rq);
        // 创建 1个生产者线程
        InitProductor(&threads, 1, *rq);
        // 启动所有线程
        StartAll(threads);
    
        // 等待所有线程结束
        WaitAllThread(threads);
    
        return 0;
    }
    

    结果演示

    在这里插入图片描述
    这里演示的是单生产者单消费者的模型,可以在主函数改成多生产者多消费者的模型。


    未完待续

  • 相关阅读:
    八股文第九天
    l8-d9 UDP通信实现
    JavaScript LocalStorage 完整指南
    在Ubuntu上用sane api实现通用扫描功能
    mount /dev/mapper/centos-root on sysroot failed处理
    Ubuntu下MySQL无法启动和访问的问题解决与修复
    科研经验-文件的可视化管理(Endnote)
    html+css+javascript+jquery+bootstarp响应式旅行社旅游平台网站模板(14页)
    MFC-对话框
    线性DP题目汇总(持续更新)
  • 原文地址:https://blog.csdn.net/m0_69828905/article/details/140431340