本文主要参考《C++ 有什么好用的线程池? - TOMOCAT的回答 - 知乎》https://www.zhihu.com/question/397916107/answer/2385548374,然后对其中C++11实现的代码做了一些修改,保证可以在windows系统上使用。其中pthread在windows系统上的编译使用可以参考我的另外一篇博客:windows下使用pthread库方法
假设完成一项任务需要的时间=创建线程时间T1+线程执行任务时间T2+销毁线程时间T3,如果T1+T3的时间远大于T2,通常就可以考虑采取线程池来提高服务器的性能
thread pool就是线程的一种使用模式,一个线程池中维护着多个线程等待接收管理者分配的可并发执行的任务。
将待处理的任务抽象成task结构:
// One unit of work queued in the pool; queued tasks are chained through `next`.
typedef struct task {
void* (*run)(void* args); // job function a worker thread calls to execute this task
void* arg; // argument passed to run()
struct task* next; // next task in the task queue (NULL for the last one)
} task_t;
threadpool中用first和last指针分别指向任务队列首尾的两个task结构体,每个task再通过next指针指向任务队列中的下一个task:
typedef struct task {
void* (*run)(void* args); // abstract a job function that need to run
void* arg; // argument of the run function
struct task* next; // point to the next task in task queue
} task_t;
// Shared state of the pool: one mutex/condition pair, the task queue and the thread bookkeeping.
typedef struct threadpool {
condition_t ready; // mutex + condition variable used to synchronize access to the queue and counters
task_t* first; // first task in task queue (NULL when the queue is empty)
task_t* last; // last task in task queue
int counter; // number of worker threads currently alive
int idle; // number of worker threads currently waiting for a task
int max_threads; // upper bound on the number of worker threads
int quit; // the quit flag, set to 1 when the pool is being destroyed
} threadpool_t;
设计了condition_t结构体,把互斥锁和条件变量封装在一起,用来实现线程间的安全同步:
// Pairs a mutex with the condition variable that is waited on under it.
typedef struct condition {
/**
* Mutex
*/
pthread_mutex_t pmutex;
/**
* Condition variable
*/
pthread_cond_t pcond;
} condition_t;
提供对应的接口:
/**
* Initialize the mutex and the condition variable.
*/
int condition_init(condition_t* cond);
/**
* Lock the mutex.
*/
int condition_lock(condition_t* cond);
/**
* Unlock the mutex.
*/
int condition_unlock(condition_t* cond);
/**
* Wait on the condition variable.
*
* pthread_cond_wait(cond, mutex) does three things:
* 1) the calling thread first releases the mutex
* 2) it then blocks until another thread wakes it up
* 3) once woken up, the calling thread acquires the mutex again
*/
int condition_wait(condition_t* cond);
/**
* Timed wait: like condition_wait, but gives up once the absolute time abstime is reached.
*/
int condition_timedwait(condition_t* cond, const timespec* abstime);
/**
* Wake up one thread that is waiting on the condition.
*
* 1) Purpose: signal a thread that is blocked waiting, so that it leaves the blocked state and continues
* 2) If no thread is blocked, pthread_cond_signal still returns successfully, so the caller needs to check the number of idle threads
* 3) It is meant to wake a single waiter rather than all of them (no "thundering herd")
*    NOTE(review): POSIX only promises that at least one waiter is unblocked - confirm before relying on "exactly one"
* 4) The waiter to signal is chosen by thread priority first; among equal priorities the longest-waiting thread is preferred
*    NOTE(review): this ordering depends on the scheduling policy - confirm
* 5) Important: pthread_cond_wait must be called between lock and unlock, because it decides whether to wait based on shared state; pthread_cond_signal may be called either between lock and unlock or after unlock
*/
int condition_signal(condition_t *cond);
/**
* Wake up all waiting threads.
*/
int condition_broadcast(condition_t *cond);
/**
* Destroy the mutex and the condition variable.
*/
int condition_destroy(condition_t *cond);
threadpool_init仅仅是初始化了condition(互斥锁和条件变量),并设置线程池的一些属性。此时任务队列是空的,线程池中也还没有创建任何线程。
// Set up an empty pool: no queued task and no worker thread yet.
// pool:        the pool object to initialize
// threads_num: stored as the maximum number of worker threads
// A failure of condition_init is only reported on stdout; the fields are set either way.
void threadpool_init(threadpool_t* pool, int threads_num) {
    const int init_rc = condition_init(&pool->ready);
    if (init_rc != 0) {
        printf("Error: initialize the thread pool failed, status:%d\n", init_rc);
    } else {
        printf("Info: initialize the thread pool successfully!\n");
    }

    // the task queue starts out empty
    pool->first = pool->last = NULL;
    // no thread is alive, hence none is idle
    pool->counter = pool->idle = 0;
    pool->quit = 0;
    pool->max_threads = threads_num;
}
threadpool_add_task首先构建task结构体,然后将其加入任务队列的队尾;接着如果有空闲线程就唤醒其中一个,否则在线程数未达到上限时创建一个新线程来执行任务。
// Add a task to the thread pool and make sure a worker will pick it up.
//
// pool: an initialized thread pool
// run:  job function that a worker thread executes as run(arg)
// arg:  opaque argument forwarded to run; the pool itself never frees it
//
// The function returns nothing, so failures (out of memory, thread creation
// error) are reported on stdout like the other messages of this file.
void threadpool_add_task(threadpool_t* pool, void* (*run)(void *arg), void* arg) {
    // create a task
    task_t* new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t)));
    if (new_task == NULL) {
        // a failed allocation must not be dereferenced below
        printf("Error: allocate memory for the new task failed\n");
        return;
    }
    new_task->run = run;
    new_task->arg = arg;
    new_task->next = NULL;

    // lock the condition
    condition_lock(&pool->ready);

    // append the task to the tail of the task queue
    if (pool->first == NULL) {
        pool->first = new_task;
    } else {
        pool->last->next = new_task;
    }
    pool->last = new_task;

    /*
     * after you add a task to task queue, you need to allocate it to a thread:
     * (1)if idle thread num > 0: awake a idle thread
     * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
     */
    if (pool->idle > 0) {
        // awake one of the waiting threads
        condition_signal(&pool->ready);
    } else if (pool->counter < pool->max_threads) {
        // receives the identifier of the thread we are going to create
        pthread_t tid;
        /*
         * pthread_create():
         * (1)thread identifier
         * (2)set the pthread attribute
         * (3)the function that thread is going to run
         * (4)the args of run func
         *
         * A realistic limit of thread num is 200 to 400 threads
         * https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
         */
        int n_status = pthread_create(&tid, NULL, thread_routine, pool);
        if (n_status == 0) {
            // tid is not stored anywhere, so nobody can join this worker:
            // detach it so that its resources are released when it exits
            pthread_detach(tid);
            pool->counter++;
        } else {
            // do not count a thread that never started; the task stays in the
            // queue and is run by whichever worker visits the queue next
            printf("Error: create thread failed, status:%d\n", n_status);
        }
    } else { // when (idle == 0 & counter = max_threads), then wait
        printf("Warning: no idle thread, please wait...\n");
    }

    condition_unlock(&pool->ready);
}
// Excerpt from the worker loop of thread_routine.
// While the task queue is empty and the pool is not quitting, wait up to 2 seconds for a new task.
// If the wait times out, the thread is going to exit.
while (pool->first == NULL && !pool->quit) {
printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
// get the system time
clock_gettime(CLOCK_REALTIME, &abs_name);
abs_name.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready, &abs_name); // block for at most 2 seconds
if (status == ETIMEDOUT) {
printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
timeout = true;
break;
}
}
...
// if waiting for a task timed out, give up one worker: decrement the thread count and leave the loop
if (timeout) {
pool->counter--;
condition_unlock(&pool->ready);
break; // destroy the thread
}
// a task is available: take it from the queue and run it with the pool unlocked
if (pool->first != NULL) {
// get the task from task queue
task_t* t = pool->first;
pool->first = t->next;
// unlock the thread pool so that other threads can visit the task queue
condition_unlock(&pool->ready);
// run the task run func
t->run(t->arg);
free(t);
// lock again before touching the pool state
condition_lock(&pool->ready);
}
// when task queue is empty and quit flag is 1, then destroy the thread
if (pool->quit && pool->first == NULL) {
pool->counter--;
// when no thread is left in the pool, notify the waiting thread (the main thread) that all tasks are done
if (pool->counter == 0) {
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break; // destroy the thread
}
condition.h:
#ifndef CONDITION_H_
#define CONDITION_H_
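// NOTE(review): the header names were lost when this listing was extracted;
// reconstructed from the identifiers used by condition.h / condition.cpp - confirm.
#include <pthread.h>  // pthread_mutex_t, pthread_cond_t
#include <stdio.h>    // printf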
// Pairs a mutex with the condition variable that is waited on under it.
typedef struct condition {
/**
* Mutex
*/
pthread_mutex_t pmutex;
/**
* Condition variable
*/
pthread_cond_t pcond;
} condition_t;
/**
* Initialize the mutex and the condition variable.
*/
int condition_init(condition_t* cond);
/**
* Lock the mutex.
*/
int condition_lock(condition_t* cond);
/**
* Unlock the mutex.
*/
int condition_unlock(condition_t* cond);
/**
* Wait on the condition variable.
*
* pthread_cond_wait(cond, mutex) does three things:
* 1) the calling thread first releases the mutex
* 2) it then blocks until another thread wakes it up
* 3) once woken up, the calling thread acquires the mutex again
*/
int condition_wait(condition_t* cond);
/**
* Timed wait: like condition_wait, but gives up once the absolute time abstime is reached.
*/
int condition_timedwait(condition_t* cond, const timespec* abstime);
/**
* Wake up one thread that is waiting on the condition.
*
* 1) Purpose: signal a thread that is blocked waiting, so that it leaves the blocked state and continues
* 2) If no thread is blocked, pthread_cond_signal still returns successfully, so the caller needs to check the number of idle threads
* 3) It is meant to wake a single waiter rather than all of them (no "thundering herd")
*    NOTE(review): POSIX only promises that at least one waiter is unblocked - confirm before relying on "exactly one"
* 4) The waiter to signal is chosen by thread priority first; among equal priorities the longest-waiting thread is preferred
*    NOTE(review): this ordering depends on the scheduling policy - confirm
* 5) Important: pthread_cond_wait must be called between lock and unlock, because it decides whether to wait based on shared state; pthread_cond_signal may be called either between lock and unlock or after unlock
*/
int condition_signal(condition_t *cond);
/**
* Wake up all waiting threads.
*/
int condition_broadcast(condition_t *cond);
/**
* Destroy the mutex and the condition variable.
*/
int condition_destroy(condition_t *cond);
#endif // CONDITION_H_
condition.cpp:
#include "condition.h"
// Initialize the mutex and the condition variable stored in cond.
//
// Returns 0 on success; otherwise the non-zero status of the pthread call
// that failed (the failure is also reported on stdout). On failure nothing
// is left initialized, so the caller must not call condition_destroy.
int condition_init(condition_t* cond) {
    int status = pthread_mutex_init(&cond->pmutex, NULL);
    if (status != 0) {
        printf("Error: pthread_mutex_init failed, return value:%d\n", status);
        return status;
    }

    status = pthread_cond_init(&cond->pcond, NULL);
    if (status != 0) {
        printf("Error: pthread_cond_init failed, return value:%d\n", status);
        // roll back: do not leak the mutex that was initialized above
        pthread_mutex_destroy(&cond->pmutex);
        return status;
    }

    return 0;
}
// Acquire the mutex held in cond; returns the status of pthread_mutex_lock.
int condition_lock(condition_t* cond) {
    pthread_mutex_t* const mutex = &cond->pmutex;
    return pthread_mutex_lock(mutex);
}
// Release the mutex held in cond; returns the status of pthread_mutex_unlock.
int condition_unlock(condition_t* cond) {
    pthread_mutex_t* const mutex = &cond->pmutex;
    return pthread_mutex_unlock(mutex);
}
// Block on the condition variable of cond, using the mutex of the same cond;
// returns the status of pthread_cond_wait.
int condition_wait(condition_t* cond) {
    pthread_cond_t* const cv = &cond->pcond;
    pthread_mutex_t* const mutex = &cond->pmutex;
    return pthread_cond_wait(cv, mutex);
}
// Same as condition_wait, but the wait is bounded by the absolute time abstime;
// returns the status of pthread_cond_timedwait.
int condition_timedwait(condition_t* cond, const timespec* abstime) {
    pthread_cond_t* const cv = &cond->pcond;
    pthread_mutex_t* const mutex = &cond->pmutex;
    return pthread_cond_timedwait(cv, mutex, abstime);
}
// Signal the condition variable of cond to wake a waiting thread;
// returns the status of pthread_cond_signal.
int condition_signal(condition_t *cond) {
    pthread_cond_t* const cv = &cond->pcond;
    return pthread_cond_signal(cv);
}
// Wake up every thread waiting on the condition variable of cond;
// returns the status of pthread_cond_broadcast.
int condition_broadcast(condition_t *cond) {
    pthread_cond_t* const cv = &cond->pcond;
    return pthread_cond_broadcast(cv);
}
// Destroy the mutex and then, only if that succeeded, the condition variable.
// Returns 0 when both were destroyed, otherwise the first non-zero pthread status.
int condition_destroy(condition_t *cond) {
    int rc = pthread_mutex_destroy(&cond->pmutex);
    if (rc == 0) {
        rc = pthread_cond_destroy(&cond->pcond);
    }
    return rc;
}
threadpool.h:
#ifndef THREAD_POLL_H_
#define THREAD_POLL_H_
#include "condition.h"
// One unit of work queued in the pool; queued tasks are chained through `next`.
typedef struct task {
void* (*run)(void* args); // job function a worker thread calls to execute this task
void* arg; // argument passed to run()
struct task* next; // next task in the task queue (NULL for the last one)
} task_t;
// Shared state of the pool: one mutex/condition pair, the task queue and the thread bookkeeping.
typedef struct threadpool {
condition_t ready; // mutex + condition variable used to synchronize access to the queue and counters
task_t* first; // first task in task queue (NULL when the queue is empty)
task_t* last; // last task in task queue
int counter; // number of worker threads currently alive
int idle; // number of worker threads currently waiting for a task
int max_threads; // upper bound on the number of worker threads
int quit; // the quit flag, set to 1 when the pool is being destroyed
} threadpool_t;
/**
* Initialize the thread pool: empty task queue, no thread yet,
* threads_num is stored as the maximum number of worker threads.
*/
void threadpool_init(threadpool_t* pool, int threads_num);
/**
* Add a task to the thread pool: the task is appended to the queue and
* a worker thread later calls run(arg).
*/
void threadpool_add_task(threadpool_t* pool, void* (*run)(void *args), void* arg);
/**
* Destroy the thread pool: set the quit flag, wake up the idle threads
* and wait until no worker thread is left.
*/
void threadpool_destroy(threadpool_t* pool);
#endif // THREAD_POLL_H_
Threadpool.cpp:
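// NOTE(review): the header names were lost when this listing was extracted;
// the five below are reconstructed from the identifiers this file uses - confirm.
#include <errno.h>      // ETIMEDOUT
#include <stdio.h>      // printf
#include <stdlib.h>     // malloc, free
#include <sys/types.h>  // u_int64_t
#include <time.h>       // clock_gettime, struct timespec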
#include "threadpool.h"
void *thread_routine(void *arg);
// Set up an empty pool: no queued task and no worker thread yet.
// pool:        the pool object to initialize
// threads_num: stored as the maximum number of worker threads
// A failure of condition_init is only reported on stdout; the fields are set either way.
void threadpool_init(threadpool_t* pool, int threads_num) {
    const int init_rc = condition_init(&pool->ready);
    if (init_rc != 0) {
        printf("Error: initialize the thread pool failed, status:%d\n", init_rc);
    } else {
        printf("Info: initialize the thread pool successfully!\n");
    }

    // the task queue starts out empty
    pool->first = pool->last = NULL;
    // no thread is alive, hence none is idle
    pool->counter = pool->idle = 0;
    pool->quit = 0;
    pool->max_threads = threads_num;
}
// Add a task to the thread pool and make sure a worker will pick it up.
//
// pool: an initialized thread pool
// run:  job function that a worker thread executes as run(arg)
// arg:  opaque argument forwarded to run; the pool itself never frees it
//
// The function returns nothing, so failures (out of memory, thread creation
// error) are reported on stdout like the other messages of this file.
void threadpool_add_task(threadpool_t* pool, void* (*run)(void *arg), void* arg) {
    // create a task
    task_t* new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t)));
    if (new_task == NULL) {
        // a failed allocation must not be dereferenced below
        printf("Error: allocate memory for the new task failed\n");
        return;
    }
    new_task->run = run;
    new_task->arg = arg;
    new_task->next = NULL;

    // lock the condition
    condition_lock(&pool->ready);

    // append the task to the tail of the task queue
    if (pool->first == NULL) {
        pool->first = new_task;
    } else {
        pool->last->next = new_task;
    }
    pool->last = new_task;

    /*
     * after you add a task to task queue, you need to allocate it to a thread:
     * (1)if idle thread num > 0: awake a idle thread
     * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
     */
    if (pool->idle > 0) {
        // awake one of the waiting threads
        condition_signal(&pool->ready);
    } else if (pool->counter < pool->max_threads) {
        // receives the identifier of the thread we are going to create
        pthread_t tid;
        /*
         * pthread_create():
         * (1)thread identifier
         * (2)set the pthread attribute
         * (3)the function that thread is going to run
         * (4)the args of run func
         *
         * A realistic limit of thread num is 200 to 400 threads
         * https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
         */
        int n_status = pthread_create(&tid, NULL, thread_routine, pool);
        if (n_status == 0) {
            // tid is not stored anywhere, so nobody can join this worker:
            // detach it so that its resources are released when it exits
            pthread_detach(tid);
            pool->counter++;
        } else {
            // do not count a thread that never started; the task stays in the
            // queue and is run by whichever worker visits the queue next
            printf("Error: create thread failed, status:%d\n", n_status);
        }
    } else { // when (idle == 0 & counter = max_threads), then wait
        printf("Warning: no idle thread, please wait...\n");
    }

    condition_unlock(&pool->ready);
}
// create a thread to run the task run func
// and the void *arg means the arg passed by pthread_create: pool
void *thread_routine(void *arg) {
struct timespec abs_name;
bool timeout;
printf("Info: create thread, and the thread id is: %ld\n", (u_int64_t)pthread_self());
threadpool_t *pool = reinterpret_cast(arg);
// keep visiting the task queue
while (true) {
timeout = false;
condition_lock(&pool->ready);
pool->idle++;
// when task queue is empty, then block 2 second to get the new task
// If timeout, then destroy the thread
while (pool->first == NULL && !pool->quit) {
printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
// get the system time
clock_gettime(CLOCK_REALTIME, &abs_name);
abs_name.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready, &abs_name); // block for 2 second
if (status == ETIMEDOUT) {
printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
timeout = true;
break;
}
}
pool->idle--;
// when the thread run the task, we should unlock the thread pool
if (pool->first != NULL) {
// get the task from task queue
task_t* t = pool->first;
pool->first = t->next;
// unlock the thread pool to make other threads visit task queue
condition_unlock(&pool->ready);
// run the task run func
t->run(t->arg);
free(t);
// lock
condition_lock(&pool->ready);
}
// when task queue is clean and quit flag is 1, then destroy the thread
if (pool->quit && pool->first == NULL) {
pool->counter--;
// 若线程池中线程数为0,通知等待线程(主线程)全部任务已经完成
if (pool->counter == 0) {
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break; // destroy the thread
}
// if visit task queue timeout(means no task in queue), quit destory the thread
if (timeout) {
pool->counter--;
condition_unlock(&pool->ready);
break; // destroy the thread
}
condition_unlock(&pool->ready);
}
// if break, destroy the thread
printf("Info: thread %ld quit\n", (u_int64_t)pthread_self());
return NULL;
}
/*
* destroy a thread pool:
* 1) awake all the idle thread
* 2) wait for the running thread to finish
*/
void threadpool_destroy(threadpool_t *pool) {
if (pool->quit) {
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
if (pool->counter > 0) {
if (pool->idle > 0) {
condition_broadcast(&pool->ready);
}
while (pool->counter > 0) {
condition_wait(&pool->ready);
}
}
condition_unlock(&pool->ready);