• 基于C语言 -- 线程池实现


    项目地址

    GitHub - CHENLitterWhite/CPPWheel: CPP自封装的库

    专栏历史好文

    CMakeList.txt -- 编写保姆式教程_干饭小白的博客-CSDN博客

    C++实战-基于c++11新特性的mysql连接池_c++ mysql连接池_干饭小白的博客-CSDN博客

    C++项目实战--线程池代码讲解_std::shared_ptr_干饭小白的博客-CSDN博客

    C++项目实战-实际应用_干饭小白的博客-CSDN博客


    文章目录

    目录

    前言

    使用步骤


    前言

    线程池 -- 纯C版


    使用步骤

    1. //------------------------------------------//
    2. // 作者: 干饭小白
    3. // 时间: 2023-09-04
    4. //------------------------------------------//
    5. #pragma once
    6. #ifdef __cplusplus
    7. extern "C"
    8. {
    9. #endif __cplusplus
    10. #define DEFAULT_TIME 5
    11. #define MIN_WAIT_TASK_NUM 5
    12. #define DEFAULT_THREAD_NUM 2
    13. #include
    14. #include
    15. #include
    16. #include
    17. #include
    18. #include
    19. // 任务函数结构
    20. typedef struct {
    21. void *(*function)(void*);
    22. void *arg;
    23. }Task;
    24. // 线程池管理者
    25. struct Threadpool {
    26. /* 开关 */
    27. bool shutdown;
    28. /* 任务队列 */
    29. int queue_front; // 队头
    30. int queue_rear; // 队尾
    31. int queue_size; // 队列大小
    32. int queue_max_size; // 最大的任务数
    33. Task *task_queue; // 任务队列
    34. /* 工作线程状态 */
    35. int min_thr_num; // 最小的线程数
    36. int max_thr_num; // 最大线线程数
    37. int live_thr_num; // 当前的活跃线程数
    38. int busy_thr_num; // 当前忙碌的线程数
    39. int wait_exit_thr_num; // 当前正在等待退出的线程数
    40. pthread_t *threads; // 线程集合
    41. /* 线程集合的管理 */
    42. pthread_t admin_tid; // 线程管理者 -- 保持线程池中线程的相对平衡
    43. pthread_mutex_t lock; // 互斥锁
    44. pthread_mutex_t thread_counter; // 保证线程间竞争关系互斥 -- 正在忙碌的线程变量控制
    45. pthread_cond_t queue_not_full; // 唤醒任务可入队线程 -- 队列不为满
    46. pthread_cond_t queue_not_empty; // 唤醒工作线程取任务 -- 队列不为空
    47. };
    48. /*-----------------------------------------------
    49. 函数名:
    50. threadpool_creat
    51. 函数说明:
    52. 创建并初始化一个线程池
    53. 输入参数:
    54. min_thr_num(int) 最小的线程数量
    55. max_thr_num(int) 最大的线程数量
    56. queue_max_size(int) 任务队列大小
    57. 输出参数:
    58. 返回值:
    59. 返回线程池句柄,失败返回 NULL
    60. -----------------------------------------------*/
    61. Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size);
    62. /*-----------------------------------------------
    63. 函数名:
    64. threadpool_thread
    65. 函数说明:
    66. 任务处理线程函数
    67. 输入参数:
    68. threadpool(Threadpool) 线程池句柄
    69. 输出参数:
    70. 返回值:
    71. -----------------------------------------------*/
    72. void * threadpool_thread(void *threadpool);
    73. /*-----------------------------------------------
    74. 函数名:
    75. threadpool_thread
    76. 函数说明:
    77. 任务处理线程函数
    78. 输入参数:
    79. threadpool(Threadpool*) 线程池句柄
    80. 输出参数:
    81. 返回值:
    82. -----------------------------------------------*/
    83. void * admin_thread(void *threadpool);
    84. /*-----------------------------------------------
    85. 函数名:
    86. threadpool_free
    87. 函数说明:
    88. 释放资源
    89. 输入参数:
    90. pool(Threadpool*) 线程池句柄
    91. 输出参数:
    92. 返回值:
    93. 0 正常
    94. -1 pool == NULL
    95. -----------------------------------------------*/
    96. int threadpool_free(Threadpool * pool);
    97. /*-----------------------------------------------
    98. 函数名:
    99. threadpool_destory
    100. 函数说明:
    101. 销毁线程池
    102. 输入参数:
    103. pool(Threadpool*) 线程池句柄
    104. 输出参数:
    105. 返回值:
    106. 0 正常
    107. -1 pool == NULL
    108. -----------------------------------------------*/
    109. int threadpool_destory(Threadpool * pool);
    110. /*-----------------------------------------------
    111. 函数名:
    112. threadpool_add_task
    113. 函数说明:
    114. 向任务队列中添加事件
    115. 输入参数:
    116. pool(Threadpool*) 线程池句柄
    117. void *(*function)(void *arg) 任务函数
    118. arg(void *) 任务函数携带的参数
    119. 输出参数:
    120. 返回值:
    121. -----------------------------------------------*/
    122. int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg);
    123. /*-----------------------------------------------
    124. 函数名:
    125. is_thread_alive
    126. 函数说明:
    127. 判断一个线程是否存活
    128. 输入参数:
    129. tid(pthread_t) 线程pid
    130. 输出参数:
    131. 返回值:
    132. 存活 true
    133. 消亡 false
    134. -----------------------------------------------*/
    135. bool is_thread_alive(pthread_t tid);
    136. #ifdef __cplusplus
    137. }
    138. #endif
    1. #include "ThreadPool.h"
    2. Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size)
    3. {
    4. Threadpool *pool = NULL;
    5. /* 使用do{}while(0) --> 实现goto机制,错误时即使跳出,并统一处理 */
    6. do
    7. {
    8. /** 开启线程池空间 **/
    9. pool = (Threadpool *)malloc(sizeof(Threadpool));
    10. if(NULL == pool)
    11. {
    12. break;
    13. }
    14. /** 初始化信息 **/
    15. pool->min_thr_num = min_thr_num;
    16. pool->max_thr_num = max_thr_num;
    17. pool->live_thr_num = min_thr_num;
    18. pool->busy_thr_num = 0;
    19. pool->wait_exit_thr_num = 0;
    20. pool->queue_front = 0;
    21. pool->queue_rear = 0;
    22. pool->queue_size = 0;
    23. pool->queue_max_size = queue_max_size;
    24. pool->shutdown = false;
    25. /** 分配工作线程空间 **/
    26. pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
    27. if(NULL == pool->threads)
    28. {
    29. break;
    30. }
    31. memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);
    32. /** 队列空间 **/
    33. pool->task_queue = (Task *)malloc(sizeof(Task) * queue_max_size);
    34. if(NULL == pool->task_queue)
    35. {
    36. break;
    37. }
    38. /** 初始化互斥锁和条件变量 **/
    39. if(pthread_mutex_init(&(pool->lock), NULL) != 0 ||
    40. pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
    41. pthread_cond_init(&(pool->queue_not_empty), NULL) ||
    42. pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    43. {
    44. break;
    45. }
    46. /** 启动 min_thr_num 个工作线程 **/
    47. for(int i = 0; i < min_thr_num; ++i)
    48. {
    49. pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    50. }
    51. /** 启动管理者线程 **/
    52. pthread_create(&(pool->admin_tid), NULL, admin_thread, (void*)pool);
    53. return pool;
    54. }while(0);
    55. /* 释放pool的空间 */
    56. threadpool_free(pool);
    57. return NULL;
    58. }
    59. int threadpool_destory(Threadpool * pool)
    60. {
    61. /* 容错性判断 */
    62. if(NULL == pool)
    63. {
    64. return -1;
    65. }
    66. /* 线程标志位:true */
    67. pool->shutdown = true;
    68. /* 销毁管理线程 */
    69. pthread_join(pool->admin_tid, NULL);
    70. /* 通知存活的线程结束自己 */
    71. for(int i = 0; i < pool->live_thr_num; ++i)
    72. {
    73. /** 唤醒所有被阻塞的线程 **/
    74. pthread_cond_broadcast(&(pool->queue_not_empty));
    75. }
    76. /* 等待线程结束,进行回收 */
    77. for(int i = 0; i < pool->live_thr_num; ++i)
    78. {
    79. pthread_join(pool->threads[i], NULL);
    80. }
    81. /* 释放资源 */
    82. threadpool_free(pool);
    83. return 0;
    84. }
    85. int threadpool_free(Threadpool * pool)
    86. {
    87. /* 容错性判断 */
    88. if(NULL == pool)
    89. {
    90. return -1;
    91. }
    92. /* 销毁任务队列 */
    93. if(pool->task_queue)
    94. {
    95. free(pool->task_queue);
    96. pool->task_queue = NULL;
    97. }
    98. /* 销毁工作线程集合 */
    99. if(pool->threads)
    100. {
    101. free(pool->threads);
    102. pool->threads = NULL;
    103. }
    104. /* 销毁池管理空间 */
    105. free(pool);
    106. pool = NULL;
    107. return 0;
    108. }
    109. void * admin_thread(void *threadpool)
    110. {
    111. /* 容错处理 */
    112. if(NULL == threadpool)
    113. {
    114. return NULL;
    115. }
    116. /* 维护线程池的平衡 */
    117. Threadpool *pool = (Threadpool *)threadpool;
    118. while(!pool->shutdown)
    119. {
    120. /** 每隔DEFAULT_TIME进行一次 **/
    121. sleep(DEFAULT_TIME);
    122. /** 获取当前状态下活跃数和任务数 -- 多线程访问同一个变量(互斥锁) **/
    123. pthread_mutex_lock(&(pool->lock));
    124. int queue_size = pool->queue_size;
    125. int live_thr_num = pool->live_thr_num;
    126. pthread_mutex_unlock(&(pool->lock));
    127. /** 获取忙碌数 **/
    128. pthread_mutex_lock(&(pool->thread_counter));
    129. int busy_thr_num = pool->busy_thr_num;
    130. pthread_mutex_unlock(&(pool->thread_counter));
    131. /** 创建新线程:正在等待的任务 >= 最小允许等待的任务数 && 存活线程数 < 最大线程数 **/
    132. if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
    133. {
    134. pthread_mutex_lock(&(pool->lock));
    135. int add = 0;
    136. for(int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM && pool->live_thr_num < pool->max_thr_num; ++i)
    137. {
    138. if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
    139. {
    140. pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
    141. add++;
    142. pool->live_thr_num++;
    143. }
    144. }
    145. pthread_mutex_unlock(&(pool->lock));
    146. }
    147. /** 销毁空闲的线程: 忙碌数*2 < 存活数(一半以上的空闲) && 存活数 > 最小线程数 **/
    148. if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
    149. {
    150. pthread_mutex_lock(&(pool->lock));
    151. pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
    152. pthread_mutex_unlock(&(pool->lock));
    153. }
    154. for(int i = 0; i < DEFAULT_THREAD_NUM; ++i)
    155. {
    156. /*** 通知处于空闲的线程自杀 ***/
    157. pthread_cond_signal(&(pool->queue_not_empty));
    158. }
    159. }
    160. return NULL;
    161. }
    162. bool is_thread_alive(pthread_t tid)
    163. {
    164. int kill_rc = pthread_kill(tid, 0);
    165. if(ESRCH == kill_rc)
    166. {
    167. return false;
    168. }
    169. return true;
    170. }
    171. void * threadpool_thread(void *threadpool)
    172. {
    173. /* 容错处理 */
    174. if(NULL == threadpool)
    175. {
    176. return NULL;
    177. }
    178. Threadpool *pool = (Threadpool *)threadpool;
    179. Task task;
    180. while(true)
    181. {
    182. pthread_mutex_lock(&(pool->lock));
    183. /* 无任务:阻塞等待 */
    184. while((pool->queue_size == 0 && !pool->shutdown))
    185. {
    186. /** 阻塞等待 **/
    187. pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    188. /** 是否是自杀任务 **/
    189. if(pool->wait_exit_thr_num > 0)
    190. {
    191. pool->wait_exit_thr_num--;
    192. /** 判断线程池中的线程数量是否大于最小线程数:是则结束当前线程 **/
    193. if(pool->live_thr_num > pool->min_thr_num)
    194. {
    195. pool->live_thr_num--;
    196. pthread_mutex_unlock(&(pool->lock));
    197. pthread_exit(NULL);
    198. }
    199. }
    200. }
    201. /* 是否销毁池 */
    202. if(pool->shutdown)
    203. {
    204. pthread_mutex_unlock(&(pool->lock));
    205. pthread_exit(NULL);
    206. }
    207. /* 有任务:执行任务 */
    208. task.function = pool->task_queue[pool->queue_front].function;
    209. task.arg = pool->task_queue[pool->queue_front].arg;
    210. /* 维护循环队列平衡 */
    211. pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
    212. pool->queue_size--;
    213. /* 通知队列不未满 */
    214. pthread_cond_broadcast(&(pool->queue_not_full));
    215. pthread_mutex_unlock(&(pool->lock));
    216. /* 执行开始:忙碌线程+1 */
    217. pthread_mutex_lock(&(pool->thread_counter));
    218. pool->busy_thr_num++;
    219. pthread_mutex_unlock(&(pool->thread_counter));
    220. /* 执行任务 */
    221. (*(task.function))(task.arg);
    222. /* 执行结束:忙碌线程-1 */
    223. pthread_mutex_lock(&(pool->thread_counter));
    224. pool->busy_thr_num--;
    225. pthread_mutex_unlock(&(pool->thread_counter));
    226. }
    227. pthread_exit(NULL);
    228. }
    229. int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg)
    230. {
    231. if(NULL == pool)
    232. {
    233. return -1;
    234. }
    235. pthread_mutex_lock(&(pool->lock));
    236. /* 任务队列满时需要等到 */
    237. while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
    238. {
    239. pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    240. }
    241. /* 线程池处于关闭状态 */
    242. if(pool->shutdown)
    243. {
    244. pthread_mutex_unlock(&(pool->lock));
    245. }
    246. /* 清空残留 */
    247. if(pool->task_queue[pool->queue_rear].arg != NULL)
    248. {
    249. free(pool->task_queue[pool->queue_rear].arg);
    250. pool->task_queue[pool->queue_rear].arg = NULL;
    251. }
    252. /* 添加任务 */
    253. pthread_cond_signal(&(pool->queue_not_empty));
    254. pthread_mutex_unlock(&(pool->lock));
    255. return 0;
    256. }

  • 相关阅读:
    Programming Languages PartC Week1学习笔记——Ruby与面向对象编程
    包埋紫杉醇的Pluronic P85/聚乳酸(PLA-P85-PLA)纳米粒子|制备方法
    LabVIEW专栏五、网口
    hexo 使用hexo g -d报错
    shiro详解-shiro史上最全学习笔记
    数据结构——排序算法——桶排序
    STS中打开Ibatis的xml文件提示错误
    VE对环境和社会的贡献
    QT单元测试初探
    golang select 机制
  • 原文地址:https://blog.csdn.net/weixin_46120107/article/details/132948426