• Nginx线程池剖析


    一文搞懂Nginx线程池机制原理 - 知乎

    Nginx 的线程池与性能剖析 - imsoft - 博客园

    Linux网络编程】Nginx -- 线程池

    学习过程中参考过上述几篇文章,感谢!

    目录

    1.NgInx线程池配置

    2. NgInx线程池使用示例

    3.NgInx线程池数据结构

    1)Nginx数组结构

    2)线程池处理队列

    3)池管理组件

    4)线程池模块配置结构

    4.NgInx线程池执行流程

    1.线程池初始化

    2.添加任务到任务队列

    3.消耗任务

    4.  完成任务收尾工作

    5.  线程池销毁

    5.总结


    1.NgInx线程池配置

    使用线程池功能,首先需要在配置文件中添加如下配置项:

    1. location / {
    2. root /html;
    3. thread_pool default threads=32 max_queue=65536;
    4. aio threads=default;
    5. }

    上面定义了一个名为“default”,包含32个线程,任务队列最多支持65536个请求的线程池。如果任务队列过载,Nginx将输出如下错误日志并拒绝请求:

    thread pool "default" queue overflow: N tasks waiting

    如果出现上面的错误,说明线程池的负载很高,这是可以通过添加线程数来解决这个问题。当达到机器的最高处理能力之后,增加线程数并不能改善这个问题 。

    可在编译时使用如下选项可以启用线程池功能

    1. --with-threads

    2. --with-file-aio

    启用线程池功能,让请求排队等待处理,并且可以充分利用 CPU 提高处理效率,开启线程池需要 AIO 的支持,启用异步文件 IO (AIO) 一般用于大文件传输的场景;

    2. NgInx线程池使用示例

         在Ngnix过滤模块中,会涉及到对文件读写操作。将对磁盘读写的动作ngx_thread_read()交给线程池去处理。file->thread_handler相当于push任务操作。

    1. ssize_t
    2. ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
    3. ngx_pool_t *pool)
    4. {
    5. ...
    6. ...
    7. ...
    8. task->handler = ngx_thread_read_handler;
    9. ...
    10. ...
    11. ...
    12. if (file->thread_handler(task, file) != NGX_OK) {
    13. return NGX_ERROR;
    14. }
    15. return NGX_AGAIN;
    16. }

    3.NgInx线程池数据结构

    1)Nginx数组结构

    在本文不做过多描述,内存池会详细陈述

    1. typedef struct ngx_array_s ngx_array_t;
    2. struct ngx_array_s {
    3. void *elts;
    4. ngx_uint_t nelts;
    5. size_t size;
    6. ngx_uint_t nalloc;
    7. ngx_pool_t *pool;
    8. };
    elts数据存储区
    nelts        数组元素个数
    size数组单个元素大小(字节)
    nalloc数组最大容量,当nelts = nalloc后如果还想继续存储,系统会分配一块新的内存,该内存是原内存两倍,原有数据会拷贝到新的内存中,继续存储数据        
    pool数组分配的所属内存池

    线程池数组初始化:

    1. typedef struct {
    2. ngx_array_t pools;
    3. } ngx_thread_pool_conf_t;

    2)线程池处理队列

    1. typedef struct {
    2. ngx_thread_task_t *first;
    3. ngx_thread_task_t **last;
    4. } ngx_thread_pool_queue_t;
    1. #define ngx_thread_pool_queue_init(q) \
    2. (q)->first = NULL; \
    3. (q)->last = &(q)->first

    这是一个双向链表,尾节点是一个二级指针, *last表示最后一个节点,**last指向上一个节点。使用二级指针构建链表非常香

    假设我要删除一个节点,按照常规操作,我得去定义一个临时变量,然后遍历链表,通过prev和pnext去删除。如果是一个二级指针**phead,phead = &(*phead)->next

    直接(*phead)= (*phead)->next即可删除

    3)池管理组件

    1. //池管理组件
    2. struct ngx_thread_pool_s {
    3. ngx_thread_mutex_t mtx; //锁
    4. ngx_thread_pool_queue_t queue; //消费队列
    5. ngx_int_t waiting; //等待任务数
    6. ngx_thread_cond_t cond; //条件变量
    7. ngx_log_t *log; //日志,多线程安全
    8. ngx_str_t name; //池名
    9. ngx_uint_t threads;//线程数量,默认32
    10. ngx_int_t max_queue;//最大任务数
    11. u_char *file; //线程池配置文件
    12. ngx_uint_t line; //线程池指令行号
    13. };

    4)线程池模块配置结构

    1. static ngx_command_t ngx_thread_pool_commands[] = {
    2. { ngx_string("thread_pool"),
    3. NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, // NGX_MAIN_CONF|NGX_DIRECT_CONF配置文件对应的结构已经创建 NGX_CONF_TAKE23接受三个或者两个参数
    4. ngx_thread_pool,
    5. 0,
    6. 0,
    7. NULL },
    8. ngx_null_command
    9. };

    本质是调用以下模块

    1. struct ngx_command_t {
    2. ngx_str_t name;
    3. ngx_uint_t type;
    4. char *(*set)(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
    5. ngx_uint_t conf;
    6. ngx_uint_t offset;
    7. void *post;
    8. };
    name配置模块指令名称
    type        配置类型(指令属性集合)
    set                配置指令处理
    conf指定当前配置存储位置(使用哪个内存池)
    offset配置项存放位置
    post一般填0对配置不做处理

    4.NgInx线程池执行流程

    跟之前我写的线程池有些不同,但逻辑本质上还是跟以前一样的。NGNIX线程池实际上有三个队列:任务队列、执行队列、完成队列。

     

    1.线程池初始化

    当进程启动后首先初始化线程池.

    • 获取每个线程池配置参数 ngx_get_conf
    • 初始化线程池完成队列   ngx_thread_pool_queue_init
    • 对每个线程池进行初始化配置 
    1. static ngx_int_t
    2. ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
    3. {
    4. ngx_uint_t i;
    5. ngx_thread_pool_t **tpp;
    6. ngx_thread_pool_conf_t *tcf;
    7. if (ngx_process != NGX_PROCESS_WORKER
    8. && ngx_process != NGX_PROCESS_SINGLE)
    9. {
    10. return NGX_OK;
    11. }
    12. tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
    13. ngx_thread_pool_module);
    14. if (tcf == NULL) {
    15. return NGX_OK;
    16. }
    17. ngx_thread_pool_queue_init(&ngx_thread_pool_done);
    18. tpp = tcf->pools.elts;
    19. for (i = 0; i < tcf->pools.nelts; i++) {
    20. if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
    21. return NGX_ERROR;
    22. }
    23. }
    24. return NGX_OK;
    25. }

    ngx_thread_pool_init_worker()调用的是ngx_thread_pool_init(),详见注释

    1. static ngx_int_t
    2. ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
    3. {
    4. int err;
    5. pthread_t tid;
    6. ngx_uint_t n;
    7. pthread_attr_t attr;
    8. /*
    9. 要求必须有事件通知函数 ngx_notify,否则多线程无法工作
    10. ngx_event_actions.notify = ngx_notify。ngx_event_actions是一个外部全局变量
    11. */
    12. if (ngx_notify == NULL) {
    13. ngx_log_error(NGX_LOG_ALERT, log, 0,
    14. "the configured event method cannot be used with thread pools");
    15. return NGX_ERROR;
    16. }
    17. //初始化任务队列,为空
    18. ngx_thread_pool_queue_init(&tp->queue);
    19. /*
    20. ngx使用的锁类型是PTHREAD_MUTEX_ERRORCHECK,目的是防止死锁(当某个线程连续对锁操作的时候会返回EDEADLK)
    21. 调用API后将锁通过传参的方式传出,内部锁的属性被销毁
    22. */
    23. if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
    24. return NGX_ERROR;
    25. }
    26. if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
    27. (void) ngx_thread_mutex_destroy(&tp->mtx, log);
    28. return NGX_ERROR;
    29. }
    30. tp->log = log;
    31. err = pthread_attr_init(&attr);
    32. if (err) {
    33. ngx_log_error(NGX_LOG_ALERT, log, err,
    34. "pthread_attr_init() failed");
    35. return NGX_ERROR;
    36. }
    37. /*
    38. 设置线程是分离属性,目的是快速释放资源无需被别的线程等待、
    39. 无需join
    40. */
    41. err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    42. if (err) {
    43. ngx_log_error(NGX_LOG_ALERT, log, err,
    44. "pthread_attr_setdetachstate() failed");
    45. return NGX_ERROR;
    46. }
    47. #if 0
    48. err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
    49. if (err) {
    50. ngx_log_error(NGX_LOG_ALERT, log, err,
    51. "pthread_attr_setstacksize() failed");
    52. return NGX_ERROR;
    53. }
    54. #endif
    55. for (n = 0; n < tp->threads; n++) {
    56. err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
    57. if (err) {
    58. ngx_log_error(NGX_LOG_ALERT, log, err,
    59. "pthread_create() failed");
    60. return NGX_ERROR;
    61. }
    62. }
    63. /*
    64. pthread_attr_init(&attr);
    65. pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    66. pthread_create(&tid,&attr,fn,arg);
    67. pthread_attr_destroy(&attr);
    68. 设置线程创建的属性是如上四步,需要注意的是,如果线程运行很快,可能在pthread_create返回之前就结束了
    69. 最好是在线程run函数内设置pthread_cond_wait等待之类的.但是不能使用wait,会导致整个进程休眠
    70. */
    71. (void) pthread_attr_destroy(&attr);
    72. return NGX_OK;
    73. }

    2.添加任务到任务队列

    当客户端向Ngnix发送请求的时候相当于一个任务,任务是主线程创建的(主线程负责处理客户端请求)主线程通过ngx_thread_task_post()函数向任务队列中添加一个任务。详见注释

    1. ngx_int_t
    2. ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
    3. {
    4. /*
    5. event.active 表示任务是否放到完成队列
    6. active = 1表示任务已经加入工作队列,
    7. */
    8. if (task->event.active) {
    9. ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
    10. "task #%ui already active", task->id);
    11. return NGX_ERROR;
    12. }
    13. //加锁
    14. if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
    15. return NGX_ERROR;
    16. }
    17. //如果等待的任务数大于最大任务书数,则失败
    18. if (tp->waiting >= tp->max_queue) {
    19. (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    20. ngx_log_error(NGX_LOG_ERR, tp->log, 0,
    21. "thread pool \"%V\" queue overflow: %i tasks waiting",
    22. &tp->name, tp->waiting);
    23. return NGX_ERROR;
    24. }
    25. //表示任务添加到工作队列
    26. task->event.active = 1;
    27. //全局计数器,任务id++
    28. task->id = ngx_thread_pool_task_id++;
    29. task->next = NULL;
    30. //解锁加锁
    31. if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
    32. (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    33. return NGX_ERROR;
    34. }
    35. //把任务添加到处理队列
    36. *tp->queue.last = task;
    37. tp->queue.last = &task->next;
    38. //待处理任务数量+1
    39. tp->waiting++;
    40. (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    41. ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
    42. "task #%ui added to thread pool \"%V\"",
    43. task->id, &tp->name);
    44. return NGX_OK;
    45. }

    3.消耗任务

    • 消耗任务就是将任务分配给handler函数进行处理,执行 task->handler。
    • 处理完的任务添加至完成队列,触发 ngx_thread_pool_handler 函数处理。

    详见注释

    1. static void *
    2. ngx_thread_pool_cycle(void *data)
    3. {
    4. ngx_thread_pool_t *tp = data;
    5. int err;
    6. sigset_t set;
    7. ngx_thread_task_t *task;
    8. #if 0
    9. ngx_time_update();
    10. #endif
    11. ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
    12. "thread in pool \"%V\" started", &tp->name);
    13. sigfillset(&set); //将所有的信号加入到信号量集(屏蔽所有信号)
    14. sigdelset(&set, SIGILL); //从信号量集删除SIGILL信号(当来的这个信号可以进行处理)
    15. sigdelset(&set, SIGFPE);
    16. sigdelset(&set, SIGSEGV);
    17. sigdelset(&set, SIGBUS);
    18. err = pthread_sigmask(SIG_BLOCK, &set, NULL);
    19. /*
    20. 353-360表示上述设置此线程屏蔽除了SIGILL SIGFPE SIGSEGV SIGBUS以外的信号
    21. */
    22. if (err) {
    23. ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
    24. return NULL;
    25. }
    26. //从任务队列获取任务,执行 task->handler
    27. for ( ;; ) {
    28. if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
    29. return NULL;
    30. }
    31. /* 等待的任务数减一 */
    32. tp->waiting--;
    33. //如果任务队空,条件队列阻塞
    34. while (tp->queue.first == NULL) {
    35. if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
    36. != NGX_OK)
    37. {
    38. (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    39. return NULL;
    40. }
    41. }
    42. /* 取一个任务 */
    43. task = tp->queue.first;
    44. tp->queue.first = task->next;
    45. /* 所有的线程都会处于等在状态,此时发送了一个signal_cond,假设线程A和B都收到了这个信号,
    46. 那么A或者B将分配一个任务(假设A收到将其从链表中删除)那么B工作折不会分配任务,此时是不能去回调的!!! */
    47. if (tp->queue.first == NULL) {
    48. tp->queue.last = &tp->queue.first;
    49. }
    50. if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
    51. return NULL;
    52. }
    53. #if 0
    54. ngx_time_update();
    55. #endif
    56. ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
    57. "run task #%ui in thread pool \"%V\"",
    58. task->id, &tp->name);
    59. /*处理任务 task->ctx 传递自定义文本 执行用户自定义的操作*/
    60. task->handler(task->ctx, tp->log);
    61. ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
    62. "complete task #%ui in thread pool \"%V\"",
    63. task->id, &tp->name);
    64. /*将此任务从链表中删除关联*/
    65. task->next = NULL;
    66. /*使用自旋锁保护 完成队列
    67. 自旋锁会一直停留在此,等待锁的状态改变
    68. 互斥锁睡眠等待的方式
    69. 使用自旋锁,cpu周期等待条件成立,反应迅速但耗cpu
    70. */
    71. ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
    72. /*将处理完毕的任务加入完成队列*/
    73. *ngx_thread_pool_done.last = task;
    74. ngx_thread_pool_done.last = &task->next;
    75. // 确保对内存操作按照正确的顺序执行
    76. // 要求处理器完成位于 ngx_memory_barrier 前面的内存操作后才处理后面操作
    77. ngx_memory_barrier();
    78. /*解锁自旋锁*/
    79. ngx_unlock(&ngx_thread_pool_done_lock);
    80. (void) ngx_notify(ngx_thread_pool_handler);
    81. }
    82. }

    4.  完成任务收尾工作

    任务处理完毕后加入完成队列,然后通知主线程。主线程收到通知后会在事件模块进行结束工作event->handler异步事件完成的回调函数(自定义)

    1. static void
    2. ngx_thread_pool_handler(ngx_event_t *ev)
    3. {
    4. ngx_event_t *event;
    5. ngx_thread_task_t *task;
    6. ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
    7. //使用自旋锁保护完成队列
    8. ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
    9. //取出已经完成的队列并将完成队列置空
    10. task = ngx_thread_pool_done.first;
    11. ngx_thread_pool_done.first = NULL;
    12. ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
    13. ngx_memory_barrier();
    14. ngx_unlock(&ngx_thread_pool_done_lock);
    15. while (task) {
    16. ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
    17. "run completion handler for task #%ui", task->id);
    18. //取出这个任务里的事件对象
    19. event = &task->event;
    20. task = task->next;
    21. //线程异步事件处理结束,不是很明白下两个标志位作用
    22. event->complete = 1;
    23. event->active = 0;
    24. //完成回调函数
    25. event->handler(event);
    26. }
    27. }

    5.  线程池销毁

    创建一个要求结束的线程,把任务data 置为 0,表示线程结束。

    注意:Nginx线程池销毁过程并不是发送一个全局信号销毁,在前一章我有测试过这种情况的弊端。Nginx采用的策略是死等while(lock)逐一线程释放。

    详见注释

    1. static void
    2. ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
    3. {
    4. ngx_uint_t *lock = data;
    5. *lock = 0;
    6. pthread_exit(0);
    7. }
    1. // 销毁线程池
    2. // 使用一个要求线程结束的 task,发给池里所有的线程
    3. // 最后销毁条件变量和互斥量
    4. static void
    5. ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
    6. {
    7. ngx_uint_t n;
    8. ngx_thread_task_t task;
    9. // lock 是一个简单的标志量,作为任务的 ctx 传递
    10. volatile ngx_uint_t lock;
    11. // 创建要求线程结束的 task
    12. ngx_memzero(&task, sizeof(ngx_thread_task_t));
    13. // 要求线程结束的任务,调用 pthread_exit
    14. task.handler = ngx_thread_pool_exit_handler;
    15. // lock 是一个简单的标志量,作为任务的 ctx 传递
    16. task.ctx = (void *) &lock;
    17. // 发送 tp->threads 个 task,逐个结束所有的线程
    18. for (n = 0; n < tp->threads; n++) {
    19. // 线程退出后将会被设置为 0
    20. lock = 1;
    21. // 把任务加入到线程池的队列
    22. if (ngx_thread_task_post(tp, &task) != NGX_OK) {
    23. return;
    24. }
    25. // 等待 task 被某个线程处理,从而结束一个线程
    26. while (lock) {
    27. // ngx_process.h:#define ngx_sched_yield() sched_yield()
    28. // 避免占用 cpu,让出主线程执行权,其他线程有机会执行
    29. ngx_sched_yield();
    30. }
    31. // event.active 表示任务是否已经放入任务队列
    32. task.event.active = 0;
    33. }
    34. // 销毁条件变量
    35. (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
    36. // 销毁互斥量
    37. (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
    38. }

    5.总结

    1. Nginx线程池在释放资源的时候,并不是采用广播条件变量的方式(当广播条件变量的时候,可能在第二个线程释放完毕后第三个线程没来得及释放。线程销毁函数又重新解了锁,导致上述错误。)。采用逐一释放过程,并配合sched_yield() 避免占用 cpu,让出主线程执行权,其他线程有机会执行。
    2. 双向链表使用二级指针的好处,减少资源开销,且删除更快。
    3. 一个完成的管理组件应该具备哪些属性。即事件本身与事件外围应该考虑哪些东西
    4. 了解了Ngnix的Filter模块、Upstream模块、handler模块(比如模块上下文结构,本质一组糊回调函数指针,才创建前创建后等合适时间调用)
    5. 一些编程注意的问题:比如对某个线程屏蔽某些信号,自旋锁的使用,线程并发注意的一些问题。

    望指正!谢谢!

  • 相关阅读:
    KY30 进制转换
    esp32和ros2基础篇草稿-micro-ros-
    【基于递归混合尺度:无监督GAN:Pansharpening】
    【进阶C语言】C语言文件操作
    YOLO系列 --- YOLOV7算法(三):YOLO V7算法train.py代码解析
    一文搞懂什么是 PostCSS
    Sentinel学习(2)——sentinel的使用,引入依赖和配置 & 对消费者进行流控 & 对生产者进行熔断降级
    华为od 面试题及流程 (前后端)
    物理主外键与逻辑外键
    主键、外键和索引的区别
  • 原文地址:https://blog.csdn.net/qq_45604814/article/details/125955869