• 【C】高并发线程池设计


    高并发线程池设计

    并发基本概念

    • 所谓并发编程指的是在同一台计算机上"同时"处理多个任务。
    • 并发是在同一实体上的多个事件。

    处理事件过程出现阻塞

    • 漫长的CPU密集型处理。
    • 读取文件,但文件尚未缓存,从硬盘中读取较为缓慢。
    • 不得不等待获取某个资源:
      • 硬件驱动
      • 互斥锁
      • 等待同步方式调用的数据库响应
      • 网络上的请求和响应

    多线程的缺陷

    • 单个进程或线程同时只能处理一个任务,如果有很多请求需要同时处理怎么办?

    • 解决方案——运用多进程或多线程技术解决。

    • 缺陷:

      • 创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际用户请求的时间和资源要多的多。

      • 活动的线程需要消耗系统资源,如果启动太多线程,会导致系统由于过度消耗内存或"切换过度"而导致系统资源不足。

        • 线程切换时,该线程执行的相关信息会被保存在对应的上下文中,线程数越多,所用于切换的时间就越多。
    • 解决:——使用线程池技术。


    线程池

    • 线程池
      • 由一个任务队列和一组处理任务队列的线程组成。一旦工作进程需要处理某个可能"阻塞"的操作,不用自己操作,将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。
    • 注意:
      • 线程中的线程都要从任务队列中拿任务(同一个任务只允许一个线程拿到),会修改任务队列的链表,进程往里面加入新的任务也会修改任务队列的链表,二者无法同时修改,所以任务队列为临界资源,所以这里要实现同步与互斥。

    image-20220829113630258


    线程池的核心组件

    • 任务——待处理的工作,通常由标识、上下文和处理函数组成。
    • 任务队列——按顺序保存待处理的任务序列,等待线程中的线程组处理。
    • 线程池——由多个已启动的一组线程组成。
    • 条件变量——一种同步机制,允许线程挂起,知道共享数据上的某些条件得到满足。
    • 互斥锁——保证在任意时刻,只能有一个线程访问该对象。

    Nginx线程池解析

    • 注: 如下代码为本人看的某个视频中的资料,从Nginx中c抽下来的,貌似与最新的Nginx源码并不是很相同,因为经过删减,而且不是删减的最新版,但是大致意思我想应该是差不多的。😃
    执行流程
    1. 创建线程池并初始化。
      • 初始化开辟空间并进行相关默认设置及属性。
        • 创建互斥锁、条件变量。
        • 初始化任务队列。
        • 创建线程池中的线程。并启动线程。这里面涉及到互斥锁与条件变量,等待任务并进行取出,详情请看代码中的注释。这里为核心
    2. 分配任务内存
      • 任务结构体和其任务执行函数的参数内存一起分配。
    3. 指定任务的执行函数。
    4. 将任务放入线程池。
    5. 使用结束后销毁线程池。
      • 弄几个自杀任务放到任务队列中,等着线程们来取,然后依次自杀。
      • 之后销毁互斥锁、条件变量。
      • 最后free掉自己。

    主要数据结构
    任务结构体
    • thread_task_s
    struct thread_task_s {
        thread_task_t       *next;//下一个任务
        uint_t               id;//任务ID
        void                *ctx;//上下文,任务要带的参数
        void               (*handler)(void *data);//函数指针,具体执行的任务。
    };
    //起别名
    typedef struct thread_task_s  thread_task_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    分配任务内存
    • thread_task_alloc
    thread_task_t *
    thread_task_alloc(size_t size)
    {
        thread_task_t  *task;
    
        //任务内存+函数参数内存
        task = calloc(1,sizeof(thread_task_t) + size);
        if (task == NULL) {
            return NULL;
        }
        //task为thread_task_t指针
        //指针与常数相加
        task->ctx = task + 1;//task+1,此时指向的是任务函数的参数所在内存。
        return task;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    任务队列结构体
    • thread_pool_queue_t
    typedef struct {
        thread_task_t        *first;//指向第一个元素
        thread_task_t        **last;//指向最后一个结点
    } thread_pool_queue_t;//任务队列,单链表结构。
    
    • 1
    • 2
    • 3
    • 4
    • 补充:此单链表不同于我们在学数据结构时的那种定义,这里使用了二级指针,我感觉还是挺有意思的。相关的插入、取出操作在下面的相关线程池代码中有,这里我们提前拿出来先看一看。
    • 任务队列定义:如上所示,这里我们重复写一下,这样更方便顺序看。
    typedef struct {
       thread_task_t        *first;//指向第一个元素
       thread_task_t        **last;//指向最后一个结点,通过last来链接。
    } thread_pool_queue_t;//任务队列,单链表结构。
    
    • 1
    • 2
    • 3
    • 4
    • 插入操作:
    thread_task_t       *task;
    //task为任务,thread_task_t类型,将先将next置空。
    task->next = NULL;
    
    //*last其实就是first,即first=task
    *tp->queue.last = task;
    //注意last=&task->next,即目前task保存的是first后一结点的地址(注意这里是二级指针,这里我指一级指针为结点,二级指针就是结点的地址,即,next指针的地址)。
    tp->queue.last = &task->next;
    
    /*我们接着模拟第二次插入
    接着将上一个task的next = 本次要链接的task
    接着拿到本次要链接的task的下一个结点的地址,next指针的地址。
    */
       
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 取出操作:
    task = tp->queue.first;//取出第一个
    tp->queue.first = task->next;//首结点指针后移
    
    if (tp->queue.first == NULL) {//任务队列空了,回到最初的状态,重新准备链接。
       tp->queue.last = &tp->queue.first;
    }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    线程池结构体
    • thread_pool_s
    struct thread_pool_s {
        pthread_mutex_t        mtx;         //互斥锁
        thread_pool_queue_t   queue;        //任务队列
        int_t                 waiting;      //线程池中没有处理的任务还有多少
        pthread_cond_t         cond;        //线程条件变量
    
        char                  *name;        //线程池的名字
        uint_t                threads;      //线程池中线程的数量
        int_t                 max_queue;    //任务队列最多能够容纳多少个任务
    };
    //别名
    typedef struct thread_pool_s  thread_pool_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    线程池的初始化
    • thread_pool_init()
    thread_pool_t* thread_pool_init()
    {
        int             err;
        pthread_t       tid;
        uint_t          n;
        pthread_attr_t  attr;//线程属性设置结构体
    	thread_pool_t   *tp=NULL;
    
    	tp = calloc(1,sizeof(thread_pool_t));
    
    	if(tp == NULL){
    	    fprintf(stderr, "thread_pool_init: calloc failed!\n");
    	}
    
    	thread_pool_init_default(tp, NULL);//线程池部分属性默认设置
        thread_pool_queue_init(&tp->queue);//线程池任务队列初始化
        if (thread_mutex_create(&tp->mtx) != OK) {//创建互斥锁
    		free(tp);
            return NULL;
        }
    
        if (thread_cond_create(&tp->cond) != OK) {//创建条件变量
            (void) thread_mutex_destroy(&tp->mtx);
    		free(tp);
            return NULL;
        }
        //线程属性初始化
        err = pthread_attr_init(&attr);
        if (err) {
            fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
    		free(tp);
            return NULL;
        }
        //在线程创建时,将其属性设置为分离状态。
        //主线程使用pthread_join无法等待该子线程。
        //即无法再捕捉该子线程的状态
        err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        if (err) {
            fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
    		free(tp);
            return NULL;
        }
        for (n = 0; n < tp->threads; n++) {
            //线程的创建
            err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
            if (err) {
                fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
    			free(tp);
                return NULL;
            }
        }
        (void) pthread_attr_destroy(&attr);
    
        return tp;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    线程池任务队列初始化
    • thread_pool_queue_init
    #define thread_pool_queue_init(q)                                         \
        (q)->first = NULL;                                                    \
        (q)->last = &(q)->first
    
    • 1
    • 2
    • 3

    线程池中线程的启动
    • thread_pool_cycle
    static void *
    thread_pool_cycle(void *data)
    {
        thread_pool_t *tp = data;//所在线程池,在创建线程的时候传递过来
        int                 err;
        thread_task_t       *task;
        if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);
        for ( ;; ) {
            //上锁
            if (thread_mutex_lock(&tp->mtx) != OK) {
                return NULL;
            }
            tp->waiting--;//等待的线程--
            while (tp->queue.first == NULL) {//没有任务
                //等待信号,先挂起,然后开锁。——等任务队列中有任务。
                //被唤醒时,先上锁,然后正式被唤醒。
                if (thread_cond_wait(&tp->cond, &tp->mtx)
                    != OK)
                {
                    (void) thread_mutex_unlock(&tp->mtx);//防御型编程,开锁。
                    return NULL;
                }
            }
    		//从任务队列中拿任务
            task = tp->queue.first;
            tp->queue.first = task->next;
    		
            //如果取出一个任务后,任务队列又空了,重新设置last指向。
            if (tp->queue.first == NULL) {
                tp->queue.last = &tp->queue.first;
            }
    	    //开锁
            if (thread_mutex_unlock(&tp->mtx) != OK) {
                return NULL;
            }
            if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
                           task->id, tp->name);
            task->handler(task->ctx);//当前执行任务函数,task->ctx为函数参数
            if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
            task->next = NULL;
            free(task);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    往线程池中投递任务
    • thread_task_post
    int_t
    thread_task_post(thread_pool_t *tp, thread_task_t *task)
    {
        if (thread_mutex_lock(&tp->mtx) != OK) {//上锁
            return ERROR;
        }
    
        //任务队列尾邻接资源,进行互斥访问。
        if (tp->waiting >= tp->max_queue) {//线程池等待任务队列是否达到极限
            (void) thread_mutex_unlock(&tp->mtx);
    
            fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
                          tp->name, tp->waiting);
            return ERROR;
        }
    
        //task->event.active = 1;
    
        task->id = thread_pool_task_id++;//任务id++
        task->next = NULL;
    
        //发送一个信号,唤醒一个线程,之后该线程就能从任务队列中获取任务,进行执行。
        if (thread_cond_signal(&tp->cond) != OK) {
            (void) thread_mutex_unlock(&tp->mtx);
            return ERROR;
        }
    
        //一开始的时候last,默认指向的值first的地址。
        //所以此时给*tp->queue.last赋值后,first = tast
        //返回last为空,还是尾插法。
        *tp->queue.last = task;
        tp->queue.last = &task->next;
    
        tp->waiting++;
    
        (void) thread_mutex_unlock(&tp->mtx);
    
        if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
                       task->id, tp->name);
    
        return OK;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    销毁线程池
    • thread_pool_destroy
    void thread_pool_destroy(thread_pool_t *tp)
    {
        uint_t           n;
        thread_task_t    task;
        volatile uint_t  lock;
    
        memset(&task,'\0', sizeof(thread_task_t));
    
        task.handler = thread_pool_exit_handler;//给一个自杀任务
        task.ctx = (void *) &lock;//参数
    
        for (n = 0; n < tp->threads; n++) {
            lock = 1;
    
            if (thread_task_post(tp, &task) != OK) {//投递任务
                return;
            }
    
            while  (lock) {//自杀任务中,会将lock置为0,终止循环。
                sched_yield();//当前线程放弃CPU的优先权,让出CPU的执行权,让别的线程得到更多的执行机会。
            }
    
        }
    
        (void) thread_cond_destroy(&tp->cond);//清理条件变量
        (void) thread_mutex_destroy(&tp->mtx);//清理互斥锁
    	free(tp);//释放线程池
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    线程自杀任务
    • thread_pool_exit_handler
    static void
    thread_pool_exit_handler(void *data)
    {
        uint_t *lock = data;
    
        *lock = 0;
    
        pthread_exit(0);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    示例
    #include "thread_pool.h"
    
    struct test{
    
      int arg1;
    
      int arg2;
    
    };
    
    
    
    void task_handler1(void* data){
    
      static int index = 0;
    
      printf("Hello, this is 1th test.index=%d\r\n", index++);
    
    }
    
    
    
    void task_handler2(void* data){
    
      static int index = 0;
    
      printf("Hello, this is 2th test.index=%d\r\n", index++);
    
    }
    
    
    
    void task_handler3(void* data){
    
      static int index = 0;
    
      struct test *t = (struct test *) data;
    
     
    
      printf("Hello, this is 3th test.index=%d\r\n", index++);
    
      printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2);
    
    }
    
    int
    
    main(int argc, char **argv)
    
    {
    
      thread_pool_t* tp = NULL;//定义一个线程池指针
    
      int i = 0;
    
      
    
      tp = thread_pool_init(); //线程池初始化
    
    
    
      //分配任务内存
    
      thread_task_t * test1 = thread_task_alloc(0);
    
      thread_task_t * test2 = thread_task_alloc(0);
    
      thread_task_t * test3 = thread_task_alloc(sizeof(struct test));
    
      //指定任务
    
      test1->handler = task_handler1;
    
      test2->handler = task_handler2;
    
      test3->handler = task_handler3;
    
    
    
      //通过结构体指定参数
    
      ((struct test*)test3->ctx)->arg1 = 666;
    
      ((struct test*)test3->ctx)->arg2 = 888;
    
    
    
      //任务放入线程池
    
      thread_task_post(tp, test1);
    
      thread_task_post(tp, test2);
    
      thread_task_post(tp, test3);
    
    
    
      sleep(10);
    
      thread_pool_destroy(tp);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    image-20220829105827283


    补充
    • volatile关键字:
      • 有些变量是用 volatile 关键字声明的。当两个线程都要用到某一个变量且该变量的值会被改变时,应该用 volatile 声明,该关键字的作用是防止优化编译器把变量从内存装入 CPU 寄存器中。如果变量被装入寄存器,那么两个线程有可能一个使用内存中的变量,一个使用寄存器中的变量,这会造成程序的错误执行。volatile 的意思是让编译器每次操作该变量时一定要从内存中真正取出,而不是使用已经存在寄存器中的值。来源-菜鸟教程-C/C++ 中 volatile 关键字详解-多线程下的volatile

  • 相关阅读:
    深入理解 Java 泛型
    【聚类算法】带你轻松搞懂K-means聚类(含代码以及详细解释)
    linux环境安装SVN,以及常用的SVN操作
    论文详读《基于改进 LeNet-5 模型的手写体中文识别》,未完待补充
    【算法leetcode】2325. 解密消息(rust和go重拳出击)
    点云从入门到精通技术详解100篇-基于 CRF 模型语义优化的车载激光点云分类(续)
    jenkins-用户权限管理
    轻量级网络 ESPNetv2
    在亚马逊云科技Amazon SageMaker上部署构建聊天机器人的开源大语言模型
    深度学习的进展
  • 原文地址:https://blog.csdn.net/qq_51604330/article/details/126582270