线程池一种线程使用模式。
因为线程过多会带来调度开销,影响缓存局部性和整体性能。线程池是用来维护着多个线程,等待监督管理者分配可并发执行的任务。
注意点:
线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
a. 需要大量的线程来完成任务,并且要求完成任务的时间比较短。
比如Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
b. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
c. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。
问题:创建的线程越多性能越高?
答:不正确。一开始,程序的运行效率会随着线程数量的增加而逐渐变高。当线程数量增加临界值时,线程数量继续增加,程序的运行效率会减少。(由于线程的频繁切换影响了线程运行效率)。
线程池由一个阻塞任务队列加上多个线程实现,线程池中的线程可以从阻塞任务队列中获取任务然后进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。

线程池的主体是一个任务队列和n个线程(工作队列):
a.任务队列用来保存外界传入的需要解决的任务。
b.而线程池中的n个线程负责一直从任务队列中拿出任务并解决。
另外还需要一把互斥锁和一个条件变量:
互斥锁:用来保护任务队列的数据安全,即维护多线程从任务队列中pop任务时的互斥关系。
条件变量:用来维护多线程之间的同步关系,当任务队列为空时要求线程释放互斥锁并在条件变量下等待,这时任务队列中每插入一个任务就唤醒一个线程。
分别为任务队列(生产者)、工作队列(消费者)、管理组件(互斥锁和条件变量)。
- //任务队列(生产者) 链表
- struct nTask{
- void (*task_func)(struct nTask* task); //任务函数回调
- void* user_data; //任务用户数据
-
- struct nTask* prev;
- struct nTask* next;
- };
-
- //工作队列(消费者) 链表
- struct nWorker{
- pthread_t threadid; //工作线程id
- struct nManager* manager;
-
- struct nWorker* prev;
- struct nWorker* next;
- };
-
- //管理互斥锁和条件变量
- typedef struct nManager {
- struct nTask* tasks; //任务队列
- struct nWorker* workers;//工作队列
-
- pthread_mutex_t mutex; //互斥锁
- pthread_cond_t cond; //条件变量
- } ThreadPool;
- //线程池工作线程创建
- int nThreadPoolCreate(ThreadPool* pool, int numWorkers)
-
- //唤醒全部工作队列中的线程(销毁线程池)
- int nThreadPoolDestory(ThreadPool* pool, int nWorker)
-
- //唤醒单个工作队列中的线程
- int nThreadPoolPushTask(ThreadPool* pool, struct nTask* task)
创建10个thread线程加入工作队列,阻塞等待。随后创建了1000个task任务,写入 task的数据,去唤醒工作队列中的线程,管理组件判断任务有无,调用task的func来执行任务。
- #include
- #include
- #include
- #include
-
- #define THREADPOOL_INIT_COUNT 20 //工作线程数量
- #define TASK_INIT_SIZE 1000 //任务数量
-
- //双向链表插入(头插),list为头结点
- #define LIST_INSERT(item, list) do{ \
- item->prev = NULL; \
- item->next =list; \
- if((list) != NULL)(list)->prev = item; \
- (list) = item; \
- }while(0)
-
- //双向链表删除,list为头结点
- #define LIST_REMOVE(item, list) do{ \
- if(item->prev != NULL)item->prev->next = item->next;\
- if(item->next != NULL)item->next->prev = item->prev;\
- if(item == list)list = item->next;\
- item->prev = item->next = NULL;\
- }while(0)
-
- //任务队列(生产者) 链表
- struct nTask{
- void (*task_func)(struct nTask* task); //任务函数回调
- void* user_data; //任务用户数据
-
- struct nTask* prev;
- struct nTask* next;
- };
-
- //工作队列(消费者) 链表
- struct nWorker{
- pthread_t threadid; //工作线程id
- struct nManager* manager;
-
- struct nWorker* prev;
- struct nWorker* next;
- };
-
- //管理互斥锁和条件变量
- typedef struct nManager {
- struct nTask* tasks; //任务队列
- struct nWorker* workers;//工作队列
-
- pthread_mutex_t mutex; //互斥锁
- pthread_cond_t cond; //条件变量
- } ThreadPool;
-
-
- //消费者工作线程
- static void* nThreadPoolCallback(void* arg)
- {
- struct nWorker* worker = (struct nWorker*)arg;
-
- while(1){
- pthread_mutex_lock(&worker->manager->mutex);
- while(worker->manager->tasks == NULL){ //任务队列为空
- //解锁,阻塞等待唤醒,唤醒后加锁继续工作
- pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
- }
-
- struct nTask* task = worker->manager->tasks; //从头结点开始消费
- LIST_REMOVE(task, worker->manager->tasks); //任务队列移除头结点
-
- pthread_mutex_unlock(&worker->manager->mutex);
- task->task_func(task); //任务函数回调
- }
-
- free(worker);
-
- }
-
- //线程池工作线程创建
- int nThreadPoolCreate(ThreadPool* pool, int numWorkers)
- {
- if (pool == NULL) return -1;
- if (numWorkers < 1) numWorkers = 1;
- memset(pool, 0, sizeof(ThreadPool));
-
- pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; //静态申请 条件变量
- memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
-
- pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; //静态申请 互斥锁
- memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
-
- int i = 0;
- for(i = 0; i < numWorkers; i++){
- struct nWorker* worker = (struct nWorker*)malloc(sizeof(struct nWorker));
- if (worker == NULL) {
- perror("malloc");
- return -1;
- }
- memset(worker, 0, sizeof(struct nWorker));
- worker->manager = pool;
-
- int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
- if (ret) { //失败
- perror("pthread_create fail");
- free(worker);
- return -2;
- }
-
- LIST_INSERT(worker, pool->workers); //工作队列结点插入
- }
-
- return 0;
- }
-
- //唤醒全部工作队列中的线程(销毁线程池)
- int nThreadPoolDestory(ThreadPool* pool, int nWorker)
- {
- struct nWorker* worker = NULL;
-
- pthread_mutex_lock(&pool->mutex);
-
- //唤醒全部阻塞在条件变量上的工作线程
- pthread_cond_broadcast(&pool->cond);
-
- pthread_mutex_unlock(&pool->mutex);
-
- pool->workers = NULL;
- pool->tasks = NULL;
-
- return 0;
- }
-
- //唤醒单个工作队列中的线程
- int nThreadPoolPushTask(ThreadPool* pool, struct nTask* task)
- {
- pthread_mutex_lock(&pool->mutex);
-
- LIST_INSERT(task, pool->tasks);
-
- //唤醒至少一个阻塞在条件变量上的线程
- pthread_cond_signal(&pool->cond);
-
- pthread_mutex_unlock(&pool->mutex);
-
- }
-
- //任务执行回调函数
- void task_etry(struct nTask* task)
- {
- int idx = *(int*)task->user_data;
- printf("idx: %d\n", idx);
-
- free(task->user_data);
- free(task);
- }
-
- int main(void)
- {
- ThreadPool pool = {0};
-
- //线程池工作线程创建
- nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
- for(int i = 0; i < TASK_INIT_SIZE; i++){
- struct nTask* task = (struct nTask*)malloc(sizeof(struct nTask));
- if (task == NULL) {
- perror("malloc");
- exit(1);
- }
- memset(task, 0, sizeof(struct nTask));
-
- task->task_func = task_etry; //注册回调
- task->user_data = malloc(sizeof(int));
- *(int*)task->user_data = i;
-
- //唤醒单个工作队列中的线程
- nThreadPoolPushTask(&pool, task);
- }
-
- getchar();
- }
-