• 「Linux」400行纯C语言代码带你「手撕线程池」


    线程池的基本概念

    不管线程池是什么东西!但是我们必须知道线程池被搞出来的目的就是:提高程序执行效率而设计出来的;

    了解了线程池的目的后:我们就可以开始理解线程池:

    首先回答一个问题:为什么会有线程池?

    呃呃,我这么问就很奇怪,因为线程池是什么我都没说,怎么会知道为什么会有线程池呢?所以我打算带大家去思考一个场景:

    当我们的程序中:有一批任务到来时候(通常该任务都是从网络来的),我们就会创建一堆线程去处理这一批任务;

    虽然说创建线程的成本开销并不大,但是这里有个问题:当我们任务来到时候,你才去创建线程去处理这个任务,你不觉得这样很慢吗?

    是否我们可以换个思路:假如我们有一种手段:使得任务一到来,就可以马上有线程去处理这批任务,这样是不是相对于前面等线程来到,再创建线程去处理时候快得多;

    所以说:线程池就是基于上面的思路设计的;线程池就是:预先创建好一大批线程,同时线程池维护一个队列,来存放到来的任务,当队列中一旦有任务时候,预先创建好的一大批线程就可以并发处理这一批任务了;


    我们抽象出一个模型:

    任务派发者是谁? 是生产者;

    任务存储的队列是什么?是一个容器,数组,链表,只要是可以存放产品(数据)的东西即可;

    拿任务去处理的是谁?是消费者;

    所以说:线程池本质就是一个生产者消费者的模型;

    而我们线程池只需要关注两个点:一个存放任务的队列,和消费队列任务的消费者即可;而生产者暂时不用关注,因为生产者是你外部搞出任务丢给线程池去使用;那么什么时候可以关心生产者呢?

    也就是当我们去使用线程池的时候咯;这不就是妥妥的生产者消费者模型嘛!

    线程池实现的基本思路:

    在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:

    线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:

    任务队列,存储需要处理的任务,由工作的线程来处理这些任务

    通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除;

    已处理的任务会被从任务队列中删除;

    线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程;

    工作的线程(任务队列任务的消费者) ,N个

    线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理

    工作的线程相当于是任务队列的消费者角色;

    如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞);

    如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作;

    管理者线程(不处理任务队列中的任务),1个

    它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测;

    当任务过多的时候,可以适当的创建一些新的工作线程;

    当任务过少的时候,可以适当的销毁一些工作的线程;

    相关视频推荐

    100行代码手写线程池,人人都能实现的(自备linux环境)

    手把手C++实现线程池及线程池性能优化分析

    池式组件为性能飙升提供技术保障-线程池,内存池,异步请求池,数据库连接池,无锁队列的ringbuffer

    学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂

    需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

     

    线程池的代码

    1.任务队列的任务结构体

    对于任务队列:

    里面存放的都是函数指针,该函数指针指向的就是处理任务的函数;

    同时还要维护一个任务函数的形参;

    1. typedef struct Task
    2. {
    3. void (*function)(void *args); //任务的函数指针
    4. void *args; //任务函数的形参
    5. } Task;

    2. 线程池的定义

    线程池里面最重要的是:

    一个任务队列;
    多个消费者线程IDs;
    一个管理者线程ID;
    管理线程池的锁;
    管理任务队列是否为满和空的条件变量;

    还有一些其他的辅助成员变量;

    1. struct ThreadPool
    2. {
    3. Task *taskQ; //任务队列
    4. /*对于一个任务队列:我们需要知道以下信息*/
    5. int queueCapacity; //队列的容量
    6. int queueSize; //当前任务的个数
    7. int queueFront; //队头取任务
    8. int queueRear; //队尾放任务
    9. /*有了任务队列后,还要有管理任务队列的线程和从任务队列拿任务的线程*/
    10. pthread_t managerID; //管理者线程
    11. /*设置为指针的目的:工作线程有多个*/
    12. pthread_t *threadIDs; //工作线程(也就是消费者)
    13. /*对于工作线程我们要知道以下这几个消息方便管理*/
    14. int minNum; //最少的工作线程数
    15. int maxNum; //最多的工作线程数
    16. int busyNum; //正在工作的线程数,也就是正在获取任务处理的线程
    17. int liveNum; //存货的工作线程数(也就是被唤醒的线程,却没有资格去获取任务的线程)
    18. int exitNum; //销毁的工作线程数(因为可能工作线程存在,但是却不工作,我们需要杀掉一些不必要的线程)
    19. /* 由于任务队列为临界资源:
    20. 工作线程(消费者)可能有多个会同时竞争该资源
    21. 同时多生产者线程之间(也就是往任务队列放任务的线程)也会竞争该资源
    22. 所以我们要保证互斥访问线程池的任务队列
    23. */
    24. pthread_mutex_t mutexpool; //锁整个线程池
    25. pthread_mutex_t mutexbusyNum; //锁增在工作线程的数量
    26. /*由于任务队列满,或者为空:
    27. 生产者和消费者都需要阻塞
    28. 所以需要条件变量,来保证
    29. */
    30. pthread_cond_t notFull; //判断线程池是否为满
    31. pthread_cond_t notEmpty; //判断线程池是否为空
    32. /*辅助成员主要判断该线程池是否还在工作*/
    33. int shutdown; //判断是否需要销毁线程池,是0不销毁,是1销毁
    34. };

    线程池的头文件声明

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. typedef struct ThreadPool ThreadPool; //线程池结构体,这里声明的原因是结构体定义在线程池源文件中
    8. //创建线程池并初始化
    9. ThreadPool* threadPoolCreate(int min,int max,int queueSize);
    10. //销毁线程池
    11. int threadPoolDestroy(ThreadPool* pool);
    12. //给线程池添加任务
    13. void threadPoolAdd(ThreadPool* pool,void(*functions)(void*),void* args);
    14. //获取线程池工作线程的个数
    15. int threadBusyNum (ThreadPool* pool);
    16. //获取线程池存活的线程的个数
    17. int threadLiveNum (ThreadPool* pool);
    18. //工作线程
    19. void* worker (void* args);
    20. //管理线程
    21. void* manager (void* args);
    22. //线程退出函数
    23. void threadExit(ThreadPool* pool);

    线程池的源文件

    1. #include"thread_pool.h"
    2. const int WORK_THREAD_NUMBER = 2; //管理者线程要添加的工作线程个数,和销毁的线程个数
    3. /*
    4. 线程池:首先要有个任务队列,在C语言中,
    5. 任务队列是需要自己定义的,C++中可以直接使用容器queue
    6. */
    7. //任务队列存放的任务就是一个函数指针
    8. typedef struct Task
    9. {
    10. void (*function)(void *args);
    11. void *args;
    12. } Task;
    13. //再搞出一个线程池
    14. struct ThreadPool
    15. {
    16. Task *taskQ; //任务队列
    17. /*对于一个任务队列:我们需要知道以下信息*/
    18. int queueCapacity; //队列的容量
    19. int queueSize; //当前任务的个数
    20. int queueFront; //队头取任务
    21. int queueRear; //队尾放任务
    22. /*有了任务队列后,还要有管理任务队列的线程和从任务队列拿任务的线程*/
    23. pthread_t managerID; //管理者线程
    24. /*设置为指针的目的:工作线程有多个*/
    25. pthread_t *threadIDs; //工作线程(也就是消费者)
    26. /*对于工作线程我们要知道以下这几个消息方便管理*/
    27. int minNum; //最少的工作线程数
    28. int maxNum; //最多的工作线程数
    29. int busyNum; //正在工作的线程数,也就是正在获取任务处理的线程
    30. int liveNum; //存货的工作线程数(也就是被唤醒的线程,却没有资格去获取任务的线程)
    31. int exitNum; //销毁的工作线程数(因为可能工作线程存在,但是却不工作,我们需要杀掉一些不必要的线程)
    32. /* 由于任务队列为临界资源:
    33. 工作线程(消费者)可能有多个会同时竞争该资源
    34. 同时多生产者线程之间(也就是往任务队列放任务的线程)也会竞争该资源
    35. 所以我们要保证互斥访问线程池的任务队列
    36. */
    37. pthread_mutex_t mutexpool; //锁整个线程池
    38. pthread_mutex_t mutexbusyNum; //锁增在工作线程的数量
    39. /*由于任务队列满,或者为空:
    40. 生产者和消费者都需要阻塞
    41. 所以需要条件变量,来保证
    42. */
    43. pthread_cond_t notFull; //判断线程池是否为满
    44. pthread_cond_t notEmpty; //判断线程池是否为空
    45. /*辅助成员主要判断该线程池是否还在工作*/
    46. int shutdown; //判断是否需要销毁线程池,是0不销毁,是1销毁
    47. };
    48. //************************************************************************************************
    49. /*由于我们的线程池被创建出来时候,就必须保证存在的,
    50. 所以我们返回值要设计为指针类型,不能是赋值拷贝的形式
    51. 并且如何考虑线程池需要传入什么参数初始化呢?
    52. */
    53. ThreadPool *threadPoolCreate(int min, int max, int queueSize)
    54. {
    55. //先搞出一个线程池
    56. ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    57. do // do while(0)的设计是为了,假设开辟线程池,消费者线程IDs,任务队列空间失败,可以直接跳出循环统一处理释放空间
    58. {
    59. if (pool == NULL)
    60. {
    61. printf("malloc threadPool is failed\n");
    62. break;
    63. }
    64. //搞出线程池后开始初始化里面的数据成员
    65. //首先先搞出消费者线程出来
    66. pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
    67. if (pool->threadIDs == NULL)
    68. {
    69. printf("malloc threadIDs is failed\n");
    70. /*如果没有do while(0)的设计,这里直接返回,那么前面的pool内存池的空间没有被释放,这就会内存泄漏了*/
    71. // return NULL;
    72. //基于上面的注释考虑,这里设计break;退出dowhile(0)然后处理
    73. break;
    74. }
    75. //初始化消费者线程ID
    76. /*这么做的目的是:在管理者线程中可以通过判断线程ID是否为0,来说明该消费者线程是否被占用*/
    77. memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
    78. //初始化线程池的其他成员属性
    79. pool->minNum = min;
    80. pool->maxNum = max;
    81. pool->busyNum = 0;
    82. pool->liveNum = min;
    83. pool->exitNum = 0;
    84. //初始化锁和条件变量
    85. if (pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
    86. pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
    87. pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
    88. pthread_cond_init(&pool->notFull, NULL) != 0)
    89. {
    90. perror("mutex or condition failed:");
    91. }
    92. //初始化任务队列
    93. pool->taskQ = (Task *)malloc(sizeof(Task) * queueSize);
    94. if (pool->taskQ == NULL)
    95. {
    96. printf("malloc taskQ is failed\n");
    97. break;
    98. }
    99. pool->queueCapacity = queueSize;
    100. pool->queueSize = 0;
    101. pool->queueFront = 0;
    102. pool->queueRear = 0;
    103. //刚开始不关闭线程池
    104. pool->shutdown = 0;
    105. //创建管理者线程和消费者线程
    106. pthread_create(&pool->managerID, NULL, manager, (void *)pool);
    107. int i = 0;
    108. for (; i < min; ++i)
    109. {
    110. /*消费线程需要消费的是任务,
    111. 也就是taskQ,而taskQ又是pool的一个成员属性
    112. 所以传参时候,我们传入pool就可以获得taskQ了
    113. */
    114. pthread_create(&pool->threadIDs[i], NULL, worker, (void *)pool);
    115. }
    116. //创建成功初始化后,那么就可以把线程池返回去了
    117. return pool;
    118. } while (0);
    119. //如果break出来,那么就是异常的开辟空间失败,要释放资源
    120. if (pool)
    121. free(pool);
    122. if (pool && pool->threadIDs)
    123. free(pool->threadIDs);
    124. if (pool && pool->taskQ)
    125. free(pool->taskQ);
    126. return NULL;
    127. }
    128. //判断任务队列是否为空
    129. static int taskQIsEmpty(ThreadPool *pool)
    130. {
    131. return pool->queueSize == 0;
    132. }
    133. //判断线程池是否还工作
    134. static int isShutDown(ThreadPool *pool)
    135. {
    136. return pool->shutdown == 1 ? 1 : 0;
    137. }
    138. //消费者线程
    139. void *worker(void *args)
    140. {
    141. ThreadPool *pool = (ThreadPool *)args;
    142. /*设计为死循环是:消费者要不断从任务队列拿任务来处理*/
    143. while (1)
    144. {
    145. pthread_mutex_lock(&pool->mutexpool);
    146. //消费数据之前,要判断任务队列是否为空,空就需要挂起该线程
    147. while (taskQIsEmpty(pool) && !isShutDown(pool))
    148. {
    149. pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);
    150. //线程被唤醒后,判断是否需要销毁该线程,因为有线程是多余的
    151. if (pool->exitNum > 0)
    152. {
    153. pool->exitNum--;
    154. if (pool->liveNum > pool->minNum)
    155. {
    156. pool->liveNum--;
    157. pthread_mutex_unlock(&pool->mutexpool); //退出线程前解锁,防止死锁问题
    158. threadExit(pool);
    159. }
    160. }
    161. }
    162. //还需要判断线程池是否关闭了,关闭了就退出消费者线程即可
    163. if (isShutDown(pool))
    164. {
    165. pthread_mutex_unlock(&pool->mutexpool);
    166. threadExit(pool);
    167. }
    168. //开始消费者拿任务
    169. Task task; //保存任务的变量
    170. task.function = pool->taskQ[pool->queueFront].function; //获取到任务队列的任务,就是一个函数指针
    171. task.args = pool->taskQ[pool->queueFront].args; //获取任务队列任务的函数指针参数
    172. //控制任务队列的指针移动
    173. pool->queueFront++;
    174. pool->queueFront %= pool->queueCapacity;
    175. pool->queueSize--;
    176. pthread_mutex_unlock(&pool->mutexpool);
    177. //唤醒生产者
    178. pthread_cond_signal(&pool->notFull);
    179. //拿到任务后就是处理任务
    180. // 1.处理任务前,先处理busyNum
    181. pthread_mutex_lock(&pool->mutexbusyNum);
    182. pool->busyNum++;
    183. pthread_mutex_unlock(&pool->mutexbusyNum);
    184. // 2. 这里处理任务就是调用任务函数
    185. task.function(task.args);
    186. //任务处理完就释放参数的空间
    187. free(task.args);
    188. task.args = NULL;
    189. printf("thread %ld ending working ... \n", pthread_self());
    190. // 3.处理完任务对其busyNum操作
    191. pthread_mutex_lock(&pool->mutexbusyNum);
    192. pool->busyNum--;
    193. pthread_mutex_unlock(&pool->mutexbusyNum);
    194. }
    195. }
    196. //管理者线程
    197. /*
    198. 主要是管理创建线程和销毁线程
    199. */
    200. void *manager(void *args)
    201. {
    202. ThreadPool *pool = (ThreadPool *)args;
    203. //只要线程池没关闭,那么管理者线程就一直工作
    204. while (!isShutDown(pool))
    205. {
    206. //自己定制的检查策略:我设置每个三秒检测
    207. sleep(3);
    208. //取出线程池任务的数量和消费者的工作线程数量
    209. pthread_mutex_lock(&pool->mutexpool);
    210. int queueSize = pool->queueSize;
    211. int liveNum = pool->liveNum;
    212. pthread_mutex_unlock(&pool->mutexpool);
    213. //获取忙的消费者线程数量
    214. pthread_mutex_lock(&pool->mutexbusyNum);
    215. int busyNum = pool->busyNum;
    216. pthread_mutex_unlock(&pool->mutexbusyNum);
    217. //开始管理线程
    218. // 1.添加消费者线程
    219. /*制定添加规则(也是自己设定的)
    220. 任务的个数 > 存活的线程个数 && 存活的线程个数 < 最大的线程个数
    221. */
    222. if (queueSize > liveNum && liveNum < pool->maxNum)
    223. {
    224. pthread_mutex_lock(&pool->mutexpool); //这个锁主要是操作了liveNum这个资源
    225. int counter = 0; // counter表示要添加的消费者线程数量
    226. //遍历 消费者线程IDs数组,看看哪个位置可以放入新添加的线程
    227. int i = 0;
    228. for (; i < pool->maxNum &&
    229. counter < WORK_THREAD_NUMBER &&
    230. pool->liveNum < pool->maxNum;
    231. i++)
    232. {
    233. //0表示消费者线程数组的位置可以放入线程ID
    234. if (pool->threadIDs[i] == 0)
    235. {
    236. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
    237. counter++;
    238. liveNum++;
    239. }
    240. }
    241. pthread_mutex_unlock(&pool->mutexpool);
    242. }
    243. //由于线程过多,可能要进行销毁
    244. // 2. 销毁消费者线程
    245. /*
    246. 销毁线程的策略:
    247. 存活的线程数量>忙的线程数量*2 && 存活线程数量>最小线程数量
    248. */
    249. if (liveNum > busyNum * 2 && liveNum > pool->minNum)
    250. {
    251. pthread_mutex_lock(&pool->mutexpool);
    252. pool->exitNum = WORK_THREAD_NUMBER;
    253. pthread_mutex_unlock(&pool->mutexpool);
    254. //让工作者线程去自杀
    255. /*如何让他自杀呢?
    256. 由于线程池有多余的消费者线程不工作
    257. 我们可以通过唤醒消费者线程,让他去自己消亡
    258. */
    259. int i = 0;
    260. for (; i < WORK_THREAD_NUMBER; i++)
    261. {
    262. pthread_cond_signal(&pool->notEmpty);
    263. }
    264. }
    265. }
    266. }
    267. //线程退出函数
    268. void threadExit(ThreadPool *pool)
    269. {
    270. pthread_t tid = pthread_self();
    271. int i = 0;
    272. //遍历消费者线程的线程个数,找到退出线程的ID
    273. for (; i < pool->maxNum; i++)
    274. {
    275. if (pool->threadIDs[i] == tid)
    276. {
    277. pool->threadIDs[i] = 0;
    278. printf("threadExit()消费者线程 :%ld exit...\n", tid);
    279. break;
    280. }
    281. }
    282. pthread_exit(NULL);
    283. }
    284. static int taskQisFull(ThreadPool* pool)
    285. {
    286. return pool->queueCapacity == pool->queueSize;
    287. }
    288. //给线程池添加任务
    289. void threadPoolAdd(ThreadPool* pool,void(*function)(void*),void* args)
    290. {
    291. pthread_mutex_lock(&pool->mutexpool);
    292. //生产者线程:任务队列满要阻塞自己
    293. while(taskQisFull(pool) && !isShutDown(pool))
    294. {
    295. pthread_cond_wait(&pool->notFull,&pool->mutexpool);
    296. }
    297. if(isShutDown(pool))
    298. {
    299. pthread_mutex_unlock(&pool->mutexpool);
    300. return ;
    301. }
    302. //添加任务
    303. pool->taskQ[pool->queueRear].function = function;
    304. pool->taskQ[pool->queueRear].args = args;
    305. pool->queueRear++;
    306. pool->queueRear %= pool->queueCapacity;
    307. pool->queueSize++;
    308. pthread_mutex_unlock(&pool->mutexpool);
    309. //唤醒work线程:
    310. pthread_cond_signal(&pool->notEmpty);
    311. }
    312. //获取线程池工作线程的个数
    313. int threadBusyNum (ThreadPool* pool)
    314. {
    315. pthread_mutex_lock(&pool->mutexbusyNum);
    316. int busyNum = pool->busyNum;
    317. pthread_mutex_unlock(&pool->mutexbusyNum);
    318. return busyNum;
    319. }
    320. //获取线程池存活的线程的个数
    321. int threadLiveNum (ThreadPool* pool)
    322. {
    323. pthread_mutex_lock(&pool->mutexpool);
    324. int liveNum = pool->liveNum;
    325. pthread_mutex_unlock(&pool->mutexpool);
    326. return liveNum;
    327. }
    328. //销毁线程池
    329. int threadPoolDestroy(ThreadPool* pool)
    330. {
    331. if(pool == NULL)
    332. {
    333. return -1;
    334. }
    335. //关闭线程池
    336. pool->shutdown = 1;
    337. //唤醒阻塞的消费者
    338. //存活的线程有多少就唤醒多少
    339. int i = 0;
    340. for(;i < pool->liveNum;i++)
    341. {
    342. pthread_cond_signal(&pool->notEmpty);
    343. }
    344. pthread_join(pool->managerID,NULL);
    345. //释放资源
    346. if(pool->taskQ )
    347. free(pool->taskQ);
    348. if(pool->threadIDs)
    349. free(pool->threadIDs);
    350. pthread_mutex_destroy(&pool->mutexbusyNum);
    351. pthread_mutex_destroy(&pool->mutexpool);
    352. pthread_cond_destroy(&pool->notFull);
    353. pthread_cond_destroy(&pool->notEmpty);
    354. free(pool);
    355. pool = NULL;
    356. return 0;
    357. }

    线程池测试代码

    1. #include"thread_pool.h"
    2. //任务处理函数
    3. void taskFunction(void* args)
    4. {
    5. int num = *(int*)args;
    6. printf("thread: %ld is working,number:%d\n",pthread_self(),num);
    7. sleep(1);
    8. }
    9. int main()
    10. {
    11. //创建线程池
    12. ThreadPool* pool = threadPoolCreate(3,10,20);
    13. //往线程池里面放任务
    14. int i = 0;
    15. for(; i< 20; i++)
    16. {
    17. int *num = (int*)malloc(sizeof(int));
    18. *num = i+1;
    19. threadPoolAdd(pool,taskFunction,(void*)num);
    20. }
    21. sleep(10);
    22. threadPoolDestroy(pool);
    23. return 0;
    24. }

    测试线程池结果

    由于我的测试代码:只搞了3个工作线程(消费者线程),任务队列大小为20,并且搞了20个任务队列进去,所以线程池就会有三个工作线程在抢夺任务工作!

     

  • 相关阅读:
    JDBC 连接数据库的四种方式
    基于 Nebula Graph 构建百亿关系知识图谱实践
    java计算机毕业设计高校防疫物资管理系统MyBatis+系统+LW文档+源码+调试部署
    docker ,k8s 以及 k8s 和 docker的关系
    MATLAB | 你是猫猫教还是狗狗教还是ikun
    Linux高性能服务器编程 学习笔记 第三章 TCP协议详解
    Github工程中的Markdown语言应用
    Docker安装Kibana8.4.3
    中科磐云—2022广西逆向解析思路
    系统架构设计师(第二版)学习笔记----多媒体技术
  • 原文地址:https://blog.csdn.net/qq_40989769/article/details/127588603