• Linux网络编程8——线程池模型


    学习视频链接

    05-线程池模型原理分析_bilibili_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1iJ411S7UA?p=91&vd_source=0471cde1c644648fafd07b54e303c905

    目录

    一、简介

    1.1 以前程序存在的问题

    1.2 解决方法

    1.3 处理线程扩容或者减少

    二、代码

    2.1 分析

    2.2 代码


    一、简介

    1.1 以前程序存在的问题

    多线程服务器,每次客户端发过来信息服务端就会创建一个线程,处理完了再销毁线程开销有点大

    1.2 解决方法

    提前创建好线程,线程阻塞等待某个客户端数据来了后,分配一个线程处理数据,处理完了以后线程不删除,继续阻塞

    1.3 处理线程扩容或者减少

    半夜人少的时候减少线程的使用

    白天人多的时候扩容线程,线程数量不够用的时候也需要扩容

    二、代码

    2.1 分析

    结构体

    模块分析

    1、main();

    创建线程池

    向线程池中添加任务。借助回调处理任务

    销毁线程池

    2、pthreadpool_create();

    创建线程池结构体指针

    初始化线程池结构体 { N 个成员变量 }

    创建 N 个任务线程

    创建 1 个管理者线程

    失败时,销毁开辟的所有空间 (释放)

    3、threadpool_thread()

    进入子线程回调函数

    接收参数 void *arg -> pool 结构体

    加锁 -> lock -> 整个结构体锁

    判断条件变量 -> wait

    4、adjust_thread();

    循环 10s 执行一次

    进入管理者线程回调函数

    接收参数 void *arg -> pool 结构体

    加锁 -> lock -> 整个结构体锁

    获取管理线程池要用的到变量  task_num,live_num,busy_num

    根据既定算法,使用上述 3 变量,判断是否应该创建、销毁线程池中指定步长的线程

    5、threadpool_add();

    总功能:

            模拟产生任务。  num[20]

            设置回调函数,处理任务。   sleep(1) 代表处理完成

    内部实现:

            初始化 任务队列结构体成员。   回调函数 function,arg

            利用环形队列机制,实现添加任务。   借助队尾指针挪移 % 实现

            唤醒阻塞在 条件变量上的线程

            解锁

    6、从 3、中 wait 之后继续执行,处理任务

    加锁

    获取 任务处理回调函数及参数

    利用环形队列,实现处理任务。借助队头指针挪移 % 实现

    唤醒阻塞在 条件变量 上的 server

    解锁

    加锁

    改忙线程数++

    解锁

    执行处理任务的线程

    加锁

    改忙线程数--

    解锁

    7、创建、销毁线程

    管理者线程根据 task_num、live_num、busy_num

    根据既定算法,使用上述 3 变量,判断是否应该 创建、销毁线程池中 指定步长的线程

    如果满足 创建条件

            pthread_create();   回调 任务线程函数 live_num++

    如果满足 销毁条件

            wait_exit_thr_num = 10;

            signal 给 阻塞在条件变量上的线程 发送 假条件满足信号

            跳转至   wait 阻塞线程会被 假信号 唤醒。   wait_exit_thr_num > 0    pthread_exit();

    2.2 代码

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. //#include "threadpool.h"
    10. #define DEFAULT_TIME 10 // 10s检测一次
    11. #define MIN_WAIT_TASK_NUM 10 // 如果 queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池
    12. #define DEFAULT_THREAD_VARY 10 // 每次创建和销毁线程的个数
    13. #define true 1
    14. #define false 0
    15. typedef struct {
    16. void *(*function)(void *); // 指针函数,回调函数
    17. void *arg; // 上面函数的参数
    18. } threadpool_task_t; // 各子线程任务结构体
    19. // 描述线程池相关信息
    20. typedef struct threadpool_t {
    21. pthread_mutex_t lock; // 用于锁住本结构体
    22. pthread_mutex_t thread_counter; // 记录忙状态线程个数的锁 busy_thr_num
    23. pthread_cond_t queue_not_full; // 当任务队列满时,添加任务的线程阻塞,等待此条件变量
    24. pthread_cond_t queue_not_empty; // 当任务队列里不为空时,通知等待任务的线程
    25. pthread_t *threads; // 存放线程池中每个线程的 tid 数组
    26. pthread_t adjust_tid; // 管理线程 tid
    27. threadpool_task_t *task_queue; // 任务队列(数组首地址)
    28. int min_thr_num; // 线程池最小线程数
    29. int max_thr_num; // 线程池最大线程数
    30. int live_thr_num; // 当前存活线程个数
    31. int busy_thr_num; // 忙状态线程个数
    32. int wait_exit_thr_num; // 要销毁的线程个数
    33. int queue_front; // task_queue 队头下标
    34. int queue_rear; // task_queue 队尾下标
    35. int queue_size; // task_queue 队中实际任务数
    36. int queue_max_size; // task_queue 队列可容纳任务数上限
    37. int shutdown; // 标志位,线程池使用状态,true 或 false
    38. } threadpool_t;
    39. void *threadpool_thread(void *threadpool);
    40. void *adjust_thread(void *threadpool);
    41. int is_thread_alive(pthread_t tid);
    42. int threadpool_free(threadpool_t *pool);
    43. // threadpool_ create(3, 100, 100);
    44. threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
    45. {
    46. int i;
    47. threadpool_t *pool = NULL; // 线程池结构体
    48. do {
    49. if((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL) {
    50. printf("malloc threadpool fail");
    51. break; // 跳出 do while
    52. }
    53. pool->min_thr_num = min_thr_num;
    54. pool->max_thr_num = max_thr_num;
    55. pool->busy_thr_num = 0;
    56. pool->live_thr_num = min_thr_num; // 活着的线程数初值=最小线程数
    57. pool->wait_exit_thr_num = 0;
    58. pool->queue_size = 0; // 有0个产品
    59. pool->queue_max_size = queue_max_size; // 最大任务队列数
    60. pool->queue_front = 0;
    61. pool->queue_rear = 0;
    62. pool->shutdown = false; // 不关闭线程池
    63. // 根据最大线程上限数,给工作线程数组开辟空间,并清零
    64. pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
    65. if (pool->threads == NULL) {
    66. printf("malloc threads fail");
    67. break;
    68. }
    69. memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num); // 数组清零
    70. // 给任务队列开辟空间
    71. pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
    72. if (pool->task_queue == NULL) {
    73. printf("malloc task_queue fail");
    74. break;
    75. }
    76. // 初始化互斥琐、条件变量
    77. if (pthread_mutex_init(&(pool->lock), NULL) != 0
    78. || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
    79. || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
    80. || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    81. {
    82. printf("init the lock or cond fail");
    83. break;
    84. }
    85. // 启动 min_thr_num 个 work thread
    86. for (i = 0; i < min_thr_num; i++) {
    87. pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); // pool 指向当前线程池
    88. printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
    89. }
    90. pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); // 创建管理者线程
    91. return pool;
    92. } while(0);
    93. threadpool_free(pool); // 前面代码调用失败时,释放poll存储空间
    94. return NULL;
    95. }
    96. // 向线程池中添加一个任务
    97. // threadpool_add(thp, process, (void*)&num[i]); // 向线程池中添加任务 process:小写->大写
    98. int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    99. {
    100. pthread_mutex_lock(&(pool->lock));
    101. // ==为真,队列已经满,调wait阻塞
    102. while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
    103. pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    104. }
    105. if (pool->shutdown) {
    106. pthread_cond_broadcast(&(pool->queue_not_empty));
    107. pthread_mutex_unlock(&(pool->lock));
    108. return 0;
    109. }
    110. // 清空工作线程调用的回调函数的参数 arg
    111. if (pool->task_queue[pool->queue_rear].arg != NULL) {
    112. pool->task_queue[pool->queue_rear].arg = NULL;
    113. }
    114. // 添加任务到任务队列里
    115. pool->task_queue[pool->queue_rear].function = function;
    116. pool->task_queue[pool->queue_rear].arg = arg;
    117. pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; // 队尾指针移动,模拟环形
    118. pool->queue_size++;
    119. // 添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程
    120. pthread_cond_signal(&(pool->queue_not_empty));
    121. pthread_mutex_unlock(&(pool->lock));
    122. return 0;
    123. }
    124. // 线程池中各个工作线程
    125. void *threadpool_thread(void *threadpool)
    126. {
    127. threadpool_t *pool = (threadpool_t *)threadpool;
    128. threadpool_task_t task;
    129. while (true) {
    130. // Lock must be taken to wait on conditional variable
    131. // 刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务
    132. pthread_mutex_lock(&(pool->lock));
    133. // queue_ size == 0说明没有任务,调wait阻塞在条件变量上,若有任务,跳过该while
    134. while ((pool->queue_size == 0) && (!pool->shutdown)) {
    135. printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
    136. pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    137. // 清除指定数目的空闲线程,如果要结束的线程个数大于 0 ,结束线程
    138. if (pool->wait_exit_thr_num > 0) {
    139. pool->wait_exit_thr_num--;
    140. // 如果线程池里线程个数大于最小值时可以结束当前线程
    141. if (pool->live_thr_num > pool->min_thr_num) {
    142. printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
    143. pool->live_thr_num--;
    144. pthread_mutex_unlock(&(pool->lock));
    145. pthread_exit(NULL);
    146. }
    147. }
    148. }
    149. // 如果指定了 true,要关闭线程池里的每个线程,自行退出处理---销毁线程池
    150. if (pool->shutdown) {
    151. pthread_mutex_unlock(&(pool->lock));
    152. printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
    153. pthread_detach(pthread_self());
    154. pthread_exit(NULL); // 线程自行结束
    155. }
    156. // 从任务队列里获取任务,是一个出队操作
    157. task.function = pool->task_queue[pool->queue_front].function;
    158. task.arg = pool->task_queue[pool->queue_front].arg;
    159. pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; // 出队,模拟环形队列
    160. pool->queue_size--;
    161. // 通知可以有新的任务添加进来
    162. pthread_cond_broadcast(&(pool->queue_not_full));
    163. // 任务取出后,立即将线程池琐释放
    164. pthread_mutex_unlock(&(pool->lock));
    165. // 执行任务
    166. printf("thread 0x%x start working\n", (unsigned int)pthread_self());
    167. pthread_mutex_lock(&(pool->thread_counter)); // 忙状态线程数变量琐
    168. pool->busy_thr_num++; // 忙状态线程数+1
    169. pthread_mutex_unloqk(&(pool->thread_counter));
    170. (*(task.function))(task.arg); // 执行回调函数任务
    171. // task.function(task.arg); // 执行回调函数任务
    172. // 任务结束处理
    173. printf("thread 0x%x end working\n", (unsigned int)pthread_self());
    174. pthread_mutex_lock(&(pool->thread_counter));
    175. pool->busy_thr_num--; // 处理掉一个任务,忙状态数线程数 -1
    176. pthread_mutex_unlock(&(pool->thread_counter));
    177. }
    178. pthread_exit(NULL);
    179. }
    180. // 管理线程
    181. void *adjust_thread(void *threadpool)
    182. {
    183. int i;
    184. threadpool_t *pool = (threadpool_t *)threadpool;
    185. while (!pool->shutdown) {
    186. sleep(DEFAULT_TIME); // 定时对线程池管理
    187. pthread_mutex_lock(&(pool->lock));
    188. int queue_size = pool->queue_size; // 关注任务数
    189. int live_thr_num = pool->live_thr_num; // 存活线程数
    190. pthread_mutex_unlock(&(pool->lock));
    191. pthread_mutex_lock(&(pool->thread_counter));
    192. int busy_thr_num = pool->busy_thr_num; // 忙着的线程数
    193. pthread_mutex_unlock(&(pool->thread_counter));
    194. // 创建新线程算法: 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如: 30>=10 && 40<100
    195. if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
    196. pthread_mutex_lock(&(pool->lock));
    197. int add = 0;
    198. // 一次增加 DEFAULT_THREAD 个线程
    199. for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
    200. && pool->live_thr_num < pool->max_thr_num; i++) {
    201. if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
    202. pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    203. }
    204. }
    205. pthread_mutex_unlock(&(pool->lock));
    206. }
    207. // 销毁多余的空闲线程 算法: 忙线程 X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时
    208. if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
    209. // 一次销毁 DEFAULT_THREAD 个线程,随即 10 个即可
    210. pthread_mutex_lock(&(pool->lock));
    211. pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; // 要销毁的线程数设置为 10
    212. pthread_mutex_unlock(&(pool->lock));
    213. for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
    214. // 通知处在空闲状态的线程,他们会自行停止
    215. pthread_cond_signal(&(pool->queue_not_empty));
    216. }
    217. }
    218. }
    219. return NULL;
    220. }
    221. int threadpool_destroy(threadpool_t *pool)
    222. {
    223. int i;
    224. if (pool == NULL) {
    225. return -1;
    226. }
    227. pool->shutdown = true;
    228. // 先销毁管理线程
    229. pthread_join(pool->adjust_tid, NULL);
    230. for (i = 0; i < pool->live_thr_num; i++) {
    231. // 通知所有的空闲线程
    232. pthread_cond_broadcast(&(pool->queue_not_empty));
    233. }
    234. for (i = 0; i < pool->live_thr_num; i++) {
    235. pthread_join(pool->threads[i], NULL);
    236. }
    237. threadpool_free(pool);
    238. return 0;
    239. }
    240. int threadpool_free(threadpool_t *pool)
    241. {
    242. if(pool == NULL) {
    243. return -1;
    244. }
    245. if(pool->task_queue) {
    246. free(pool->task_queue);
    247. }
    248. if (pool->threads) {
    249. free(pool->threads);
    250. pthread_mutex_lock(&(pool->lock));
    251. pthread_mutex_destroy(&(pool->lock));
    252. pthread_mutex_lock(&(pool->thread_counter));
    253. pthread_mutex_destroy(&(pool->thread_counter));
    254. pthread_cond_destroy(&(pool->queue_not_empty));
    255. pthread_cond_destroy(&(pool->queue_not_full));
    256. }
    257. free(pool);
    258. pool = NULL;
    259. return 0;
    260. }
    261. int threadpool_all_threadnum(threadpool_t *pool)
    262. {
    263. int all_threadnum = -1; // 总线程数
    264. pthread_mutex_lock(&(pool->lock));
    265. all_threadnum = pool->live_thr_num; // 存活线程数
    266. pthread_mutex_unlock(&(pool->lock));
    267. return all_threadnum;
    268. }
    269. /*int threadpool_busy_threadnum(threadpool_t *pool)
    270. {
    271. int busy_threadnum = -1; // 忙线程数
    272. pthread_mutex_lock(&(pool->thread_counter));
    273. busy_threadnum = pool->busy_thr_num;
    274. pthread_mutex_unlock(&(pool->thread_counter));
    275. }*/
    276. // 线程池中的线程,模拟处理业务
    277. void *process(void *arg)
    278. {
    279. printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), (int)arg);
    280. sleep(1);
    281. printf("task %d is end\n", (int)arg);
    282. return NULL;
    283. }
    284. int main(void)
    285. {
    286. // threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    287. threadpool_t *thp = threadpool_create(3, 100, 100); // 创建线程池,池里最小3个线程,最大100,队列最大100
    288. printf("pool inited");
    289. // int *num = (int*)malloc(sizeof(int) * 20);
    290. int num[20], i;
    291. for(i = 0; i < 20; i++) {
    292. num[i] = i;
    293. printf("add task %d\n", i);
    294. // int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    295. threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务*/
    296. }
    297. sleep(10); // 等子线程完成任务
    298. threadpool_destroy(thp);
    299. return 0;
    300. }
  • 相关阅读:
    linux内核中ecryptfs模块分析
    微服务原生案例搭建
    Linux 系统编程,Binder 学习,文件访问相关的接口
    JVM主要组成部分及其作用
    【五一创作】使用Scala二次开发Spark3.3.0实现对MySQL的upsert操作
    IDEA使用技巧
    swift 问答app
    linux进阶56——systemd实现程序日志保存成文件
    第3章 AOP通知
    期货开户公司底蕴深厚实力强大
  • 原文地址:https://blog.csdn.net/HuanBianCheng27/article/details/127721950