• Linux知识点 -- Linux多线程(四)


    Linux知识点 – Linux多线程(四)


    一、线程池

    1.概念

    一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

    • 预先申请资源,用空间换时间;
    • 预先申请一批线程,任务到来就处理;
    • 线程池就是一个生产消费模型;

    2.实现

    thread.hpp
    线程封装:

    #pragma once
    
    #include
    #include
    #include
    #include
    
    typedef void* (*fun_t)(void*); // 定义函数指针类型,后面回调
    
    class ThreadData  // 线程信息结构体
    {
    public:
        void* _args;
        std::string _name;
    };
    
    class Thread
    {
    public:
        Thread(int num, fun_t callback, void* args)
            : _func(callback)
        {
            char nameBuffer[64];
            snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
            _name = nameBuffer;
    
            _tdata._args = args;
            _tdata._name = _name;
        }
    
        void start() // 创建线程
        {
            pthread_create(&_tid, nullptr, _func, (void*)&_tdata); // 直接将_tdata作为参数传给回调函数
        }
    
        void join() // 线程等待
        {
            pthread_join(_tid, nullptr);
        }
    
        std::string name()
        {
            return _name;
        }
    
        ~Thread()
        {}
    
    private:
        std::string _name;
        fun_t _func;
        ThreadData _tdata;
        pthread_t _tid;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    lockGuard.hpp
    锁的封装,构建对象时直接加锁,对象析构时自动解锁;

    #pragma once
    
    #include 
    #include 
    
    class Mutex
    {
    public:
        Mutex(pthread_mutex_t *mtx)
            : _pmtx(mtx)
        {
        }
    
        void lock()
        {
            pthread_mutex_lock(_pmtx);
        }
    
        void unlock()
        {
            pthread_mutex_unlock(_pmtx);
        }
    
        ~Mutex()
        {}
    
    private:
        pthread_mutex_t *_pmtx;
    };
    
    
    class lockGuard
    {
    public:
        lockGuard(pthread_mutex_t *mtx)
            : _mtx(mtx)
        {
            _mtx.lock();
        }
    
        ~lockGuard()
        {
            _mtx.unlock();
        }
    private:
        Mutex _mtx;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    log.hpp

    #pragma once
    
    #include
    #include
    #include
    #include
    #include
    
    //日志级别
    #define DEBUG 0
    #define NORMAL 1
    #define WARNING 2
    #define ERROR 3
    #define FATAL 4
    
    const char* gLevelMap[] = {
        "DEBUG",
        "NORMAL",
        "WARNING",
        "ERROR",
        "FATAL"
    };
    
    #define LOGFILE "./threadpool.log"
    
    //完整的日志功能,至少需要:日志等级 时间 支持用户自定义(日志内容,文件行,文件名)
    
    void logMessage(int level, const char* format, ...)
    {
    #ifndef DEBUG_SHOW
        if(level == DEBUG) return;
    #endif
    
        char stdBuffer[1024];//标准部分
        time_t timestamp = time(nullptr);
        snprintf(stdBuffer, sizeof(stdBuffer), "[%s] [%ld] ", gLevelMap[level], timestamp);
    
        char logBuffer[1024];//自定义部分
        va_list args;
        va_start(args, format);
        vsnprintf(logBuffer, sizeof(logBuffer), format, args);
        va_end(args);
    
        FILE* fp = fopen(LOGFILE, "a");
        fprintf(fp, "%s %s\n", stdBuffer, logBuffer);
        fclose(fp);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 注:
      (1)提取可变参数
      在这里插入图片描述
      使用宏来提取可变参数:
      在这里插入图片描述
      将可变参数格式化打印到对应地点:
      在这里插入图片描述
      format是打印的格式;
      在这里插入图片描述
      (2)条件编译:
      在这里插入图片描述
      条件编译,不想调试的时候,就不加DEBUG宏,不打印日志信息;
      在这里插入图片描述
      -D:在命令行定义宏 ;

    threadPool.hpp

    线程池封装:

    #include "thread.hpp"
    #include 
    #include 
    #include 
    #include "log.hpp"
    #include "Task.hpp"
    #include "lockGuard.hpp"
    
    const int g_thread_num = 3;
    
    template <class T>
    class ThreadPool
    {
    public:
        pthread_mutex_t *getMutex()
        {
            return &_lock;
        }
    
        bool isEmpty()
        {
            return _task_queue.empty();
        }
    
        void waitCond()
        {
            pthread_cond_wait(&_cond, &_lock);
        }
    
        T getTask()
        {
            T t = _task_queue.front();
            _task_queue.pop();
            return t;
        }
    
        ThreadPool(int thread_num = g_thread_num)
            : _num(thread_num)
        {
            pthread_mutex_init(&_lock, nullptr);
            pthread_cond_init(&_cond, nullptr);
            for (int i = 1; i <= _num; i++)
            {
                _threads.push_back(new Thread(i, routine, this));
                // 线程构造传入的this指针,是作为ThreadData结构体的参数的,ThreadData结构体才是routine回调函数的参数
                // 由于由于回调函数是静态成员,无法访问非静态成员
                // 这里传入this指针是用来在回调函数中给访问非静态成员的
    
            }
        }
    
        void run()
        {
            for (auto &iter : _threads)
            {
                iter->start();
                logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
            }
        }
    
        // 消费过程:线程调用回调函数取任务就是所谓的消费过程,访问了临界资源,需要加锁
        static void *routine(void *args) //由于这个函数是类内成员,参数是有this指针的,参数类型不对,因此多线程回调的时候无法识别
                                         //需要设置成static静态成员,才可以回调
        {
            ThreadData *td = (ThreadData *)args;
            ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; // 拿到this指针,通过本对象的this指针来调用成员函数
            while (true)
            {
                T task;
                {
                    lockGuard lockguard(tp->getMutex());
                    while (tp->isEmpty())
                    {
                        tp->waitCond();
                    }
                    // 读取任务
                    task = tp->getTask();
                    // 任务队列是共享的,将任务从共享空间,拿到私有空间
                }
                task(td->_name); // 处理任务
            }
        }
    
        void pushTask(const T &task)
        {
            lockGuard lockguard(&_lock); // 访问临界资源,需要加锁
            _task_queue.push(task);
            pthread_cond_signal(&_cond); // 推送任务后,发送信号,让进程处理
        }
    
        ~ThreadPool()
        {
            for (auto &iter : _threads)
            {
                iter->join();
                delete iter;
            }
            pthread_mutex_destroy(&_lock);
            pthread_cond_destroy(&_cond);
        }
    
    private:
        std::vector<Thread *> _threads; // 线程池
        int _num;
        std::queue<T> _task_queue; // 任务队列
        pthread_mutex_t _lock;     // 锁
        pthread_cond_t _cond;      // 条件变量
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 注:
      (1)如果回调函数routine放在thread类里面,由于成员函数会默认传this指针,因此参数识别的时候可能会出错,所以需要设置成静态成员;在这里插入图片描述
      在这里插入图片描述
      (2)如果设置成静态类内方法,这个函数只能使用静态成员,而不能使用其他类内成员;
      可以让routine函数拿到整体对象,在构造线程的时候,routine的参数传入this指针;

      在这里插入图片描述
      在构造函数的初始化列表中是参数的初始化,在下面的函数体中是赋值的过程,因此在函数体中对象已经存在了,就可以使用this指针了;
      (3)类内公有接口让静态成员函数routine通过this指针能够访问类内成员;
      在这里插入图片描述
      testMain.cc
    #include"threadPool.hpp"
    #include"Task.hpp"
    #include
    #include
    #include
    #include
    
    int main()
    {
        srand((unsigned long)time(nullptr) ^ getpid());
    
        ThreadPool<Task>* tp = new ThreadPool<Task>();
        tp->run();
    
        while(true)
        {
            //生产的时候,只做任务要花时间
            int x = rand()%100 + 1;
            usleep(7756);
            int y = rand()%30 + 1;
            Task t(x, y, [](int x, int y)->int{
                return x + y;
            });
    
            logMessage(DEBUG, "制作任务完成:%d+%d=?", x, y);
    
            //推送任务到线程池中
            tp->pushTask(t);
    
            sleep(1);
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    运行结果:
    在这里插入图片描述

    3.单例模式的线程池

    threadPool.hpp

    #include "thread.hpp"
    #include 
    #include 
    #include 
    #include "log.hpp"
    #include "Task.hpp"
    #include "lockGuard.hpp"
    
    const int g_thread_num = 3;
    
    template <class T>
    class ThreadPool
    {
    public:
        pthread_mutex_t *getMutex()
        {
            return &_lock;
        }
    
        bool isEmpty()
        {
            return _task_queue.empty();
        }
    
        void waitCond()
        {
            pthread_cond_wait(&_cond, &_lock);
        }
    
        T getTask()
        {
            T t = _task_queue.front();
            _task_queue.pop();
            return t;
        }
    
    //单例模式线程池:懒汉模式
    private:
        //构造函数设为私有
        ThreadPool(int thread_num = g_thread_num)
            : _num(thread_num)
        {
            pthread_mutex_init(&_lock, nullptr);
            pthread_cond_init(&_cond, nullptr);
            for (int i = 1; i <= _num; i++)
            {
                _threads.push_back(new Thread(i, routine, this));
                // 线程构造传入的this指针,是作为ThreadData结构体的参数的,ThreadData结构体才是routine回调函数的参数
            }
        }
    
        ThreadPool(const ThreadPool<T> &other) = delete;
        const ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
    
    public:
        //创建单例对象的类内静态成员函数
        static ThreadPool<T>* getThreadPool(int num = g_thread_num)
        {
            //在这里再加上一个条件判断,可以有效减少未来必定要进行的加锁检测的问题
            //拦截大量的在已经创建好单例的时候,剩余线程请求单例而直接申请锁的行为
            if(nullptr == _thread_ptr)
            {
                //加锁
                lockGuard lockguard(&_mutex);
                //未来任何一个线程想要获取单例,都必须调用getThreadPool接口
                //一定会存在大量的申请锁和释放锁的行为,无用且浪费资源
                if(nullptr == _thread_ptr)
                {
                    _thread_ptr = new ThreadPool<T>(num);
                }
            }
            return _thread_ptr;
        }
    
        void run()
        {
            for (auto &iter : _threads)
            {
                iter->start();
                logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
            }
        }
    
        // 消费过程:线程调用回调函数取任务就是所谓的消费过程,访问了临界资源,需要加锁
        static void *routine(void *args)
        {
            ThreadData *td = (ThreadData *)args;
            ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; // 拿到this指针
            while (true)
            {
                T task;
                {
                    lockGuard lockguard(tp->getMutex());
                    while (tp->isEmpty())
                    {
                        tp->waitCond();
                    }
                    // 读取任务
                    task = tp->getTask();
                    // 任务队列是共享的,将任务从共享空间,拿到私有空间
                }
                task(td->_name); // 处理任务
            }
        }
    
        void pushTask(const T &task)
        {
            lockGuard lockguard(&_lock); // 访问临界资源,需要加锁
            _task_queue.push(task);
            pthread_cond_signal(&_cond); // 推送任务后,发送信号,让进程处理
        }
    
        ~ThreadPool()
        {
            for (auto &iter : _threads)
            {
                iter->join();
                delete iter;
            }
            pthread_mutex_destroy(&_lock);
            pthread_cond_destroy(&_cond);
        }
    
    private:
        std::vector<Thread *> _threads; // 线程池
        int _num;
        std::queue<T> _task_queue; // 任务队列
    
        static ThreadPool<T>* _thread_ptr;
        static pthread_mutex_t _mutex;
    
        pthread_mutex_t _lock;     // 锁
        pthread_cond_t _cond;      // 条件变量
    };
    
    //静态成员在类外初始化
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_thread_ptr = nullptr;
    
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mutex = PTHREAD_MUTEX_INITIALIZER;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141

    在这里插入图片描述
    在这里插入图片描述
    多线程同时调用单例过程,由于创建过程是非原子的,有可能被创建多个对象,是非线程安全的;
    需要对创建对象的过程加锁,就可以保证在多线程场景当中获取单例对象;
    但是未来任何一个线程想调用单例对象,都必须调用这个成员函数,就会存在大量申请和释放锁的行为;
    可以在之间加一个对单例对象指针的判断,若不为空,就不进行对象创建;

    在这里插入图片描述
    testMain.cc

    #include"threadPool.hpp"
    #include"Task.hpp"
    #include
    #include
    #include
    #include
    
    int main()
    {
        srand((unsigned long)time(nullptr) ^ getpid());
    
        //ThreadPool* tp = new ThreadPool();
        //tp->run();    
        ThreadPool<Task>::getThreadPool()->run();//创建单例对象
    
        
    
        while(true)
        {
            //生产的时候,只做任务要花时间
            int x = rand()%100 + 1;
            usleep(7756);
            int y = rand()%30 + 1;
            Task t(x, y, [](int x, int y)->int{
                return x + y;
            });
    
            logMessage(DEBUG, "制作任务完成:%d+%d=?", x, y);
    
            //推送任务到线程池中
            ThreadPool<Task>::getThreadPool()->pushTask(t);
    
            sleep(1);
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    运行结果:
    在这里插入图片描述

    二、STL、智能指针和线程安全

    1.STL的容器是否是线程安全的

    不是;
    原因是, STL的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响;
    而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)。
    因此STL默认不是线程安全。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。

    2.智能指针是否是线程安全的

    对于unique_ ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题;
    对于shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题.但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证shared_ptr 能够高效,原子的操作弓|用计数;

    三、其他常见的各种锁

    • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等) ,当其他线程想要访问数据时,被阻塞挂起;
    • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作;
      CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试;
    • 自旋锁
      临界资源就绪的时间决定了线程等待的策略;
      不断检测资源是否就绪就是自旋(轮询检测);
      自旋锁本质就是通过不断检测锁状态,来检测资源是否就绪的方案

      在这里插入图片描述
      互斥锁是检测到资源未就绪,就挂起线程;
      临界资源就绪的时间决定了使用哪种锁;

    四、读者写者问题

    1.读写锁

    在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少,相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门]处理这种多读少写的情况呢?有,那就是读写锁。

    • 读者写者模型与生产消费模型的本质区别:
      生产消费模型中消费者会取走数据,而读者写者模型中读者不会取走数据;

    • 读锁的优先级高

    2.读写锁接口

    • 初始化:
      在这里插入图片描述

    • 读者加锁:
      在这里插入图片描述

    • 写者加锁:

    在这里插入图片描述
    生产消费模型中,生产者和消费者的地位是对等的,这样才能达到最高效的状态
    而读写者模型中,写者只有在读者全部退出的时候才能写,是读者优先的,这样就会发生写者饥饿问题;
    读者写者问题中读锁的优先级高,是因为这种模型的应用场景为:数据的读取频率非常高,而被修改的频率特别低,这样有助于提升效率;

  • 相关阅读:
    【Linux详解】——环境变量
    每日一题——LeetCode1496.判断路径是否相交
    服务和协议的关系?
    1.用递归求一个正整数的逆序数
    C/C++ 有 1 、 2 、 3 、 4 个数字,能组成多少个互不相同且无重复数字的三位数?都是多少?
    基于PaddlePaddle平台训练物体分类——猫狗分类
    LeetCode-2760. 最长奇偶子数组-滑动窗口暴力
    蓝桥杯 奇偶覆盖 模拟
    Spring 注册 Bean 在配置中的定义和使用 Autowired
    SpringBoot3快速入门
  • 原文地址:https://blog.csdn.net/kissland96166/article/details/132488021