• 【APUE】补充 — 基于管道的线程池


    目录

    一、引言

    二、代码实现 

    三、思考 


    一、引言

    线程章节的 3.2 部分,我们曾经提到过线程池的实现

    在当时的代码中,我们仅仅用的一个 int 类型的变量来表示这个“池”,用来存放任务

    显然这个池太小了,如果下游线程很多,可能会出现以下情况:

    我们只需要将任务池的容量增大点,就可以很好地减少上述提到的多次调度带来的上下文切换开销

    池需要用一个数据结构来维护其池中的任务(数据),我们选用队列进行实现

    其实队列这个数据结构的特征和管道非常类似,数据从队列尾入队,从队列首出队数据写入管道的写端,然后从管道的读端读取数据

    我们接下来将动手实现一个队列(管道),并基于该管道扩充池的容量。其实内核也提供了现成的管道,我们不直接使用内核提供的管道主要有以下两点考虑:

    • 内核提供的管道可用于进程间通信,线程间通信没必要用内核提供的机制,用的话就有点儿大材小用了
    • 自己实现管道能够深入了解管道的特点,为后续讲解进程间通信做铺垫(到时候会用到内核提供的管道)

    二、代码实现 

    mypipe.h,暴露接口

    1. #ifndef MYPIPE_H__
    2. #define MYPIPE_H__
    3. #define PIPESIZE 1024
    4. #define MYPIPE_READ 0b00000001UL
    5. #define MYPIPE_WRITE 0b00000010UL
    6. typedef void mypipe_t;
    7. mypipe_t* mypipe_init(void); // 创建(初始化)一个管道
    8. int mypipe_register(mypipe_t *, int opmap); // 注册用户身份
    9. int mypipe_unregister(mypipe_t *, int opmap); // 取消注册用户身份
    10. int mypipe_read(mypipe_t *, int *buf, size_t count); // 读管道
    11. int mypipe_write(mypipe_t *, const int *buf, size_t count); // 写管道
    12. int mypipe_destroy(mypipe_t *); // 销毁管道
    13. #endif

    mypipe.c,实现接口。注意如何将在 mypipe.h 中隐藏的数据结构解除隐藏

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include "mypipe.h"
    6. struct mypipe_st { // 并发队列
    7. int head; // 指向队头
    8. int tail; // 指向队尾
    9. int data[PIPESIZE]; // 存放数据的地方
    10. int datasize; // 队列中有效数据的个数
    11. pthread_mutex_t mutex; // 互斥量
    12. pthread_cond_t cond; // 条件变量
    13. int count_rd, count_wr; // 读者写者计数
    14. // 从这里看出,我们需要用户指定其读者/写者的身份
    15. };
    16. mypipe_t * mypipe_init(void) { // 初始化一个管道
    17. struct mypipe_st *me;
    18. me = malloc(sizeof(*me));
    19. if (me == NULL)
    20. return NULL;
    21. me->head = 0;
    22. me->tail = 0;
    23. me->datasize = 0;
    24. me->count_rd = 0; // 读者个数为0
    25. me->count_wr = 0; // 写者个数为0
    26. pthread_mutex_init(&me->mutex, NULL);
    27. pthread_cond_init(&me->cond, NULL);
    28. return me;
    29. }
    30. int mypipe_register(mypipe_t *ptr, int opmap) { // 用户通过该函数指定其身份
    31. struct mypipe_st * me = ptr; // 之前在.h文件中隐藏了ptr所表征的数据结构,在这里取消隐藏
    32. pthread_mutex_lock(&me->mutex); // 可能多个线程同时注册身份,需要互斥
    33. if (opmap & MYPIPE_READ) // 将宏看成位图,注意位图操作
    34. me->count_rd++;
    35. if (opmap & MYPIPE_WRITE)
    36. me->count_wr++;
    37. pthread_mutex_unlock(&me->mutex);
    38. return 0;
    39. }
    40. int mypipe_unregister(mypipe_t *ptr, int opmap) { // 用户通过该函数取消注册其身份
    41. struct mypipe_st * me = ptr;
    42. pthread_mutex_lock(&me->mutex);
    43. if (opmap & MYPIPE_READ)
    44. me->count_rd--;
    45. if (opmap & MYPIPE_WRITE)
    46. me->count_wr--;
    47. pthread_cond_broadcast(&me->cond); // 可能读者或者写者被减为0了,需要通知一下read及write
    48. pthread_mutex_unlock(&me->mutex);
    49. return 0;
    50. }
    51. int mypipe_read(mypipe_t * ptr, int *buf, size_t count) {
    52. struct mypipe_st * me = ptr;
    53. pthread_mutex_lock(&me->mutex);
    54. while (me->datasize <= 0 && me->count_wr > 0) // 当管道中没有数据,但是还有写者,就继续等待
    55. {
    56. pthread_cond_wait(&me->cond, &me->mutex); // 等待管道中有数据
    57. // 等待写者数量变为0
    58. }
    59. if (me->datasize <= 0 && me->count_wr <= 0)
    60. {
    61. pthread_mutex_unlock(&me->mutex); // 没有写者且管道中没有数据,就直接返回读取到的字节数为0
    62. return 0;
    63. }
    64. int i;
    65. for (i = 0; i < count; ++i) { // 读取
    66. if (me->datasize <= 0)
    67. break;
    68. *(buf+i) = me->data[me->head];
    69. me->head = (me->head + 1)%PIPESIZE; // 读出来了,相当于队列首出队了一个元素
    70. me->datasize--;
    71. }
    72. pthread_cond_broadcast(&me->cond); // 告诉写者能写了
    73. pthread_mutex_unlock(&me->mutex);
    74. return i;
    75. }
    76. int mypipe_write(mypipe_t *ptr, const int *buf, size_t count) {
    77. struct mypipe_st * me = ptr;
    78. pthread_mutex_lock(&me->mutex);
    79. while (me->datasize >= PIPESIZE && me->count_rd > 0) // 当管道满,但是还有读者,就继续等待
    80. pthread_cond_wait(&me->cond, &me->mutex); // 等待读者读出数据,使管道不满
    81. // 等待读者数量变为0
    82. if (me->datasize >= PIPESIZE && me->count_rd <= 0) // 没有读者后,且管道已经满了,就没必要继续写了,直接返回
    83. {
    84. pthread_mutex_unlock(&me->mutex);
    85. return 0;
    86. }
    87. int i;
    88. for (i = 0; i < count; ++i) { // 写入
    89. if (me->datasize == PIPESIZE)
    90. break;
    91. me->data[me->tail] = *(buf+i);
    92. me->tail = (me->tail + 1)%PIPESIZE;
    93. me->datasize++;
    94. }
    95. pthread_cond_broadcast(&me->cond); // 告诉读者能读了
    96. pthread_mutex_unlock(&me->mutex);
    97. return i;
    98. }
    99. int mypipe_destroy(mypipe_t * ptr) {
    100. struct mypipe_st * me = ptr;
    101. pthread_mutex_destroy(&me->mutex);
    102. pthread_cond_destroy(&me->cond);
    103. free(ptr);
    104. return 0;
    105. }

    main.c,演示用户如何使用接口。main 首先创建 10 个线程(这 10 个线程负责从池中获取数据并判断是否为质数),然后 main 线程自身源源不断往池中写入数据。我们需要用我们上述写的管道来表征这个池,注意如何使用我们的接口:

    1. 创建管道
    2. 往管道写入数据前需要注册写者身份,写入完毕后需要取消注册的身份
    3. 从管道读取数据前需要注册读者身份,读取完毕后需要取消注册的身份
    4. 管道用完后需要销毁管道
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include "mypipe.h"
    6. #define NUM_THREADS 10
    7. #define START_NUM 30000000
    8. #define END_NUM 30000200
    9. // 质数判断函数
    10. int is_prime(int number) {
    11. if (number <= 1) return 0;
    12. for (int i = 2; i * i <= number; i++) {
    13. if (number % i == 0) return 0;
    14. }
    15. return 1;
    16. }
    17. // 线程函数
    18. void *reader_function(void *arg) {
    19. mypipe_t *pipe = (mypipe_t *)arg;
    20. int number;
    21. mypipe_register(pipe, MYPIPE_READ);
    22. while (mypipe_read(pipe, &number, 1) > 0) {
    23. if (is_prime(number)) {
    24. printf("Prime number: %d\n", number);
    25. }
    26. }
    27. mypipe_unregister(pipe, MYPIPE_READ);
    28. return NULL;
    29. }
    30. int main() {
    31. pthread_t threads[NUM_THREADS];
    32. mypipe_t *pipe = mypipe_init();
    33. // 创建读线程
    34. for (int i = 0; i < NUM_THREADS; i++) {
    35. pthread_create(&threads[i], NULL, reader_function, pipe);
    36. }
    37. // 主线程写入数据
    38. mypipe_register(pipe, MYPIPE_WRITE);
    39. for (int i = START_NUM; i <= END_NUM; i++) {
    40. mypipe_write(pipe, &i, 1);
    41. }
    42. mypipe_unregister(pipe, MYPIPE_WRITE);
    43. // 等待所有读线程完成
    44. for (int i = 0; i < NUM_THREADS; i++) {
    45. pthread_join(threads[i], NULL);
    46. }
    47. // 销毁管道
    48. mypipe_destroy(pipe);
    49. return 0;
    50. }

    结果示例如下

    我们判断的是从 30000000 到 30000200 之间的质数,结果和线程章节的 3.2 中所实现代码的运行结果一致,说明代码没毛病 

    我们从我们自己创建的管道,可以看出管道的如下几个特点

    • 管道通信是单工的。一方作为读者,另一方作为写者。写永远写在尾部,读永远是从首部读
    • 管道必须凑齐读写双方才能正常运行。注意 mypipe_read 和 mypipe_write 中的 while 循环条件:只要缺失读者或者写者,另一方就可能直接返回而不会等待新的数据(哪怕后面可能有新的读者或者写者加入并读写管道)
    • 管道内部自带同步与互斥机制

    三、思考 

    上述代码的一个缺陷是:没有办法约束用户的行为!

    比如我是一个用户,我注册了写者身份后却调用的 write......

    那可不妥!!一种好的思路是引入权限的概念,并对上述接口再封装一层......

    上述蓝色字体表示封装出来的新的接口,以供用户调用绿色字体介绍了这样封装的思想和逻辑。其实这就是 UNIX “一切皆文件”思想的部分实现方式!即:就算最底层可能是完全不一样的东东(操作管道、操作设备、操作普通文件......),也会再封装一层,并提供通用的接口(如 open、read、write、close 和文件描述符 fd......)。这样一来,在用户的视角里,操作文件的接口也可以操作很多不一样的东东,因此“一切皆文件”

  • 相关阅读:
    Linux云主机安全入侵排查步骤
    2. 验证1101序列(Mealy)
    Android SELinux 参数语法介绍及基础分析
    python二次开发Solidworks:读取样条曲线数据
    springboot+redis水果超市商城系统(源码+sql+论文报告)
    java计算机毕业设计高校学生智慧党建系统设计与开发MyBatis+系统+LW文档+源码+调试部署-++
    安卓动态代理
    Crack:WebKitX ActiveX and WebKitX VHX
    python 进行图片的文字识别
    【Metal学习笔记】--02.调用Metal-cpp从零编写C++程序
  • 原文地址:https://blog.csdn.net/qq_41657851/article/details/134424783