• 基于C++11的线程池实现


    1.线程池

    1.1 线程池是什么?

    一种线程管理方式。

    1.2 为什么用线程池?

    线程的创建和销毁都需要消耗系统开销,当线程数量过多,系统开销过大,就会影响缓存局部性和整体性能。而线程池能够在充分利用内核资源的前提下,避免系统资源被过度调用。

    1.3 如何设计线程池?

    简单来说,在线程池中提前创建好多个线程,使用时从线程池中取出,使用完放回线程池。线程池中的线程调度由线程池中的管理者线程调度。

    2.基于C++11的实现

    Talk is cheap. Show me the code.

    直接看程序,原理、函数在后面再介绍。

    2.1 程序

    程序主要分为四个文件,分别为:

    • Task.h //任务类
    • ThreadPool.h //线程池类
    • ThreadPool.cpp //线程池类实现
    • main.cpp //测试程序

    2.1.2 任务类Task.h

    #pragma once
    using callback = void(*)(void*);//函数指针,定义别名
    
    class Task{
    public:
        callback func;//回调任务函数
        void* arg;    //函数参数
    public:
        Task() {                        //无参构造函数
            this->func = nullptr;
            this->arg = nullptr;
        }
        Task(callback func, void* arg) {//含参构造函数
            this->func = func;
            this->arg = arg;
        }
        ~Task() = default;              //析构函数
        Task(const Task &t) = default;  //拷贝构造函数
        Task& operator=(const Task &t); //拷贝赋值操作符
        Task(Task &&t) = default;       //移动构造函数,注意不能有const
        Task& operator=(const Task &&t);//移动赋值操作符
    };
    

    2.1.2 线程池类ThreadPool.h

    #pragma once
    
    #include "Task.h"
    #include <thread>
    #include <queue>
    #include <vector>
    #include <atomic>
    #include <mutex>
    #include <condition_variable>
    
    using namespace std;
    
    class ThreadPool {
    public:
        ThreadPool(int minSize, int maxSize);//构造函数
        void AddTask(Task task);             //添加新任务
        int GetBusyNum();                    //获取当前工作中的线程数
        int GetAliveNum();                   //获取当前活着的线程数
        int GetTaskQueueSize();              //获取当前任务队列长度
        ~ThreadPool();
    
        ThreadPool(const ThreadPool &t) = default;  //拷贝构造函数
        ThreadPool& operator=(const ThreadPool &t); //拷贝赋值操作符
        ThreadPool(ThreadPool &&t) = default;       //移动构造函数
        ThreadPool& operator=(const ThreadPool &&t);//移动赋值操作符
    
    
    private:
        queue<Task> taskQueue; //任务队列
        thread managerID;//管理者线程ID
        vector<thread> threadIDs;//工作中的线程组ID
        int minNum;//最小线程数量(如果线程池中线程的数目过少,处理器的一些核可能就无法充分利用,浪费)
        int maxNum;//最大线程数量(如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。)
        atomic_int busyNum;//工作中的线程数量(atomic_int保证其赋值,取值操作的原子性)
        atomic_int liveNum;//活着的线程数量
        atomic_int exitNum;//将要被销毁的线程数量
    
        mutex mutexPool;//线程池的锁
        condition_variable cond;//条件变量
        bool shutDown;//是不是要销毁线程池, 销毁为true, 不销毁为false
    
        static void worker(void* arg);//工作的线程任务函数
        static void manager(void* arg);//管理者线程任务函数
    
        static const int NUMBER = 2;//管理者线程每次增加/销毁的线程数
    };
    

    2.1.3 线程池类实现ThreadPool.cpp

    #include "ThreadPool.h"
    #include <unistd.h> //pthread_self
    #include <iostream>
    
    using namespace std;
    
    ThreadPool::ThreadPool(int minSize, int maxSize) {
        do{
            minNum = minSize;
            maxNum = maxSize;
            busyNum = 0;
            liveNum = minSize;
            exitNum = 0;
            shutDown = false;
    
            //初始化管理者线程和工作线程组
            managerID = thread(manager, this);
            threadIDs.resize(maxSize);
            for(int i = 0; i < minSize; ++i) {
                threadIDs[i] = thread(worker, this);
            }
    
            return;
        } while(0);//do{...}while(0)结构提高代码健壮性
    }
    
    ThreadPool::~ThreadPool() {
        shutDown = true;
        if(managerID.joinable()) {//阻塞在管理者线程,直到其执行完,再向下进行
            managerID.join();
        }
        cond.notify_all();//唤醒所有等待的线程
        for(int i = 0; i < maxNum; ++i) {//依次执行工作者的线程
            if(threadIDs[i].joinable()) {
                threadIDs[i].join();
            }
        }
    }
    
    //添加新任务
    void ThreadPool::AddTask(Task task) {
        unique_lock<mutex> poolLock(mutexPool);
        if(shutDown) {
            return;
        }
        taskQueue.emplace(task);
        cond.notify_all();
    }
    
    int ThreadPool::GetBusyNum() {
        return busyNum;
    }
    
    int ThreadPool::GetAliveNum() {
        return liveNum;
    }
    
    int ThreadPool::GetTaskQueueSize() {
        unique_lock<mutex> poolLock(mutexPool);
        int queueSize = taskQueue.size();
        poolLock.unlock();
        return queueSize;
    }
    
    //工作者线程
    void ThreadPool::worker(void* arg) {
        ThreadPool* pool = static_cast<ThreadPool*>(arg);
        while(true) {
            unique_lock<mutex> poolLock(pool->mutexPool);
            //若当前任务队列为空且线程池处于开启状态
            while(pool->taskQueue.empty() && !pool->shutDown) {
                pool->cond.wait(poolLock);//阻塞工作线程
                //若存在待销毁线程
                if(pool->exitNum > 0) {
                    --pool->exitNum;
                    if(pool->liveNum > pool->minNum) {//若活着的线程数大于最小线程数,则可以进行销毁
                        --pool->liveNum;
                        cout << "threadID: " << pthread_self() << " has exited." << endl;
                        return;
                    }
                }
            }
    
            //判断线程池是否关闭了
            if(pool->shutDown) {
                cout << "threadID: " << pthread_self() << " has exited." << endl;
                return;
            }
    
            //从任务队列中取出一个任务
            Task task = pool->taskQueue.front();
            pool->taskQueue.pop();
            ++pool->busyNum;
    
             //解锁
            poolLock.unlock();
    
            //执行任务
            cout << "threadID: " << pthread_self() << " start to work." << endl;
            task.func(task.arg);
            task.arg = nullptr;
    
            //执行完后,工作线程数-1
            cout << "threadID: " << pthread_self() << " stop working." << endl;
            --pool->busyNum;
        }
    }
    
    //管理者线程
    void ThreadPool::manager(void* arg) {
        ThreadPool* pool = static_cast<ThreadPool*>(arg);
        while(!pool->shutDown) {
            //每隔3秒检测一次
            sleep(3);
    
            //添加新线程
            //若任务个数大于活着的线程数,且活着的线程数小于最大线程数
            if(pool->GetTaskQueueSize() > pool->liveNum && pool->liveNum < pool->maxNum) {
                unique_lock<mutex> poolLock(pool->mutexPool);
                poolLock.lock();
                int count = 0;
                for(int i = 0; i < pool->maxNum && count < ThreadPool::NUMBER && pool->liveNum < pool->maxNum; ++i) {
                    if(pool->threadIDs[i].get_id() == thread::id()) {
                        cout << "Create a new thread." << endl;
                        pool->threadIDs[i] = thread(worker, pool);
                        ++count;
                        ++pool->liveNum;
                    }
                }
                poolLock.unlock();
            }
    
            //销毁线程
            //若忙的线程*2小于存活的线程数,且存活的线程数大于最小的线程数
            if(pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) {
                pool->exitNum = ThreadPool::NUMBER;
                for(int i = 0; i < ThreadPool::NUMBER; ++i) {//让工作的线程自杀
                    pool->cond.notify_all();
                }
            }
        }
    }
    

    2.2 测试方法:

    将上述文件放在Linux下的一个文件夹(我这里是\Share\study_threadPool\myself)

    • 进入该文件夹:cd /share/study_threadPool/myself/
    • 编译:g++ main.cpp ThreadPool.cpp -o ThreadPool.o -pthread
    • 运行:./ThreadPool.o

    2.2 C++11相关函数

    1. thread类
    • ThreadPool.cpp第17行:managerID = thread(manager, this);表示创建一个新线程,manager是该线程执行的函数,this是该线程执行函数的参数。
    • ThreadPool.cpp第29行:managerID.joinable() 判断该线程是否可以join
    • ThreadPool.cpp第30行:managerID.join() 阻塞在该线程,直到其执行完
    • ThreadPool.cpp第123行:pool->threadIDs[i].get_id()表示获取该线程的ID
    1. mutex
    • ThreadPool.cpp第42行:unique_lock<mutex> poolLock(mutexPool); 自动加锁与解锁
    • ThreadPool.cpp第61行:poolLock.unlock();解锁
    • ThreadPool.cpp第120行:poolLock.lock();加锁
    1. condition_variable
    • ThreadPool.cpp第32行:nd.notify_all();唤醒所有等待的线程
    1. atomic
    • ThreadPool.h第34行:atomic_int busyNum;本质还是int,只是每次对其操作时,都能保证是原子操作
    1. using
    • Task.h第2行:sing callback = void(*)(void*);函数的别名

    3.调试过程中出现的问题及解决方法

    3.1 warning:#pragma once in main file

    image
    解决方案:g++编译时不要编译头文件

    3.2 移动构造函数出错

    image
    解决方案:移动构造函数的参数不能加const

    4.参考

  • 相关阅读:
    Opencv实现图像的基本变换
    【Nginx25】Nginx学习:连接限制和请求限制
    前端 JS 经典:apply、call、bind
    IDEA中maven的设置以及相关功能
    数据库数据恢复-Oracle数据库truncate的数据恢复案例
    使用mysql语句对分组结果进行再次筛选
    Vue如何实现快进后退的跑马灯组件
    【C++心愿便利店】No.6---C++之拷贝构造函数
    基于微信小程序的公交信息在线查询系统小程序设计与实现(源码+lw+部署文档+讲解等)
    信号与线性系统分析(吴大正,郭宝龙)(3-单位脉冲/阶跃序列以及4-信号的运算)
  • 原文地址:https://www.cnblogs.com/yunmeng-shi/p/16254042.html