Nginx 的线程池与性能剖析 - imsoft - 博客园
学习过程中参考过上述几篇文章,感谢!
目录
使用线程池功能,首先需要在配置文件中添加如下配置项:
- location / {
- root /html;
- thread_pool default threads=32 max_queue=65536;
- aio threads=default;
- }
上面定义了一个名为“default”,包含32个线程,任务队列最多支持65536个请求的线程池。如果任务队列过载,Nginx将输出如下错误日志并拒绝请求:
thread pool "default" queue overflow: N tasks waiting
如果出现上面的错误,说明线程池的负载很高,这是可以通过添加线程数来解决这个问题。当达到机器的最高处理能力之后,增加线程数并不能改善这个问题 。
可在编译时使用如下选项可以启用线程池功能
--with-threads
--with-file-aio启用线程池功能,让请求排队等待处理,并且可以充分利用 CPU 提高处理效率,开启线程池需要 AIO 的支持,启用异步文件 IO (AIO) 一般用于大文件传输的场景;
在Ngnix过滤模块中,会涉及到对文件读写操作。将对磁盘读写的动作ngx_thread_read()交给线程池去处理。file->thread_handler相当于push任务操作。
- ssize_t
- ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
- ngx_pool_t *pool)
- {
-
- ...
- ...
- ...
-
- task->handler = ngx_thread_read_handler;
- ...
- ...
- ...
-
- if (file->thread_handler(task, file) != NGX_OK) {
- return NGX_ERROR;
- }
-
- return NGX_AGAIN;
- }
在本文不做过多描述,内存池会详细陈述
- typedef struct ngx_array_s ngx_array_t;
- struct ngx_array_s {
- void *elts;
- ngx_uint_t nelts;
- size_t size;
- ngx_uint_t nalloc;
- ngx_pool_t *pool;
- };
| elts | 数据存储区 |
| nelts | 数组元素个数 |
| size | 数组单个元素大小(字节) |
| nalloc | 数组最大容量,当nelts = nalloc后如果还想继续存储,系统会分配一块新的内存,该内存是原内存两倍,原有数据会拷贝到新的内存中,继续存储数据 |
| pool | 数组分配的所属内存池 |
线程池数组初始化:
- typedef struct {
- ngx_array_t pools;
- } ngx_thread_pool_conf_t;
- typedef struct {
- ngx_thread_task_t *first;
- ngx_thread_task_t **last;
- } ngx_thread_pool_queue_t;
- #define ngx_thread_pool_queue_init(q) \
- (q)->first = NULL; \
- (q)->last = &(q)->first
这是一个双向链表,尾节点是一个二级指针, *last表示最后一个节点,**last指向上一个节点。使用二级指针构建链表非常香

