• 线程池实现


    一、线程池介绍

    1)应用场景

    当并发数很多的时候,并且每个线程执行时间很短的任务,这样就会频繁创建线程,而这样的频繁创建和销毁线程会大大降低系统的执行效率。对于这种场景我们可以使用线程池来复用之前创建的线程,降低线程的频繁创建和销毁工作,达到提高执行效率的目的。

    2)线程池原理

    线程池使用者往线程池任务队列里面添加任务,线程池会根据任务的多少来自动创建或销毁工作线程取执行任务,即当任务数量比较多而线程池比较少处于忙不过来的状态时,线程池就会自动创建线程,而当仍务数量比较少而空闲线程比较多时,线程池就会自动销毁一部分空闲线程。其中任务队列、线程池使用者和工作线程组成一个生产者消费者模型,线程池使用者(消费者)检查队列已满就阻塞,否则就向任务队列添加任务并通知工作线程(消费者)取任务执行,而工作线程(消费者)取任务之后也会向线程池使用者(生产者)发送通知解阻塞。

    3)线程池结构

    线程池由任务队列工作线程管理线程三部分组成,他们的所用分别如下。

    • 任务队列
      • 负责保存要执行的任务(一般每个任务就是一个回调函数);
      • 线程池使用者(生产者)往任务队列里面添加任务,并通知工作线程(消费者)取任务执行;
      • 工作线程(消费者)从任务队列里面获取到任务后,需要把该任务从队列中删除;
    • 工作线程
      • 负责执行任务队列里面的任务;
      • 当任务队列没有任务时,工作线程便自动睡眠防止占用CPU资源;
      • 当由任务时唤醒工作线程,从队列中取任务执行(从队列中取出任务后,如果生产者此时阻塞的话可以通知生产者解阻塞);
    • 管理线程
      • 负责控制工作线程的数量;
      • 当空闲的工作线程数量比较多时,就销毁一部分线程;
      • 当队列任务比较多而工作线程比较少时,新创建一部分线程;

    线程池结构图

    二、程序实现

    1)C语言实现

    threadPool.h

    #ifndef _THREAD_POOL_
    #define _THREAD_POOL_
    
    typedef struct ThreadPool ThreadPool;
    
    // 创建并初始化线程池
    ThreadPool* threadPoolCreate(int queueSize, int minNum, int maxNum);
    
    // 销毁线程池
    void threadPoolDestory(ThreadPool* pool);
    
    // 往线程池添加任务
    int threadPoolAdd(ThreadPool* pool, void (*handler)(void* arg), void* arg);
    
    // 获取线程池当前工作线程数
    int threadPoolWorkNum(ThreadPool* pool);
    
    // 获取线程池当前存活线程数
    int threadPoolLiveNum(ThreadPool* pool);
    
    #endif // _THREAD_POOL_
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    threadPool.c

    #include "threadPool.h"
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    #define CHAGNUM 4
    
    void* worker(void *arg);
    void* manager(void *arg);
    void threadExit(ThreadPool* pool);
    
    typedef struct Task {
        void (*handler)(void* arg);
        void* arg;
    }Task;
    
    struct ThreadPool {
        Task* taskQ;
        int qCapacity;
        int qSize;
        int qFront;
        int qBack;
    
        pthread_t manageID;
        pthread_t* workIDs;
        int maxNum;			// 最大线程数量
        int minNum;			// 最小线程数量
        int workNum;		// 正在执行任务的工作线程数量
        int liveNum;        // 当前已创建的的工作线程数量
        int exitNum;        // 需要销毁退出的线程数量
    
        pthread_mutex_t mutexPool;
        pthread_mutex_t mutexWork; // 锁workNum变量
        pthread_cond_t hasTask;      // 任务队列是否有任务
        pthread_cond_t isFull;       // 任务队列是否已满
    
        int isDestory; // 线程池是否销毁
    };
    
    ThreadPool* threadPoolCreate(int queueSize, int minNum, int maxNum)
    {
        int i, res = 0;
    
        // 创建线程池对象
        ThreadPool* tPool = (ThreadPool*)malloc(sizeof(struct ThreadPool));
        if (tPool == NULL) {
            perror("tPool malloc:");
            goto err;
        }
    
        // 创建任务队列
        tPool->taskQ = (Task*)malloc(sizeof(struct Task) * queueSize);
        if (tPool->taskQ == NULL) {
            perror("taskQ malloc:");
            goto err;
        }
        tPool->qSize = 0;
        tPool->qCapacity = queueSize;
        tPool->qFront = tPool->qBack = 0;
    
        // 创建存储工作线程ID的数组
        tPool->workIDs = (pthread_t*)malloc(sizeof(pthread_t) * maxNum);
        if (tPool->workIDs == NULL) {
            perror("workIDs malloc:");
            goto err;
        }
        memset(tPool->workIDs, 0, sizeof(pthread_t) * maxNum);
        tPool->maxNum = maxNum;
        tPool->minNum = minNum;
        tPool->workNum = 0;
        tPool->liveNum = minNum;
        tPool->exitNum = 0;
    
        tPool->isDestory = 0;
        // 初始化互斥量和条件变量
        if (pthread_mutex_init(&tPool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&tPool->mutexWork, NULL) != 0 ||
            pthread_cond_init(&tPool->isFull, NULL) != 0 ||
            pthread_cond_init(&tPool->hasTask, NULL) != 0) {
            printf("mutex or cond init fail...\n");
            goto err;
        }
        
    
        // 创建工作线程
        for (i = 0; i < minNum; i++) {
            res = pthread_create(&tPool->workIDs[i], NULL, worker, tPool);
            if (res != 0) {  
                printf("thread create failed for worker, errno: %d, idx: %d\n", res, i);
                goto err;
            }
        }
    
        // 创建管理线程
        res = pthread_create(&tPool->manageID, NULL, manager, tPool);
        if (res != 0) {
            printf("thread create failed for manager, errno: %d\n", res);
            goto err;
        }
    
        return tPool;
    
    err:
        if (tPool && tPool->taskQ) {
            free(tPool->taskQ);
            tPool->taskQ = NULL;
        }
    
        if (tPool && tPool->workIDs) {
            free(tPool->workIDs);
            tPool->workIDs = NULL;
        }
    
        if (tPool) {
            free(tPool);
        }
        return NULL;
    }
    
    void* worker(void *arg)
    {
        Task task;
        ThreadPool* pool = (ThreadPool*)arg;
    
        while(1) {
            pthread_mutex_lock(&pool->mutexPool);
    
            // 队列为空就阻塞当前线程,避免占用CPU
            while(pool->qSize == 0 && !pool->isDestory) {
                pthread_cond_wait(&pool->hasTask, &pool->mutexPool);
                
                // 减少空闲线程
                if (pool->exitNum > 0) {
                    pool->exitNum--;
                    if (pool->liveNum > pool->minNum) {
                        pool->liveNum--;
                        pthread_mutex_unlock(&pool->mutexPool);
                        threadExit(pool);
                    }
                }
            }
    
            // 销毁线程池
            if (pool->isDestory) {
                pool->liveNum--;
                pthread_mutex_unlock(&pool->mutexPool);
                threadExit(pool);
            }
    
            // 取一个任务执行
            task.arg = pool->taskQ[pool->qFront].arg;
            task.handler = pool->taskQ[pool->qFront].handler;
            pool->qFront = (pool->qFront + 1) % pool->qCapacity;
            pool->qSize--;
            pthread_cond_signal(&pool->isFull);
            pthread_mutex_unlock(&pool->mutexPool);
    
            pthread_mutex_lock(&pool->mutexWork);
            pool->workNum++;
            pthread_mutex_unlock(&pool->mutexWork);
            task.handler(task.arg);
            if (task.arg) {  // 释放资源 或者 用户在回调函数中释放这里就不释放了
                free(task.arg);
                task.arg = NULL;
            }
    
            pthread_mutex_lock(&pool->mutexWork);
            pool->workNum--;
            pthread_mutex_unlock(&pool->mutexWork);
        }
    
        return NULL;
    }
    
    void* manager(void *arg)
    {
        int i = 0, incNum = CHAGNUM;
        ThreadPool* pool = (ThreadPool*)arg;
    
        while(!pool->isDestory) {
            sleep(3);
            pthread_mutex_lock(&pool->mutexPool);
            int queueSize = pool->qSize;
            int liveNum = pool->liveNum;
            pthread_mutex_unlock(&pool->mutexPool);
    
            pthread_mutex_lock(&pool->mutexWork);
            int workNum = pool->workNum;
            pthread_mutex_unlock(&pool->mutexWork);
    
            // 数据处理不过来要增加线程
            if (queueSize > liveNum) {
                pthread_mutex_lock(&pool->mutexPool);
                for(i = 0; i < pool->maxNum && incNum > 0; i++) {
                    if (pool->workIDs[i] == 0) {
                        pthread_create(&pool->workIDs[i], NULL, worker, pool);
                        incNum--;
                        pool->liveNum++;
                        printf("new thread %ld, liveNum = %d, workNum = %d\n",
                            pool->workIDs[i], pool->liveNum, pool->workNum);
                    }
                }
                pthread_mutex_unlock(&pool->mutexPool);
            }
            
            // 空闲线程多了要销毁
            if(workNum * 2 < liveNum &&
                liveNum - CHAGNUM > pool->minNum) {
    
                pthread_mutex_lock(&pool->mutexPool);
                pool->exitNum = CHAGNUM;
                pthread_mutex_unlock(&pool->mutexPool);
    
                for (i = 0; i < CHAGNUM; i++) {
                    pthread_cond_signal(&pool->hasTask);
                }
            }
        }
        return NULL;
    }
    
    int threadPoolAdd(ThreadPool* pool, void (*handler)(void* arg), void* arg)
    {
        pthread_mutex_lock(&pool->mutexPool);
        while(pool->qSize == pool->qCapacity && !pool->isDestory)  {
            pthread_cond_wait(&pool->isFull, &pool->mutexPool);
        }
        if (pool->isDestory) {
            pthread_mutex_unlock(&pool->mutexPool);
            return -1;
        }
    
        pool->taskQ[pool->qBack].arg = arg;
        pool->taskQ[pool->qBack].handler = handler;
        pool->qBack = (pool->qBack + 1) % pool->qCapacity;
        pool->qSize++;
        pthread_cond_signal(&pool->hasTask); // 通知空闲的工作线程取任务执行
        pthread_mutex_unlock(&pool->mutexPool);
        return 0;
    }
    
    void threadExit(ThreadPool* pool)
    {
        int i;
        pthread_t tid = pthread_self();
        for(i = 0; i < pool->maxNum; i++) {
            if (pool->workIDs[i] == tid) {
                pool->workIDs[i] = 0;
                break;
            }
        }
        printf("thread %ld exit, liveNum = %d, workNum = %d\n",
            tid, pool->liveNum, pool->workNum);
        pthread_exit(0);
    }
    
    int threadPoolWorkNum(ThreadPool* pool)
    {
        int workNum;
    
        pthread_mutex_lock(&pool->mutexWork);
        workNum = pool->workNum;
        pthread_mutex_unlock(&pool->mutexWork);
    
        return workNum;
    }
    
    int threadPoolLiveNum(ThreadPool* pool)
    {
        int liveNum;
    
        pthread_mutex_lock(&pool->mutexPool);
        liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);
    
        return liveNum;
    }
    
    void threadPoolDestory(ThreadPool* pool)
    {
        int i;
    
        if (pool == NULL) {
            return;
        }
        pool->isDestory = 1;
        // 销毁管理线程
        pthread_join(pool->manageID, NULL);
    
        // 销毁工作线程
        for (i = 0; i < pool->maxNum; i++) {
            if (pool->workIDs[i] > 0) {
                pthread_cond_signal(&pool->hasTask);
            }
        }
        for (i = 0; i < pool->maxNum; i++) {
            if (pool->workIDs[i] > 0) {
                pthread_join(pool->workIDs[i], NULL);
            }
        }
    
        pthread_mutex_destroy(&pool->mutexPool);
        pthread_mutex_destroy(&pool->mutexWork);
        pthread_cond_destroy(&pool->hasTask);
    
        if (pool->workIDs) {
            free(pool->workIDs);
            pool->workIDs = NULL;
        }
        if (pool->taskQ) {
            free(pool->taskQ);
            pool->taskQ = NULL;
        }
        free(pool);
        printf("thread pool destory...\n");
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320

    main.c

    #include "threadPool.h"
    #include 
    #include 
    #include 
    #include 
    
    void myTest(void *arg)
    {
        printf("tid: %ld, num = %d\n", pthread_self(), *(int *)arg);
        sleep(3);
    }
    
    int main()
    {
        int i;
        ThreadPool *pool = threadPoolCreate(20, 4, 10);
        for (i = 0; i < 40; i++) {
            int* num = (int *)malloc(sizeof(int));
            *num = i;
            threadPoolAdd(pool, myTest, num);
        }
        sleep(10);
        threadPoolDestory(pool);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2)C++实现

    threadPool.h

    #ifndef _THREADPOOL_H
    #define _THREADPOOL_H
    #include 
    #include 
    #include 
    #include 
    
    struct Task  {
        void (*handler)(void*);
        void* arg = nullptr;
    };
    
    class TaskQueue {
    public:
        TaskQueue();
        ~TaskQueue();
    
        // 添加任务
        void addTask(Task& task);
        void addTask(void (*handler)(void*), void* arg);
    
        // 取出任务
        Task getTask();
    
        // 获取任务数
        inline int getTaskNum();
    
    private:
        pthread_mutex_t m_lock;
        std::queue m_que;
    };
    
    class ThreadPool {
    public:
        ThreadPool(int max, int min);
        ~ThreadPool();
    
        // 添加任务
        void addTask(Task task);
    
        // 获取工作线程数
        int getWorkNum();
    
        // 获取存活线程数
        int getLiveNum();
    
    private:
        static void* worker(void* arg);
        static void* manager(void* arg);
    
        void threadExit();
    
    private:
        TaskQueue m_taskQ;
        int m_maxNum;
        int m_minNum;
        int m_workNum;
        int m_liveNum;
        int m_exitNum;
        static const int m_changeNum = 2;
    
        bool m_isDestory = false;
    
        pthread_t m_managerTid;
        pthread_t* m_workTids;
    
        pthread_cond_t m_hasTask;
        pthread_mutex_t m_lock; // 锁m_workNUm、m_liveNum、m_exitNum变量
    };
    
    #endif // _THREADPOOL_H
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    threadPool.cpp

    #include "threadPool.h"
    #include 
    
    ThreadPool::ThreadPool(int max, int min)
    {
        int i;
        m_maxNum = max;
        m_minNum = min;
        m_workNum = 0;
        m_liveNum = min;
        m_exitNum = 0;
    
        if (pthread_cond_init(&m_hasTask, nullptr) != 0
            || pthread_mutex_init(&m_lock, nullptr) != 0) {
                std::cout << "cond or mutex init fail..." << std::endl;
                return;
        }
    
        m_workTids = new pthread_t[m_maxNum];
        if(m_workTids == nullptr) {
            std::cout << "m_workTids malloc failed..." << std::endl;
        }
        memset(m_workTids, 0, sizeof(pthread_t) * m_maxNum);
        
        // 创建工作线程
        for (i = 0; i < m_minNum; i++) {
            pthread_create(&m_workTids[i], nullptr, worker, this); 
            std::cout << "worker thread " << m_workTids[i] << " created" << std::endl;
        }
    
        // 创建管理线程
        pthread_create(&m_managerTid, nullptr, manager, this);
    }
    
    ThreadPool::~ThreadPool()
    {
        m_isDestory = true;
    
        pthread_join(m_managerTid, nullptr);
    
        pthread_cond_broadcast(&m_hasTask);
    
        for (int i = 0; i < m_maxNum; i++) {
            if (m_workTids[i] != 0) {
                pthread_join(m_workTids[i], nullptr);
                std::cout << "thread i = " << i << " tid = " << m_workTids[i] << " exit..." << std::endl;
                m_workTids[i] = 0;
            }
        }
    
        pthread_mutex_destroy(&m_lock);
        pthread_cond_destroy(&m_hasTask);
    
        if (m_workTids) {
            delete []m_workTids;
        }
        std::cout << "liveNum = "<< m_liveNum <<", workNum = "<< m_workNum <<", queSize = " << this->m_taskQ.getTaskNum() << std::endl;
    }
    
    void* ThreadPool::worker(void* arg)
    {
        ThreadPool* pool = static_cast(arg);
    
        while(1) {
            pthread_mutex_lock(&pool->m_lock);
            while(pool->m_taskQ.getTaskNum() == 0 && !pool->m_isDestory) {
                std::cout << "thread " << pthread_self() << " waitting..." << std::endl;
                pthread_cond_wait(&pool->m_hasTask, &pool->m_lock);
                // 空闲线程退出
                if (pool->m_exitNum > 0) {
                    pool->m_exitNum--;
                    if(pool->m_liveNum > pool->m_minNum) { 
                        pool->m_liveNum--;
                        pthread_mutex_unlock(&pool->m_lock);
                        pool->threadExit();
                    }
                }
            }
    
            // 销毁线程池
            if (pool->m_isDestory) {
                pool->m_liveNum--;
                pthread_mutex_unlock(&pool->m_lock);
                pthread_exit(0);  // 这里不调用threadExit是让主线程好回收资源
            }
            // 取任务执行
            Task task = pool->m_taskQ.getTask();
            pool->m_workNum++;
            pthread_mutex_unlock(&pool->m_lock);
            task.handler(task.arg);  // 用户自己取释放arg内存
            pthread_mutex_lock(&pool->m_lock);
            pool->m_workNum--;
            pthread_mutex_unlock(&pool->m_lock);
    
        }
        return nullptr;
    }
    
    void* ThreadPool::manager(void* arg)
    {
        ThreadPool* pool = static_cast(arg);
    
        while(!pool->m_isDestory) {
            sleep(3);
    
            int liveNum;
            int taskNum;
            int workNum;
            int i, incNum = pool->m_changeNum;
    
            pthread_mutex_lock(&pool->m_lock);
            liveNum = pool->m_liveNum;
            workNum = pool->m_workNum;
            taskNum = pool->m_taskQ.getTaskNum();
            pthread_mutex_unlock(&pool->m_lock);
    
            // 任务太多忙不过来需要创建线程
            if(!pool->m_isDestory && taskNum > liveNum && liveNum < pool->m_maxNum) {
                for (i = 0; i < pool->m_maxNum && incNum > 0 ; i++) {
                    pthread_mutex_lock(&pool->m_lock);
                    if (pool->m_workTids[i] == 0) {
                        pool->m_liveNum++;
                        incNum--;
                        pthread_create(&pool->m_workTids[i], NULL, worker, pool);
                        std::cout << "new thread " << pool->m_workTids[i] << " created" << std::endl;
                    }
                    pthread_mutex_unlock(&pool->m_lock);
                }
            }
    
            // 销毁多余的空闲线程
            incNum = pool->m_changeNum;
            if (!pool->m_isDestory && workNum * 2 < liveNum && liveNum > pool->m_minNum) {
                pthread_mutex_lock(&pool->m_lock);
                pool->m_exitNum = pool->m_changeNum;
                pthread_mutex_unlock(&pool->m_lock);
                while (incNum--) {
                    pthread_cond_signal(&pool->m_hasTask);
                }
            }
        }
        return nullptr;
    }
    
    void ThreadPool::addTask(Task task)
    {
        if (m_isDestory) {
            return;
        }
        pthread_mutex_lock(&m_lock);
        m_taskQ.addTask(task);
        pthread_mutex_unlock(&m_lock);
        pthread_cond_signal(&m_hasTask);
    }
    
    void ThreadPool::threadExit()
    {
        for (int i = 0; i < m_maxNum; i++) {
            if (m_workTids[i] == pthread_self()) {
                std::cout << "thread " << m_workTids[i] << " exit..." << std::endl;
                pthread_mutex_lock(&m_lock);
                m_workTids[i] = 0;
                pthread_mutex_unlock(&m_lock);
                pthread_exit(0);
            }
        }
    }
    
    int ThreadPool::getWorkNum()
    {
        int workNum = 0;
        pthread_mutex_lock(&m_lock);
        workNum = m_workNum;
        pthread_mutex_unlock(&m_lock);
        return m_workNum;
    }
    
    int ThreadPool::getLiveNum()
    {
        int liveNum = 0;
        pthread_mutex_lock(&m_lock);
        liveNum = m_liveNum;
        pthread_mutex_unlock(&m_lock);
        return liveNum;
    }
    
    TaskQueue::TaskQueue() 
    {
        pthread_mutex_init(&m_lock, NULL);
    }
    
    TaskQueue::~TaskQueue() 
    {
        pthread_mutex_destroy(&m_lock);
    }
    
    void TaskQueue::addTask(Task& task)
    {
        pthread_mutex_lock(&this->m_lock);
        m_que.push(task);
        pthread_mutex_unlock(&this->m_lock);
    }
    
    void TaskQueue::addTask(void (*handler)(void*), void* arg)
    {
        Task task;
        task.arg = arg;
        task.handler = handler; 
        pthread_mutex_lock(&this->m_lock);
        m_que.push(task);
        pthread_mutex_unlock(&this->m_lock);
    }
    
    Task TaskQueue::getTask()
    {
        Task task;
    
        pthread_mutex_lock(&this->m_lock);
        if (m_que.size() > 0) {
            task = m_que.front();
            m_que.pop();
        }
        pthread_mutex_unlock(&this->m_lock);
    
        return task;
    }
    
    inline int TaskQueue::getTaskNum()
    {
        return this->m_que.size();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231

    main.cpp

    #include "threadPool.h"
    #include 
    using namespace std;
     
    void my_test(void* arg)
    {
        int num = *(int*)arg;
        cout << "thread id: " << pthread_self() << " , num: " << num << endl;
        sleep(1);
        delete (int*)arg;
    }
    
    int main()
    {
        ThreadPool* pool = new ThreadPool(10, 4);
        sleep(1);
        for (int i = 0; i < 30; i++) {
            Task task;
            task.handler = my_test;
            task.arg = new int(i);
            pool->addTask(task);
        }
        sleep(10);
        delete pool;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  • 相关阅读:
    Android实时获取摄像头画面传输至PC端
    80个在线小游戏源码
    uniapp之 登录成功后返回之前页面
    【webrtc】call:RtpTransportControllerSend 基于RTP的传输链路控制(包括收发)和MediaSoup cc简单对比
    LeetCode每日一题——792. 匹配子序列的单词数
    mipi介绍
    java基于springboot二手车交易管理系统附源码
    【算法集训专题攻克篇】第二十篇之二叉搜索树
    【cpu_entry_area mapping】SCTF2023-sycrop
    Jenkins快速了解
  • 原文地址:https://blog.csdn.net/qq_43479736/article/details/130857158