因为前段时间项目需要所以阅读分析了Nginx线程池源码,感觉它的代码风格以及相关的思想都很好,所以就想着记录一下,既是自己对知识点的复习巩固也希望给大家提供一些帮助,本篇博客将从什么是并发编程、为什么要使用线程池、线程池的组成以及线程池的使用这4个方面对Nginx线程池进行刨析,接下来进入正片!!
所谓并发编程是指在在同一台计算机上“同时”处理多个任务。并发是在同一实体上的多个事件。就像银行有多个服务窗口可以同时处理多个用户的请求(每人一个窗口)
。
进程通过fork函数进行创建,调用fork函数后,会创建一个子进程,并且父子两个进程都从fork处执行,进程方面详细的解析可以看看我之前写的这两篇文章👇
C/C++代码中创建销毁进程
C/C++多进程高并发框架分享
多进程的优点
多进程的缺点
多线程的优点
多线程的缺点
那么如何选择是选择多进程还是多线程呢?
看实际的应用场景。如果是在集群上,就适合用多进程,甚至可以在每个计算节点再多线程;如果是在单台计算机(个人计算机)上,多线程的资源开销会更低,推荐用线程池。
单个进程或线程同时只能处理一个任务,如果有很多请求需要同时处理
怎么办?
解决方案: 运用多进程或多线程技术解决
存在的缺陷(以下以多线程为例):
创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时
间和资源要多得多
活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足
那么就可以用线程池技术来解决上面的缺陷问题👇
线程池由一个任务队列
和一组处理队列的线程
组成。一旦工作进程需要处理某个可能“阻塞”的操作,不用自己操作,将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。
任务——待处理的工作,通常由标识、上下文和处理函数组成。
任务队列——按顺序保存待处理的任务序列,等待线程中的线程组处理。
线程池——由多个已启动的一组线程组成。
条件变量——一种同步机制,允许线程挂起,直到共享数据上的某些条件
得到满足。
互斥锁 ——保证在任一时刻,只能有一个线程访问该对象。
我先将几个.c和.h文件贴出来,大家伙可以先简单的过下流程,可以运行一下,后面有线程池关键结构体和关键函数解析!!
在这里我提供一个Makefile可以用于运行下面线程池的代码,有需要的小伙伴可以用用。
Makefile
myThreadPool.exe:
gcc *.c -o myThreadPool.exe -lpthread
clear:
rm -r myThreadPool.exe
thread.h
#ifndef _DEMO_THREAD_H_INCLUDED_
#define _DEMO_THREAD_H_INCLUDED_
#include
#include
#include
#include
#include
#include
#include
typedef intptr_t int_t;
typedef uintptr_t uint_t;
#define OK 0
#define ERROR -1
//互斥量
int thread_mutex_create(pthread_mutex_t *mtx);
int thread_mutex_destroy(pthread_mutex_t *mtx);
int thread_mutex_lock(pthread_mutex_t *mtx);
int thread_mutex_unlock(pthread_mutex_t *mtx);
//条件变量
int thread_cond_create(pthread_cond_t *cond);
int thread_cond_destroy(pthread_cond_t *cond);
int thread_cond_signal(pthread_cond_t *cond);
int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx);
#endif /* _DEMO_THREAD_H_INCLUDED_ */
thread_pool.h
#ifndef _THREAD_POOL_H_INCLUDED_
#define _THREAD_POOL_H_INCLUDED_
#include "thread.h"
#define DEFAULT_THREADS_NUM 4
#define DEFAULT_QUEUE_NUM 65535
typedef unsigned long atomic_uint_t;
typedef struct thread_task_s thread_task_t;
typedef struct thread_pool_s thread_pool_t;
struct thread_task_s {
thread_task_t *next;
uint_t id;
void *ctx;
void (*handler)(void *data);//指向线程所要执行的函数
};
typedef struct {
thread_task_t *first;
thread_task_t **last;
} thread_pool_queue_t;
#define thread_pool_queue_init(q) \
(q)->first = NULL; \
(q)->last = &(q)->first
struct thread_pool_s {
pthread_mutex_t mtx;
thread_pool_queue_t queue;
int_t waiting;
pthread_cond_t cond;
char *name;
uint_t threads;//线程池中线程的数目
int_t max_queue;//任务X队列的长度
};
thread_task_t *thread_task_alloc(size_t size);
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task);
thread_pool_t* thread_pool_init();
void thread_pool_destroy(thread_pool_t *tp);
#endif /* _THREAD_POOL_H_INCLUDED_ */
thread_cond.c
#include "thread.h"
int
thread_cond_create(pthread_cond_t *cond)
{
int err;
err = pthread_cond_init(cond, NULL);
if (err == 0) {
return OK;
}
fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}
int
thread_cond_destroy(pthread_cond_t *cond)
{
int err;
err = pthread_cond_destroy(cond);
if (err == 0) {
return OK;
}
fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno));
return ERROR;
}
int
thread_cond_signal(pthread_cond_t *cond)
{
int err;
err = pthread_cond_signal(cond);
if (err == 0) {
return OK;
}
fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno));
return ERROR;
}
int
thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
{
int err;
err = pthread_cond_wait(cond, mtx);
if (err == 0) {
return OK;
}
fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno));
return ERROR;
}
thread_mutex.c
#include "thread.h"
int
thread_mutex_create(pthread_mutex_t *mtx)
{
int err;
pthread_mutexattr_t attr;
err = pthread_mutexattr_init(&attr);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}
/*设置属性,PTHREAD_MUTEX_ERRORCHECK是检错锁,
如果同一个线程请求同一个锁则返回edeadlk,
否则与pthread_mutex_timed_np类型动作相同,这样防止死锁
------------------------------
pthread_mutex_timed_np是缺省锁,也就是普通锁,
当一个线程加锁后其余请求锁的线程则组成一个等待队列
并在解锁后按顺序优先级获得锁
*/
err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno));
return ERROR;
}
//属性被带到mtx中,追加属性
err = pthread_mutex_init(mtx, &attr);
if (err != 0) {
fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}
err = pthread_mutexattr_destroy(&attr);
if (err != 0) {
fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno));
}
return OK;
}
int
thread_mutex_destroy(pthread_mutex_t *mtx)
{
int err;
err = pthread_mutex_destroy(mtx);
if (err != 0) {
fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno));
return ERROR;
}
return OK;
}
int
thread_mutex_lock(pthread_mutex_t *mtx)
{
int err;
err = pthread_mutex_lock(mtx);
if (err == 0) {
return OK;
}
fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno));
return ERROR;
}
int
thread_mutex_unlock(pthread_mutex_t *mtx)
{
int err;
err = pthread_mutex_unlock(mtx);
#if 0
ngx_time_update();
#endif
if (err == 0) {
return OK;
}
fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno));
return ERROR;
}
thread_pool.c
#include "thread_pool.h"
static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);
static uint_t thread_pool_task_id;
static int debug = 0;
thread_pool_t* thread_pool_init()
{
int err;
pthread_t tid;
uint_t n;
pthread_attr_t attr;
thread_pool_t *tp=NULL;
tp = calloc(1,sizeof(thread_pool_t));
if(tp == NULL){
fprintf(stderr, "thread_pool_init: calloc failed!\n");
}
thread_pool_init_default(tp, NULL);
thread_pool_queue_init(&tp->queue);
if (thread_mutex_create(&tp->mtx) != OK) {
free(tp);
return NULL;
}
if (thread_cond_create(&tp->cond) != OK) {
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}
err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
//PTHREAD_CREATE_DETACHED:在线程创建时将其属性红色为分离状态(detached),主线程使用pthread_join无法等待到结束的子线程
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
//tp->threads线程数
for (n = 0; n < tp->threads; n++) {
//thread_pool_cycle四个进程启动后都会执行这个方法
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
}
(void) pthread_attr_destroy(&attr);
return tp;
}
void thread_pool_destroy(thread_pool_t *tp)
{
uint_t n;
thread_task_t task;
volatile uint_t lock;
memset(&task,'\0', sizeof(thread_task_t));
task.handler = thread_pool_exit_handler;//给一个自杀任务
task.ctx = (void *) &lock;
for (n = 0; n < tp->threads; n++) {
lock = 1;
if (thread_task_post(tp, &task) != OK) {
return;
}
//提高效率,先把资源让出来
while (lock) {
sched_yield();//让出cpu的执行权
}
//task.event.active = 0;
}
(void) thread_cond_destroy(&tp->cond);
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
}
static void
thread_pool_exit_handler(void *data)
{
uint_t *lock = data;
*lock = 0;
pthread_exit(0);
}
thread_task_t *
thread_task_alloc(size_t size)//调用任务函数传输的参数大小size=sizeof(struct args)
{
thread_task_t *task;
task = calloc(1,sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}
task->ctx = task + 1;//偏移一个task,ctx指向到size的内存
return task;
}
int_t
thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
if (thread_mutex_lock(&tp->mtx) != OK) {//线程池上锁
return ERROR;
}
//正在等待的任务数量已经达到最大
if (tp->waiting >= tp->max_queue) {
(void) thread_mutex_unlock(&tp->mtx);//解锁
fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
tp->name, tp->waiting);
return ERROR;
}
//task->event.active = 1;
task->id = thread_pool_task_id++;//static thread_pool_task_id
task->next = NULL;
//发送信号(用于同步互斥)
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}
*tp->queue.last = task;
tp->queue.last = &task->next;
tp->waiting++;//等待的任务数++
(void) thread_mutex_unlock(&tp->mtx);
if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
task->id, tp->name);
return OK;
}
static void *
thread_pool_cycle(void *data)
{
thread_pool_t *tp = data;
int err;
thread_task_t *task;
if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);
for ( ;; ) {
if (thread_mutex_lock(&tp->mtx) != OK) {//上锁
return NULL;
}
//拿到任务将waiting--
tp->waiting--;
while (tp->queue.first == NULL) {//没有任务可以做
if (thread_cond_wait(&tp->cond, &tp->mtx)//挂起,等任务进来再唤醒
!= OK)
{
(void) thread_mutex_unlock(&tp->mtx);
return NULL;
}
}
task = tp->queue.first;
tp->queue.first = task->next;
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
if (thread_mutex_unlock(&tp->mtx) != OK) {
return NULL;
}
if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
task->id, tp->name);
//执行任务
task->handler(task->ctx);
if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
task->next = NULL;
//释放task
free(task);
//notify
}
}
static int_t
thread_pool_init_default(thread_pool_t *tpp, char *name)
{
if(tpp)
{
tpp->threads = DEFAULT_THREADS_NUM;
tpp->max_queue = DEFAULT_QUEUE_NUM;
tpp->name = strdup(name?name:"default");
if(debug)fprintf(stderr,
"thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",
tpp->name, tpp->threads, tpp->max_queue);
return OK;
}
return ERROR;
}
main.c
#include "thread_pool.h"
struct test{
int arg1;
int arg2;
};
void task_handler1(void* data){
static int index = 0;
printf("Hello, this is 1th test.index=%d\r\n", index++);
}
void task_handler2(void* data){
static int index = 0;
printf("Hello, this is 2th test.index=%d\r\n", index++);
}
void task_handler3(void* data){
static int index = 0;
struct test *t = (struct test *) data;
printf("Hello, this is 3th test.index=%d\r\n", index++);
printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2);
}
int
main(int argc, char **argv)
{
thread_pool_t* tp = NULL;
int i = 0;
tp = thread_pool_init();
//sleep(1);
thread_task_t * test1 = thread_task_alloc(0);
thread_task_t * test2 = thread_task_alloc(0);
thread_task_t * test3 = thread_task_alloc(sizeof(struct test));//ctx指向test参数
test1->handler = task_handler1;
test2->handler = task_handler2;
test3->handler = task_handler3;
//改变参数
((struct test*)test3->ctx)->arg1 = 666;
((struct test*)test3->ctx)->arg2 = 888;
//进行任务投递
thread_task_post(tp, test1);
thread_task_post(tp, test2);
thread_task_post(tp, test3);
sleep(10);
thread_pool_destroy(tp);//将线程池销毁
}
编译
运行效果:
记住下面这两个类型重命名
typedef struct thread_task_s thread_task_t;
typedef struct thread_pool_s thread_pool_t;
thread_task_s
thread_pool_queue_t
thread_pool_s
1.thread_task_s
thread_task_s结构体表示一个任务"对象":
struct thread_task_s {
thread_task_t *next; //指向下一个任务
uint_t id; //用于标记一个任务
void *ctx; //指向线程所要执行的函数所需要的参数
void (*handler)(void *data);//指向线程所要执行的函数
};
在main函数中可以看到
/*
thread_task_alloc就是动态分配一个地址空间,
而所带参数则是除了分配thread_task_t这个结构体所需的地址空间,还额外分配的空间大小,
而这个额外的大小就是用于放该任务所要执行的函数的参数
*/
thread_task_t * test1 = thread_task_alloc(0);
所以在thread_pool.c中有task->ctx = task + 1;,来使ctx指向到时候函数执行时所带参数
的位置(相对于一开始就将参数的内存地址空间也一起初始化,并且参数地址和任务地址相邻)
//调用任务函数传输的参数大小size=sizeof(struct args)
thread_task_t *thread_task_alloc(size_t size)
{
thread_task_t *task;
task = calloc(1,sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}
task->ctx = task + 1;//偏移一个task,ctx指向到size的内存
return task;
}
2.thread_pool_queue_t
thread_pool_queue_t结构表示任务队列"对象":
typedef struct {
thread_task_t *first;//指向任务队列中队头的位置
thread_task_t **last;//指向任务队列中尾部的地址(注意这里是二级指针)
} thread_pool_queue_t;
3.thread_pool_s
thread_pool_s结构体表示一个线程池"对象":
struct thread_pool_s {
pthread_mutex_t mtx; //互斥量
thread_pool_queue_t queue; //任务队列
int_t waiting; //在任务队列中等待调度的任务数
pthread_cond_t cond; //条件变量
char *name; //线程池名称
uint_t threads; //线程池中线程的数目
int_t max_queue; //任务队列的最大长度
};
thread_pool_init()
thread_task_post(thread_pool_t *tp, thread_task_t *task)
thread_pool_cycle(void *data)
thread_pool_destroy(thread_pool_t *tp)
1.thread_pool_init()
thread_pool_init()是对线程池的初始化,其中包括对互斥量、条件变量、消息队列等成员的初始化!
thread_pool_t* thread_pool_init()
{
int err;
pthread_t tid;
uint_t n;
pthread_attr_t attr;
thread_pool_t *tp=NULL;
tp = calloc(1,sizeof(thread_pool_t));
if(tp == NULL){
fprintf(stderr, "thread_pool_init: calloc failed!\n");
}
//初始化线程池的名字、线程池中线程数目以及消息队列最长长度
thread_pool_init_default(tp, NULL);
//对任务队列进行初始化
thread_pool_queue_init(&tp->queue);
//初始化互斥量
if (thread_mutex_create(&tp->mtx) != OK) {
free(tp);
return NULL;
}
//初始化条件变量
if (thread_cond_create(&tp->cond) != OK) {
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}
err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
//PTHREAD_CREATE_DETACHED:在线程创建时将其属性红色为分离状态(detached),主线程使用pthread_join无法等待到结束的子线程
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
//tp->threads是线程池的线程数
for (n = 0; n < tp->threads; n++) {
//thread_pool_cycle四个进程启动后都会执行这个方法,所带参数为tp
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
}
//将attr释放
(void) pthread_attr_destroy(&attr);
return tp;
}
对于消息队列的初始化方法是采用宏定义
的方式:
#define thread_pool_queue_init(q)\
(q)->first = NULL; \ //将队头设置为空
(q)->last = &(q)->first //将last指向队头的地址
2.thread_task_post(thread_pool_t *tp, thread_task_t *task)
thread_task_post(thread_pool_t *tp, thread_task_t *task)是将任务推送至指定线程池的任务队列中,其中有运用到对互斥量和条件变量的操作(确保任务队列作为一种临界资源)
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
//线程池上锁
if (thread_mutex_lock(&tp->mtx) != OK) {
return ERROR;
}
//如果正在等待的任务数量已经达到最大,则插入任务失败
if (tp->waiting >= tp->max_queue) {
(void) thread_mutex_unlock(&tp->mtx);//解锁
fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
tp->name, tp->waiting);
return ERROR;
}
//标记线程id+1
task->id = thread_pool_task_id++;//static thread_pool_task_id
task->next = NULL;
//发送条件变量的唤醒信号(用于和线程处理函数保持联系)
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}
//调整任务队列的队尾
*tp->queue.last = task;
tp->queue.last = &task->next;
//任务等待的任务数++
tp->waiting++;
(void) thread_mutex_unlock(&tp->mtx);
if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
task->id, tp->name);
return OK;
}
3.thread_pool_cycle(void *data)
thread_pool_cycle(void *data)函数是在初始化线程池时,创建线程设置的线程执行函数,线程池作为参数void*data
,在这个函数里面实现了线程执行相应任务(在任务队列不空的情况下,哪个线程空闲就负责执行队头任务)在下面的函数中我又从头到尾注释了一遍,可以好好看看注释
static void *thread_pool_cycle(void *data)
{
//data指向的其实也是一个线程池
thread_pool_t *tp = data;
int err;
thread_task_t *task;
//调试信息
if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);
//死循环,用于执行任务,只要有一个线程空闲那么这个线程就会不断检查是否有任务需要执行
for ( ;; ) {
//互斥锁上锁
if (thread_mutex_lock(&tp->mtx) != OK) {
return NULL;
}
//拿到任务将waiting--,任务队列中等待的任务数目-1
tp->waiting--;
//没有任务可以做
while (tp->queue.first == NULL) {
//使用条件变量挂起,等任务进来再唤醒,当有任务成功插入的时候thread_task_post函数中会发射信号
if (thread_cond_wait(&tp->cond, &tp->mtx)
!= OK)
{
(void) thread_mutex_unlock(&tp->mtx);
return NULL;
}
}
//取得队头任务
task = tp->queue.first;
//队头指向后一个任务
tp->queue.first = task->next;
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
//解锁
if (thread_mutex_unlock(&tp->mtx) != OK) {
return NULL;
}
//调试信息
if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
task->id, tp->name);
//执行任务
task->handler(task->ctx);
if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
task->next = NULL;
//释放task
free(task);
}
}
这里补充一个关于条件变量
的知识点,上面的thread_cond_wait(&tp->cond, &tp->mtx)本质是执行pthread_cond_wait(tp->cond, tp->mtx);,是为了等待别的线程发送信号,这样才能够唤醒这个线程,要不然会一直阻塞在这(并且在执行pthread_cond_wait时,会将互斥量进行unlock,当别的进程发送条件变量过来唤醒这个线程的时候,互斥量又会进行进行lock上锁)
,来看看在哪里发送相应的条件变量的信号👇
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
....
....
....
task->id = thread_pool_task_id++;//static thread_pool_task_id
task->next = NULL;
//发送唤醒条件变量的信号(用于同步互斥)
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}
....
....
....
return OK;
}
4.thread_pool_destroy(thread_pool_t *tp)
thread_pool_destroy(thread_pool_t *tp)函数用于销毁线程池,它这里的销毁线程池操作我感觉是非常牛皮的,销毁函数执行的时候不是说直接释放内存或者说直接摧毁线程池,它对于销毁线程还是采用插入任务的形式进行操作(这样也是为了让任务队列中的各个任务都执行完再销毁)
,让我们欣赏一下这优美的代码吧😀
void thread_pool_destroy(thread_pool_t *tp)
{
uint_t n;
thread_task_t task;
volatile uint_t lock;
memset(&task,'\0', sizeof(thread_task_t));
//将任务的执行函数指向thread_pool_exit_handler
/***********************************
thread_pool_exit_handler(void *data)
{
uint_t *lock = data;
*lock = 0;
//销毁线程
pthread_exit(0);
}
************************************/
task.handler = thread_pool_exit_handler;//执行自杀任务
//函数执行时参数指向lock,
task.ctx = (void *) &lock;
for (n = 0; n < tp->threads; n++) {
lock = 1;
if (thread_task_post(tp, &task) != OK) {
return;
}
//如果刚刚上面的“自杀”函数执行完,那么lock值应该会为0
//只有第一个线程释放完才会释放第二个
while (lock) {
//提高效率,先把资源让出来(让出cpu的执行权,让其他线程执行自杀任务)
sched_yield();
}
}
//销毁互斥量和条件变量
(void) thread_cond_destroy(&tp->cond);
(void) thread_mutex_destroy(&tp->mtx);
//释放线程池内存空间
free(tp);
}
整个线程池的实现过程还是涉及到了很多知识点,不可能说通过一篇文章全部讲清楚,所以我这里就挑了我自己感觉重要的一些地方写了出来,这篇文章也写了好几天了,尽管有不完美的地方,但还是很希望能够帮助到您,如果对于本篇文章的相关知识点有不一样看法的同学,也欢迎在评论区中指出😊