假设我要删除一个节点,按照常规操作,我得去定义一个临时变量,然后遍历链表,通过prev和pnext去删除。如果是一个二级指针**phead,phead = &(*phead)->next
直接(*phead)= (*phead)->next即可删除
- //池管理组件
- struct ngx_thread_pool_s {
- ngx_thread_mutex_t mtx; //锁
- ngx_thread_pool_queue_t queue; //消费队列
- ngx_int_t waiting; //等待任务数
- ngx_thread_cond_t cond; //条件变量
-
- ngx_log_t *log; //日志,多线程安全
-
- ngx_str_t name; //池名
- ngx_uint_t threads;//线程数量,默认32
- ngx_int_t max_queue;//最大任务数
-
- u_char *file; //线程池配置文件
- ngx_uint_t line; //线程池指令行号
- };
- static ngx_command_t ngx_thread_pool_commands[] = {
-
- { ngx_string("thread_pool"),
- NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, // NGX_MAIN_CONF|NGX_DIRECT_CONF配置文件对应的结构已经创建 NGX_CONF_TAKE23接受三个或者两个参数
- ngx_thread_pool,
- 0,
- 0,
- NULL },
-
- ngx_null_command
- };
本质是调用以下模块
- struct ngx_command_t {
- ngx_str_t name;
- ngx_uint_t type;
- char *(*set)(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
- ngx_uint_t conf;
- ngx_uint_t offset;
- void *post;
- };
| name | 配置模块指令名称 |
| type | 配置类型(指令属性集合) |
| set | 配置指令处理 |
| conf | 指定当前配置存储位置(使用哪个内存池) |
| offset | 配置项存放位置 |
| post | 一般填0对配置不做处理 |
跟之前我写的线程池有些不同,但逻辑本质上还是跟以前一样的。NGNIX线程池实际上有三个队列:任务队列、执行队列、完成队列。

当进程启动后首先初始化线程池.
- static ngx_int_t
- ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
- {
- ngx_uint_t i;
- ngx_thread_pool_t **tpp;
- ngx_thread_pool_conf_t *tcf;
-
- if (ngx_process != NGX_PROCESS_WORKER
- && ngx_process != NGX_PROCESS_SINGLE)
- {
- return NGX_OK;
- }
-
- tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
- ngx_thread_pool_module);
-
- if (tcf == NULL) {
- return NGX_OK;
- }
-
- ngx_thread_pool_queue_init(&ngx_thread_pool_done);
-
- tpp = tcf->pools.elts;
-
- for (i = 0; i < tcf->pools.nelts; i++) {
- if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
- return NGX_ERROR;
- }
- }
-
- return NGX_OK;
- }
ngx_thread_pool_init_worker()调用的是ngx_thread_pool_init(),详见注释
- static ngx_int_t
- ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
- {
- int err;
- pthread_t tid;
- ngx_uint_t n;
- pthread_attr_t attr;
-
- /*
- 要求必须有事件通知函数 ngx_notify,否则多线程无法工作
- ngx_event_actions.notify = ngx_notify。ngx_event_actions是一个外部全局变量
- */
- if (ngx_notify == NULL) {
- ngx_log_error(NGX_LOG_ALERT, log, 0,
- "the configured event method cannot be used with thread pools");
- return NGX_ERROR;
- }
- //初始化任务队列,为空
- ngx_thread_pool_queue_init(&tp->queue);
-
- /*
- ngx使用的锁类型是PTHREAD_MUTEX_ERRORCHECK,目的是防止死锁(当某个线程连续对锁操作的时候会返回EDEADLK)
- 调用API后将锁通过传参的方式传出,内部锁的属性被销毁
- */
- if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
- return NGX_ERROR;
- }
-
- if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
- (void) ngx_thread_mutex_destroy(&tp->mtx, log);
- return NGX_ERROR;
- }
-
- tp->log = log;
-
- err = pthread_attr_init(&attr);
- if (err) {
- ngx_log_error(NGX_LOG_ALERT, log, err,
- "pthread_attr_init() failed");
- return NGX_ERROR;
- }
- /*
- 设置线程是分离属性,目的是快速释放资源无需被别的线程等待、
- 无需join
- */
- err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (err) {
- ngx_log_error(NGX_LOG_ALERT, log, err,
- "pthread_attr_setdetachstate() failed");
- return NGX_ERROR;
- }
-
- #if 0
- err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
- if (err) {
- ngx_log_error(NGX_LOG_ALERT, log, err,
- "pthread_attr_setstacksize() failed");
- return NGX_ERROR;
- }
- #endif
-
- for (n = 0; n < tp->threads; n++) {
- err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
- if (err) {
- ngx_log_error(NGX_LOG_ALERT, log, err,
- "pthread_create() failed");
- return NGX_ERROR;
- }
- }
- /*
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
- pthread_create(&tid,&attr,fn,arg);
- pthread_attr_destroy(&attr);
- 设置线程创建的属性是如上四步,需要注意的是,如果线程运行很快,可能在pthread_create返回之前就结束了
- 最好是在线程run函数内设置pthread_cond_wait等待之类的.但是不能使用wait,会导致整个进程休眠
- */
- (void) pthread_attr_destroy(&attr);
-
- return NGX_OK;
- }
当客户端向Ngnix发送请求的时候相当于一个任务,任务是主线程创建的(主线程负责处理客户端请求)主线程通过ngx_thread_task_post()函数向任务队列中添加一个任务。详见注释。
- ngx_int_t
- ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
- {
- /*
- event.active 表示任务是否放到完成队列
- active = 1表示任务已经加入工作队列,
- */
- if (task->event.active) {
- ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
- "task #%ui already active", task->id);
- return NGX_ERROR;
- }
- //加锁
- if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
- return NGX_ERROR;
- }
- //如果等待的任务数大于最大任务书数,则失败
- if (tp->waiting >= tp->max_queue) {
- (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
-
- ngx_log_error(NGX_LOG_ERR, tp->log, 0,
- "thread pool \"%V\" queue overflow: %i tasks waiting",
- &tp->name, tp->waiting);
- return NGX_ERROR;
- }
- //表示任务添加到工作队列
- task->event.active = 1;
- //全局计数器,任务id++
- task->id = ngx_thread_pool_task_id++;
- task->next = NULL;
- //解锁加锁
- if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
- (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
- return NGX_ERROR;
- }
- //把任务添加到处理队列
- *tp->queue.last = task;
- tp->queue.last = &task->next;
- //待处理任务数量+1
- tp->waiting++;
-
- (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
-
- ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
- "task #%ui added to thread pool \"%V\"",
- task->id, &tp->name);
-
- return NGX_OK;
- }
详见注释。
- static void *
- ngx_thread_pool_cycle(void *data)
- {
- ngx_thread_pool_t *tp = data;
-
- int err;
- sigset_t set;
- ngx_thread_task_t *task;
-
- #if 0
- ngx_time_update();
- #endif
-
- ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
- "thread in pool \"%V\" started", &tp->name);
-
- sigfillset(&set); //将所有的信号加入到信号量集(屏蔽所有信号)
-
- sigdelset(&set, SIGILL); //从信号量集删除SIGILL信号(当来的这个信号可以进行处理)
- sigdelset(&set, SIGFPE);
- sigdelset(&set, SIGSEGV);
- sigdelset(&set, SIGBUS);
-
- err = pthread_sigmask(SIG_BLOCK, &set, NULL);
- /*
- 353-360表示上述设置此线程屏蔽除了SIGILL SIGFPE SIGSEGV SIGBUS以外的信号
- */
- if (err) {
- ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
- return NULL;
- }
- //从任务队列获取任务,执行 task->handler
- for ( ;; ) {
- if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
- return NULL;
- }
-
- /* 等待的任务数减一 */
- tp->waiting--;
- //如果任务队空,条件队列阻塞
- while (tp->queue.first == NULL) {
- if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
- != NGX_OK)
- {
- (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
- return NULL;
- }
- }
- /* 取一个任务 */
- task = tp->queue.first;
- tp->queue.first = task->next;
-
- /* 所有的线程都会处于等在状态,此时发送了一个signal_cond,假设线程A和B都收到了这个信号,
- 那么A或者B将分配一个任务(假设A收到将其从链表中删除)那么B工作折不会分配任务,此时是不能去回调的!!! */
- if (tp->queue.first == NULL) {
- tp->queue.last = &tp->queue.first;
- }
-
- if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
- return NULL;
- }
-
- #if 0
- ngx_time_update();
- #endif
-
- ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
- "run task #%ui in thread pool \"%V\"",
- task->id, &tp->name);
- /*处理任务 task->ctx 传递自定义文本 执行用户自定义的操作*/
- task->handler(task->ctx, tp->log);
-
- ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
- "complete task #%ui in thread pool \"%V\"",
- task->id, &tp->name);
- /*将此任务从链表中删除关联*/
- task->next = NULL;
- /*使用自旋锁保护 完成队列
- 自旋锁会一直停留在此,等待锁的状态改变
- 互斥锁睡眠等待的方式
- 使用自旋锁,cpu周期等待条件成立,反应迅速但耗cpu
- */
- ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
-
- /*将处理完毕的任务加入完成队列*/
- *ngx_thread_pool_done.last = task;
- ngx_thread_pool_done.last = &task->next;
-
- // 确保对内存操作按照正确的顺序执行
- // 要求处理器完成位于 ngx_memory_barrier 前面的内存操作后才处理后面操作
- ngx_memory_barrier();
-
- /*解锁自旋锁*/
- ngx_unlock(&ngx_thread_pool_done_lock);
-
- (void) ngx_notify(ngx_thread_pool_handler);
- }
- }
任务处理完毕后加入完成队列,然后通知主线程。主线程收到通知后会在事件模块进行结束工作event->handler异步事件完成的回调函数(自定义)
- static void
- ngx_thread_pool_handler(ngx_event_t *ev)
- {
- ngx_event_t *event;
- ngx_thread_task_t *task;
-
- ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
-
- //使用自旋锁保护完成队列
- ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
-
- //取出已经完成的队列并将完成队列置空
- task = ngx_thread_pool_done.first;
- ngx_thread_pool_done.first = NULL;
- ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
-
- ngx_memory_barrier();
-
- ngx_unlock(&ngx_thread_pool_done_lock);
-
- while (task) {
- ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
- "run completion handler for task #%ui", task->id);
- //取出这个任务里的事件对象
- event = &task->event;
- task = task->next;
- //线程异步事件处理结束,不是很明白下两个标志位作用
- event->complete = 1;
- event->active = 0;
- //完成回调函数
- event->handler(event);
- }
- }
创建一个要求结束的线程,把任务data 置为 0,表示线程结束。
注意:Nginx线程池销毁过程并不是发送一个全局信号销毁,在前一章我有测试过这种情况的弊端。Nginx采用的策略是死等while(lock)逐一线程释放。
详见注释
-
- static void
- ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
- {
- ngx_uint_t *lock = data;
-
- *lock = 0;
-
- pthread_exit(0);
- }
- // 销毁线程池
- // 使用一个要求线程结束的 task,发给池里所有的线程
- // 最后销毁条件变量和互斥量
- static void
- ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
- {
- ngx_uint_t n;
- ngx_thread_task_t task;
-
- // lock 是一个简单的标志量,作为任务的 ctx 传递
- volatile ngx_uint_t lock;
-
- // 创建要求线程结束的 task
- ngx_memzero(&task, sizeof(ngx_thread_task_t));
-
- // 要求线程结束的任务,调用 pthread_exit
- task.handler = ngx_thread_pool_exit_handler;
-
- // lock 是一个简单的标志量,作为任务的 ctx 传递
- task.ctx = (void *) &lock;
-
- // 发送 tp->threads 个 task,逐个结束所有的线程
- for (n = 0; n < tp->threads; n++) {
- // 线程退出后将会被设置为 0
- lock = 1;
-
- // 把任务加入到线程池的队列
- if (ngx_thread_task_post(tp, &task) != NGX_OK) {
- return;
- }
-
- // 等待 task 被某个线程处理,从而结束一个线程
- while (lock) {
- // ngx_process.h:#define ngx_sched_yield() sched_yield()
- // 避免占用 cpu,让出主线程执行权,其他线程有机会执行
- ngx_sched_yield();
- }
-
- // event.active 表示任务是否已经放入任务队列
- task.event.active = 0;
- }
-
- // 销毁条件变量
- (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
-
- // 销毁互斥量
- (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
- }
望指正!谢谢!