16、 Linux系统编程系列之线程池
线程池就是将许多线程,放置在一个池子中(实际就是一个结构体),只要有任务,就将任务投入池中,这些线程们通过某些机制,及时处理这些任务,一旦处理完后又重新回到池子中重新接收新任务。为了便于管理,线程池还应当提供诸如初始化线程池,增删线程数量,检测未完成任务的数据,检测正在执行任务的线程的数量、销毁线程池等等基本操作。
1、更好的满足并发性需求
2、更加节省系统资源,降低负载
3、避免每次需要执行任务时都创建新的线程
4、可以限制并发线程数量,帮助控制应用程序的性能和稳定性
核心思想:通过精巧的设计使得池子中的线程数量可以动态地发生变化,让线程既可以应对并发性需求,又不会浪费系统资源
用来应对某种场景,要在程序中创建大量线程,并且这些线程的数量和生命周期均不确定,可能方生方死,也可能常驻内存,请看下面举例。
线程池可以处理多个任务,从而减少了创建和销毁线程的开销,提高了系统性能
线程池可以帮助服务端应用程序同时处理多个客户端请求
线程池可以处理 I/O 操作,允许系统使用空闲时间进行其他任务处理
线程池可以处理来自 Web 客户端的请求,从而提高了 Web 应用程序的性能
总之,任何需要处理大量任务或需要同时处理多个请求的应用程序都可以使用线程池来提高性能和效率。
设计某个东西一定要有个目标,这里是要求设计一个线程池,首先要明白线程池的作用,然后根据其作用来展开设计。线程池是通过让线程的数量进行动态的变化来及时处理接受的任务,所以核心就是线程和任务。可以将问题转换为如何组织线程和组织任务?借鉴别人的一幅图
从上图可以看出,线程被创建出来之后,都处于睡眠态,它们实际上是进入了条件变量的等待队列中,而任务都被放入一个链表,被互斥锁保护起来
线程的生命周期如下:(这里省略程序执行的流程图)
(1)、线程被创建
(2)、预防死锁,准备好退出处理函数,防止在持有一把锁的状态中死去
(3)、尝试持有互斥锁(等待任务)
(4)、判断是否有任务,若无则进入条件变量等待队列睡眠,若有则进入第(5)步
(5)、从任务链表中取得一个任务
(6)、释放互斥锁
(7)、弹出退出处理函数,避免占用内存
(8)、执行任务
(9)、执行任务完成后重新回到第(2)步
线程的生命周期其实就是线程要做的事情,把上面的第(2)步到第(8)步写成一个线程的例程函数。
所谓的任务就是一个函数,回想一下创建一条线程时,是不是要指定一个线程例程函数。这里的线程池做法是将函数(准确的说是函数指针)及其参数存入一个任务节点,并将节点链接成一个链表。所以现在的问题变成如何设计一个任务链表?
一个链表的操作有:创建节点,增加节点,删除节点,查询节点这几步。由于这里是任务节点,就不需要查询了,当然也可以查询。所以现在的问题转换为任务节点如何设计?
任务节点的数据域主要有函数指针和函数参数,指针域就是一个简单任务结点指针,也可以是双向循环链表。
设计分析到这里,基本上有了框架了,前面买了这么多关子,下面就来个重头戏。其实前面主要是想描述一种设计思想,遇到新的东西如何设计它?首先要明白设计的目的和作用,然后找出核心点,最后就是把核心点进行不断的分析推理,一步步转换为最基本的问题。学过项目管理的同学应该听过在范围管理里有个叫工作分解结构的东西,这里跟那个差不多。
实现一个线程池,并完成线程池的基本操作演示
thread_pool.h(线程池头文件)
- #ifndef _THREAD_POOL_H
- #define _THREAD_POOL_H
-
- #include
- #include
- #include
- #include
- #include
-
- #define MAX_WAITING_TASKS 1000 // 最大的等待任务数量
- #define MAX_ACTIVE_THREADS 20 // 最多的活跃线程数量
-
- // 任务节点
- typedef struct task
- {
- void *(*do_task) (void *arg); // 函数指针
- void *arg; // 函数参数
- struct task *next; // 指向下一个任务节点
- }task;
-
-
- // 线程池的管理节点
- typedef struct thread_pool
- {
- pthread_mutex_t task_list_lock; // 任务列表互斥锁,保证同一个时间只有一条线程进行访问
- pthread_cond_t task_list_cond; // 任务列表条件变量,保证没有任务时,线程进入睡眠
-
- task *task_list; // 任务列表
-
- pthread_t *tids; // 存放线程号的数组
-
- int shutdown; // 线程池销毁的开关
-
- unsigned int max_waiting_tasks; // 最大等待的任务数量
- unsigned int waiting_tasks; // 正在等待被处理的任务数量
- unsigned int active_threads; // 正在活跃的线程数
-
- }thread_pool;
-
- // 初始化线程池
- int init_pool(thread_pool *pool, unsigned int threads_number);
-
- // 往线程池中添加任务
- int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg);
-
- // 往线程池中添加线程
- int add_thread(thread_pool *pool, unsigned int additional_threads);
-
- // 从线程池中删除线程
- int remove_thread(thread_pool *pool, unsigned int removing_threads);
-
- // 查询线程池中线程数量
- int total_thread(thread_pool *pool);
-
- // 销毁线程池
- int destroy_pool(thread_pool *pool);
-
- // 线程的例程函数
- void *routine(void *arg);
-
- #endif // _THREAD_POOL_H
thread_pool.c(线程池实现文件)
- #include "thread_pool.h"
-
- // 初始化线程池
- int init_pool(thread_pool *pool, unsigned int threads_number)
- {
- if(threads_number < 1)
- {
- printf("warning: threads_number must bigger than 0\n");
- return -1;
- }
- if(threads_number > MAX_ACTIVE_THREADS)
- {
- printf("warning: threads number is bigger than MAX_ACTIVE_THREADS(%u)\n", MAX_ACTIVE_THREADS);
- return -1;
- }
-
- pthread_mutex_init(&pool->task_list_lock, NULL); // 初始化互斥锁
- pthread_cond_init(&pool->task_list_cond, NULL); // 初始化条件变量
-
- pool->task_list = calloc(1, sizeof(task)); // 申请一个任务头节点
- if(!pool->task_list)
- {
- printf("error: task_list calloc fail\n");
- return -1;
- }
- pool->task_list->next = NULL; // 让任务头节点指向NULL
-
- pool->tids = calloc(MAX_ACTIVE_THREADS, sizeof(pthread_t)); // 申请堆数组用来存放线程号
- if(!pool->tids)
- {
- printf("error: tids calloc fail\n");
- return -1;
- }
-
- pool->shutdown = 0; // 关闭线程池销毁开发
-
- pool->waiting_tasks = 0; // 当前等待任务为0
- pool->max_waiting_tasks = MAX_WAITING_TASKS; // 初始化最大等待任务数量
- pool->active_threads = threads_number; // 初始化活跃的线程数
-
- // 根据活跃的线程数,创建对应数量的线程
- for(int i = 0; i < pool->active_threads; i++)
- {
- // 需要判断线程是否创建失败
- // 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池
- errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);
- if(errno != 0)
- {
- perror("error: pthread_create fail");
- return -1;
- }
- }
-
- return 1;
- }
-
- // 往线程池中添加任务
- int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
- {
- if(!pool)
- {
- printf("warning: thread_pool is null\n");
- return -1;
- }
-
- // 如果当前等待执行的任务数超过最大等待的任务数量,则退出
- if(pool->waiting_tasks >= pool->max_waiting_tasks)
- {
- printf("warning: task_list is full, too many list\n");
- return -1;
- }
-
- // 尝试给新的任务节点申请空间
- task *new_task = (task*)malloc(sizeof(task));
- if(!new_task)
- {
- printf("error: task malloc fail\n");
- return -1;
- }
-
- // 初始化任务节点
- new_task->do_task = do_task;
- new_task->arg = arg;
- new_task->next = NULL;
-
- // 把任务节点添加到任务列表最后面
- // 1、上锁
- pthread_mutex_lock(&pool->task_list_lock);
-
- // 2、先找到任务列表末尾
- task *tmp = NULL;
- for(tmp = pool->task_list; tmp->next; tmp = tmp->next);
-
- // 3、添加新任务到任务列表末尾,同时当前等待任务数+1
- tmp->next = new_task;
- pool->waiting_tasks++;
-
- // 4、解锁
- pthread_mutex_unlock(&pool->task_list_lock);
-
- // 唤醒一个正在条件变量中睡眠等待的线程,取执行任务
- pthread_cond_signal(&pool->task_list_cond);
-
- return 1;
- }
-
- // 往线程池中添加线程,返回实际添加的线程数量
- int add_thread(thread_pool *pool, unsigned int additional_threads)
- {
- if(!pool)
- {
- printf("warning: thread_pool is null\n");
- return -1;
- }
-
- if(additional_threads == 0)
- {
- return 0;
- }
-
- // 期望活跃的线程数 = 目前活跃的线程数 + 期望添加的线程数
- unsigned int total_threads = pool->active_threads + additional_threads;
-
- // 如果超过最大的活跃线程数就打印警告
- if(total_threads > MAX_ACTIVE_THREADS)
- {
- printf("warning: add too many threads\n");
- }
-
- int actual_add_threads = 0;
- for(int i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
- {
- // 需要判断线程是否创建失败
- // 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池
- errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);
- if(errno != 0)
- {
- perror("error: pthread_create fail");
-
- // 如果一个都没有创建成功,就直接返回
- if(actual_add_threads == 0)
- {
- return -1;
- }
- // 如果成功创建了多个线程,但是本次创建失败,就跳出循环
- break;
- }
- else
- {
- actual_add_threads++; // 线程创建成功就+1
- }
- }
-
- // 更新线程池中活跃的线程数
- pool->active_threads += actual_add_threads;
-
- return actual_add_threads; // 返回实际添加的线程
- }
-
- // 从线程池中删除线程,返回实际删除线程的数量
- int remove_thread(thread_pool *pool, unsigned int removing_threads)
- {
- if(!pool)
- {
- printf("warning: thread_pool is null\n");
- return -1;
- }
-
- if(removing_threads == 0)
- {
- return pool->active_threads;
- }
-
- // 如果要删除的线程数大于线程池中活跃的线程数,则打印警告
- if(removing_threads >= pool->active_threads)
- {
- printf("warning: remove too many threads\n");
- }
-
- // 剩余数量 = 活跃数量 - 删除的目标数
- int remaining_threads = pool->active_threads - removing_threads;
-
- // 目的是为了让线程池中最少保留有一个线程可以用于执行任务
- remaining_threads = remaining_threads > 0 ? remaining_threads : 1;
-
- int actual_remove_threads = 0;;
- // 循环取消线程直到等于期望线程数
- for(int i = pool->active_threads-1; i > remaining_threads-1; i--)
- {
- errno = pthread_cancel(pool->tids[i]);
- if(errno != 0)
- {
- printf("[%ld] cancel error: %s\n", pool->tids[i], strerror(errno));
- break;
- }
- else
- {
- actual_remove_threads++;
- }
- }
-
- // 更新线程池中活跃的线程数量
- pool->active_threads -= actual_remove_threads;
-
- return actual_remove_threads; // 返回实际删除的线程
- }
-
- // 查询线程池中线程数量
- int total_thread(thread_pool *pool)
- {
- return pool->active_threads;
- }
-
- // 销毁线程池
- int destroy_pool(thread_pool *pool)
- {
- if(!pool)
- {
- printf("warning: thread_pool is null\n");
- return -1;
- }
-
- pool->shutdown = 1; // 启动线程池销毁开关
-
- pthread_cond_broadcast(&pool->task_list_cond); // 唤醒所有在线程池中的线程
-
- // 循环等待所有线程退出
- for(int i = 0; i < pool->active_threads; i++)
- {
- errno = pthread_join(pool->tids[i], NULL);
- if(errno != 0)
- {
- printf("join tids[%d] error: %s\n", i, strerror(errno));
- }
- else
- {
- printf("[%ld] is joined\n", pool->tids[i]);
- }
- }
-
- // 先上锁
- pthread_mutex_lock(&pool->task_list_lock);
- // 头删法,从头一个个的删除任务节点
- for(task *p = pool->task_list->next; p; p = pool->task_list)
- {
- pool->task_list->next = p->next; // 修改首元节点
- free(p);
- }
-
- free(pool->task_list);
- pool->task_list = NULL; // 安全保证
-
- // 解锁
- pthread_mutex_unlock(&pool->task_list_lock);
-
- free(pool->tids);
- free(pool);
- pool->tids = NULL; // 安全保证
- pool = NULL; // 安全保证
-
- return 1;
- }
-
- // 线程的取消函数
- void pthread_cancel_handler(void *arg)
- {
- printf("[%ld] is cancel\n", pthread_self());
-
- pthread_mutex_unlock((pthread_mutex_t*)arg); // 解锁
- }
-
- // 线程的例程函数
- void *routine(void *arg)
- {
- task *task_p; // 任务指针,用来执行将要执行的任务
-
- thread_pool *pool = (thread_pool*)arg;
-
- while(1)
- {
- pthread_cleanup_push(pthread_cancel_handler, (void*)&pool->task_list_lock);
-
- // 尝试持有互斥锁
- pthread_mutex_lock(&pool->task_list_lock);
-
- // 判断是否有任务,没有则进入睡眠
- // 1、没有任务,线程池销毁开关断开,则进入条件变量等待队列
- while(!pool->waiting_tasks && !pool->shutdown)
- {
- // 当条件不满足时,会先自动解锁pool->lock,然后等待到条件满足后,会自动上锁pool->lock
- pthread_cond_wait(&pool->task_list_cond, &pool->task_list_lock);
- }
-
- // 2、线程池销毁开发闭合,不管有没有任务,先解锁,然后退出,准备销毁线程池
- if(pool->shutdown)
- {
- pthread_mutex_unlock(&pool->task_list_lock); // 需要解锁
- pthread_exit(NULL);
- }
-
- // 3、有任务,线程池销毁开关断开,则取一个任务
- if(pool->waiting_tasks && !pool->shutdown)
- {
- task_p = pool->task_list->next;
- pool->task_list->next = task_p->next; // 弹出第一任务节点
- pool->waiting_tasks--; // 当前等待的任务数量-1,
-
- pthread_mutex_unlock(&pool->task_list_lock); // 解锁
- }
-
- // 弹出压入的线程取消函数,运行到这里不执行,但是当线程在前面被意外取消或中断会执行
- pthread_cleanup_pop(0);
-
- // 为了防止死锁,执行任务期间不接受线程取消请求
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-
- (task_p->do_task)(task_p->arg); // 通过函数指针的方式执行任务
-
- // 执行完任务后,接受线程取消请求
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
-
- free(task_p); // 删除任务节点
- task_p = NULL; // 安全保证
- }
-
- pthread_exit(NULL);
- }
main.c(线程池操作演示文件)
- // 线程池的测试案例
-
- #include
- #include
- #include "thread_pool.h"
-
- void *mytask(void *arg)
- {
- int n = rand()%10;
-
- printf("[%ld][%s] ==> job will be done in %d sec...\n", pthread_self(), __FUNCTION__, n);
-
- sleep(n);
-
- printf("[%ld][%s] ==> job done!\n", pthread_self(), __FUNCTION__);
- }
-
- void *func(void *arg )
- {
- printf("this is a test by [%s]\n", (char*)arg);
- sleep(1);
- printf("test finish...\n");
- }
-
- void *count_time(void *arg)
- {
- int i = 0;
- while(1)
- {
- sleep(1);
- printf("sec: %d\n", ++i);
- }
- }
-
- int main(void)
- {
- pthread_t a;
- pthread_create(&a, NULL, count_time, NULL);
-
- // 1、初始化线程池
- thread_pool *pool = malloc(sizeof(thread_pool));
- init_pool(pool, 2);
-
- // 2、投放任务
- printf("throwing 3 tasks...\n");
- add_task(pool, mytask, NULL);
- add_task(pool, mytask, NULL);
- add_task(pool, mytask, NULL);
-
- // 3、查看当前线程池中的线程数量
- printf("current thread number: %d\n", total_thread(pool));
- sleep(9);
-
- // 4、再次投放任务
- printf("throwing another 2 tasks...\n");
- add_task(pool, mytask, NULL);
- add_task(pool, mytask, NULL);
- add_task(pool, func, (void *)"Great Macro");
-
- // 5、添加2条线程
- printf("try add 2 threads, actual add %d\n", add_thread(pool, 2));
- printf("current thread number: %d\n", total_thread(pool));
-
- sleep(5);
-
- // 6、删除3条线程
- printf("try remove 3 threads, actual remove: %d\n", remove_thread(pool, 3));
- printf("current thread number: %d\n", total_thread(pool));
-
- sleep(5);
- // 7、 销毁线程池
- printf("destroy thread pool\n");
- destroy_pool(pool);
- return 0;
- }
注:编译时,把线程池文件和测试文件放在同一个工作目录下。
实际使用时,只需要把thread_pool.h和thread_pool.c拷贝到自己的工程目录下,然后根据规则操作线程池。
线程池是许多线程的集合,它不是线程组。线程池适用于任何需要处理大量任务或需要同时处理多个请求的应用程序的场景,可以提供性能和效率。
至此,Linux系统编程系列,16篇完结撒花,历时5天,这年中秋国庆没有假放!!!