• 线程池原理与实现


    1. 线程池是什么

    线程池一种线程使用模式。

    因为线程过多会带来调度开销,影响缓存局部性和整体性能。线程池是用来维护着多个线程,等待监督管理者分配可并发执行的任务

    2. 线程池的优点:

    1. 解决在处理短时间任务时创建与销毁线程的代价
    2. 线程池不仅能够保证内核充分利用多线程,还能防止过分调度。

    注意点:

    线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

    3. 线程池的应用场景

    a. 需要大量的线程来完成任务,并且要求完成任务的时间比较短。

    比如Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。

    b. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

    对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

    c. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。

    突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。

    问题:创建的线程越多性能越高?

    答:不正确。一开始,程序的运行效率会随着线程数量的增加而逐渐变高。当线程数量增加临界值时,线程数量继续增加,程序的运行效率会减少。(由于线程的频繁切换影响了线程运行效率)。

    4. 线程池的实现

    4.1 线程池实现原理

            线程池由一个阻塞任务队列加上多个线程实现,线程池中的线程可以从阻塞任务队列中获取任务然后进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。

    4.2 线程池基本框架

    线程池的主体是一个任务队列和n个线程(工作队列):

    a.任务队列用来保存外界传入的需要解决的任务。

    b.而线程池中的n个线程负责一直从任务队列中拿出任务并解决。

    另外还需要一把互斥锁和一个条件变量:

            互斥锁:用来保护任务队列的数据安全,即维护多线程从任务队列中pop任务时的互斥关系。

            条件变量:用来维护多线程之间的同步关系,当任务队列为空时要求线程释放互斥锁并在条件变量下等待,这时任务队列中每插入一个任务就唤醒一个线程。

    4.3 结构体:

    分别为任务队列(生产者)、工作队列(消费者)、管理组件(互斥锁和条件变量)。

    1. //任务队列(生产者) 链表
    2. struct nTask{
    3. void (*task_func)(struct nTask* task); //任务函数回调
    4. void* user_data; //任务用户数据
    5. struct nTask* prev;
    6. struct nTask* next;
    7. };
    8. //工作队列(消费者) 链表
    9. struct nWorker{
    10. pthread_t threadid; //工作线程id
    11. struct nManager* manager;
    12. struct nWorker* prev;
    13. struct nWorker* next;
    14. };
    15. //管理互斥锁和条件变量
    16. typedef struct nManager {
    17. struct nTask* tasks; //任务队列
    18. struct nWorker* workers;//工作队列
    19. pthread_mutex_t mutex; //互斥锁
    20. pthread_cond_t cond; //条件变量
    21. } ThreadPool;

    4.4 提供的接口

    1. //线程池工作线程创建
    2. int nThreadPoolCreate(ThreadPool* pool, int numWorkers)
    3. //唤醒全部工作队列中的线程(销毁线程池)
    4. int nThreadPoolDestory(ThreadPool* pool, int nWorker)
    5. //唤醒单个工作队列中的线程
    6. int nThreadPoolPushTask(ThreadPool* pool, struct nTask* task)

    4.5 线程池测试代码

            创建10thread线程加入工作队列,阻塞等待。随后创建了1000task任务,写入 task的数据,去唤醒工作队列中的线程,管理组件判断任务有无,调用taskfunc来执行任务。

    1. #include
    2. #include
    3. #include
    4. #include
    5. #define THREADPOOL_INIT_COUNT 20 //工作线程数量
    6. #define TASK_INIT_SIZE 1000 //任务数量
    7. //双向链表插入(头插),list为头结点
    8. #define LIST_INSERT(item, list) do{ \
    9. item->prev = NULL; \
    10. item->next =list; \
    11. if((list) != NULL)(list)->prev = item; \
    12. (list) = item; \
    13. }while(0)
    14. //双向链表删除,list为头结点
    15. #define LIST_REMOVE(item, list) do{ \
    16. if(item->prev != NULL)item->prev->next = item->next;\
    17. if(item->next != NULL)item->next->prev = item->prev;\
    18. if(item == list)list = item->next;\
    19. item->prev = item->next = NULL;\
    20. }while(0)
    21. //任务队列(生产者) 链表
    22. struct nTask{
    23. void (*task_func)(struct nTask* task); //任务函数回调
    24. void* user_data; //任务用户数据
    25. struct nTask* prev;
    26. struct nTask* next;
    27. };
    28. //工作队列(消费者) 链表
    29. struct nWorker{
    30. pthread_t threadid; //工作线程id
    31. struct nManager* manager;
    32. struct nWorker* prev;
    33. struct nWorker* next;
    34. };
    35. //管理互斥锁和条件变量
    36. typedef struct nManager {
    37. struct nTask* tasks; //任务队列
    38. struct nWorker* workers;//工作队列
    39. pthread_mutex_t mutex; //互斥锁
    40. pthread_cond_t cond; //条件变量
    41. } ThreadPool;
    42. //消费者工作线程
    43. static void* nThreadPoolCallback(void* arg)
    44. {
    45. struct nWorker* worker = (struct nWorker*)arg;
    46. while(1){
    47. pthread_mutex_lock(&worker->manager->mutex);
    48. while(worker->manager->tasks == NULL){ //任务队列为空
    49. //解锁,阻塞等待唤醒,唤醒后加锁继续工作
    50. pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
    51. }
    52. struct nTask* task = worker->manager->tasks; //从头结点开始消费
    53. LIST_REMOVE(task, worker->manager->tasks); //任务队列移除头结点
    54. pthread_mutex_unlock(&worker->manager->mutex);
    55. task->task_func(task); //任务函数回调
    56. }
    57. free(worker);
    58. }
    59. //线程池工作线程创建
    60. int nThreadPoolCreate(ThreadPool* pool, int numWorkers)
    61. {
    62. if (pool == NULL) return -1;
    63. if (numWorkers < 1) numWorkers = 1;
    64. memset(pool, 0, sizeof(ThreadPool));
    65. pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; //静态申请 条件变量
    66. memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
    67. pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; //静态申请 互斥锁
    68. memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
    69. int i = 0;
    70. for(i = 0; i < numWorkers; i++){
    71. struct nWorker* worker = (struct nWorker*)malloc(sizeof(struct nWorker));
    72. if (worker == NULL) {
    73. perror("malloc");
    74. return -1;
    75. }
    76. memset(worker, 0, sizeof(struct nWorker));
    77. worker->manager = pool;
    78. int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
    79. if (ret) { //失败
    80. perror("pthread_create fail");
    81. free(worker);
    82. return -2;
    83. }
    84. LIST_INSERT(worker, pool->workers); //工作队列结点插入
    85. }
    86. return 0;
    87. }
    88. //唤醒全部工作队列中的线程(销毁线程池)
    89. int nThreadPoolDestory(ThreadPool* pool, int nWorker)
    90. {
    91. struct nWorker* worker = NULL;
    92. pthread_mutex_lock(&pool->mutex);
    93. //唤醒全部阻塞在条件变量上的工作线程
    94. pthread_cond_broadcast(&pool->cond);
    95. pthread_mutex_unlock(&pool->mutex);
    96. pool->workers = NULL;
    97. pool->tasks = NULL;
    98. return 0;
    99. }
    100. //唤醒单个工作队列中的线程
    101. int nThreadPoolPushTask(ThreadPool* pool, struct nTask* task)
    102. {
    103. pthread_mutex_lock(&pool->mutex);
    104. LIST_INSERT(task, pool->tasks);
    105. //唤醒至少一个阻塞在条件变量上的线程
    106. pthread_cond_signal(&pool->cond);
    107. pthread_mutex_unlock(&pool->mutex);
    108. }
    109. //任务执行回调函数
    110. void task_etry(struct nTask* task)
    111. {
    112. int idx = *(int*)task->user_data;
    113. printf("idx: %d\n", idx);
    114. free(task->user_data);
    115. free(task);
    116. }
    117. int main(void)
    118. {
    119. ThreadPool pool = {0};
    120. //线程池工作线程创建
    121. nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
    122. for(int i = 0; i < TASK_INIT_SIZE; i++){
    123. struct nTask* task = (struct nTask*)malloc(sizeof(struct nTask));
    124. if (task == NULL) {
    125. perror("malloc");
    126. exit(1);
    127. }
    128. memset(task, 0, sizeof(struct nTask));
    129. task->task_func = task_etry; //注册回调
    130. task->user_data = malloc(sizeof(int));
    131. *(int*)task->user_data = i;
    132. //唤醒单个工作队列中的线程
    133. nThreadPoolPushTask(&pool, task);
    134. }
    135. getchar();
    136. }

    推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: 

  • 相关阅读:
    Selenium自动化测试 —— 通过cookie绕过验证码的操作!
    虹科示波器 | 汽车免拆检修 | 2021款广汽丰田威兰达PHEV车发动机故障灯异常点亮
    【华为OD机试真题 python】最大花费金额 【2022 Q4 | 100分】
    基于教与学优化的BP神经网络(分类应用) - 附代码
    华为R&S设备状态及接口配置命令
    java--拼图游戏
    String转JsonObject 再转list<Map<String,Object>>
    MySQL学习笔记(十二)锁
    vue2不同版本下如何分环境打包
    NodeJS 基于 Dapr 构建云原生微服务应用,从 0 到 1 快速上手指南
  • 原文地址:https://blog.csdn.net/kakaka666/article/details/126028439