• 基于信号量与环形队列实现读写异步缓存队列


    目录

    一、需求 

    二、技术思路

    三、示例代码


    一、需求 

             实现一个读写异步的缓存队列,主要实现功能如下:

            1、缓存队列作为临界资源,要是线程安全的,不会出现读线程与写线程同时操作缓存队列的情况发生。

            2、当缓存队列被塞满以后,写线程阻塞等待读线程读取数据。

            3、当缓存队列空时,读线程需要阻塞等待写线程写入数据。

            4、可指定缓存队列的长度,缓存队列中,存放 byte 类型的数据。

    二、技术思路

            基于信号量与环形队列实现。

            参考《现代操作系统》,使用了三个信号量:一个称为full,用来记录充满的缓冲槽数目;一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区 。

            环形队列是为了能够重复利用队列中的空间。当读数据或写数据抵达队列边界时,能够跳到开头衔接上。由于已经使用信号量做队列长度的约束,所以在使用环形队列时可以省不少事,只需要实现首尾跳转即可,无需关心队头与队尾位置的问题。

    三、示例代码

            仅作参考,未贴头文件。

    queue.c 

    1. #include "queue.h"
    2. typedef struct shared_queue
    3. {
    4. sem_t queue_full;
    5. sem_t queue_empty;
    6. sem_t queue_lock;
    7. item *queue;
    8. int queue_len;
    9. // ring queue
    10. int tail;
    11. int font;
    12. }sharq;
    13. int queue_init(sharq *que, int len)
    14. {
    15. if (que == NULL)
    16. {
    17. printf("[error] : the pointer of queue is NULL\n");
    18. return ERROR;
    19. }
    20. que->queue = (item *)malloc(sizeof(item) * len);
    21. if (que->queue == NULL)
    22. {
    23. printf("[error] : queue malloc failed\n");
    24. return ERROR;
    25. }
    26. que->queue_len = len;
    27. que->tail = 0;
    28. que->font = 0;
    29. sem_init(&que->queue_full, 0, 0);
    30. sem_init(&que->queue_empty, 0, len);
    31. sem_init(&que->queue_lock, 0, 1);
    32. return OK;
    33. }
    34. int queue_write(sharq *que, byte *input, int len_in)
    35. {
    36. sem_wait(&que->queue_empty);
    37. sem_wait(&que->queue_lock);
    38. // insert
    39. item *it = que->queue + que->tail;
    40. it->buff = (byte *)malloc(len_in);
    41. if (it->buff == NULL)
    42. {
    43. printf("[error] : item malloc failed\n");
    44. return ERROR;
    45. }
    46. memcpy(it->buff, input, len_in);
    47. it->buff_len = len_in;
    48. // ring queue
    49. que->tail++;
    50. que->tail %= que->queue_len;
    51. sem_post(&que->queue_lock);
    52. sem_post(&que->queue_full);
    53. return OK;
    54. }
    55. int queue_read(sharq *que, byte *output, int len_out)
    56. {
    57. sem_wait(&que->queue_full);
    58. sem_wait(&que->queue_lock);
    59. // delete
    60. item *it = que->queue + que->font;
    61. if (len_out < it->buff_len)
    62. {
    63. printf("[error] : length of output_buff is lower than item_buff\n");
    64. return ERROR;
    65. }
    66. memcpy(output, it->buff, it->buff_len);
    67. // ring queue
    68. que->font++;
    69. que->font %= que->queue_len;
    70. sem_post(&que->queue_lock);
    71. sem_post(&que->queue_empty);
    72. return OK;
    73. }
    74. int queue_destroy(sharq *que)
    75. {
    76. if (que == NULL)
    77. {
    78. printf("[error] : can not destroy NULL queue\n");
    79. return ERROR;
    80. }
    81. sem_destroy(&que->queue_full);
    82. sem_destroy(&que->queue_empty);
    83. sem_destroy(&que->queue_lock);
    84. if (que->queue != NULL)
    85. {
    86. free(que->queue);
    87. }
    88. return OK;
    89. }

    main.c 

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include "./queue/queue.h"
    7. #define TEST_SUM 2000
    8. void *producer(void *args)
    9. {
    10. sharq *queue = (sharq *)args;
    11. // test data
    12. char strs[50] = "this is a test ";
    13. int len = strlen(strs);
    14. for (int i = 0; i < TEST_SUM; i++)
    15. {
    16. strs[len - 1] = '0' + i % 10;
    17. int res = queue_write(queue, (byte *)strs, len);
    18. if (res == ERROR)
    19. {
    20. printf("[ERROR] : write error\n");
    21. }
    22. }
    23. printf("write Over(%d)!\n", TEST_SUM);
    24. }
    25. void *consumer(void *args)
    26. {
    27. sharq *queue = (sharq *)args;
    28. byte *read_buff = (byte *)malloc(BLOCK);
    29. int count = 0;
    30. while(1)
    31. {
    32. bzero(read_buff, BLOCK);
    33. int res = queue_read(queue, read_buff, BLOCK);
    34. if (res == ERROR)
    35. {
    36. printf("[ERROR] : writer failed\n");
    37. break;
    38. }
    39. //printf("str : %s\n", (char *)read_buff);
    40. count++;
    41. if (count == TEST_SUM)
    42. {
    43. break;
    44. }
    45. }
    46. printf("read Over(%d)!\n", TEST_SUM);
    47. free(read_buff);
    48. }
    49. int main(void)
    50. {
    51. sharq queue;
    52. queue_init(&queue, 10);
    53. pthread_t pro_t, con_t;
    54. pthread_create(&pro_t, NULL, producer, &queue);
    55. pthread_create(&con_t, NULL, consumer, &queue);
    56. pthread_join(pro_t, NULL);
    57. pthread_join(con_t, NULL);
    58. return 0;
    59. }

    四、数据填充

             此处无需再看,只是为了满足CSDN发文助手的文章质量检测。

            以下内容由“废话生成器”生成。

            鲁巴金曾经说过,读书是在别人思想的帮助下,建立起自己的思想。这不禁令我深思。 在这种困难的抉择下,本人思来想去,寝食难安。 我们都知道,只要有意义,那么就必须慎重考虑。 我们都知道,只要有意义,那么就必须慎重考虑。 我认为, 塞涅卡在不经意间这样说过,真正的人生,只有在经过艰难卓绝的斗争之后才能实现。我希望诸位也能好好地体会这句话。 总结的来说, 罗曼·罗兰在不经意间这样说过,只有把抱怨环境的心情,化为上进的力量,才是成功的保证。这启发了我, 维龙在不经意间这样说过,要成功不需要什么特别的才能,只要把你能做的小事做得好就行了。这不禁令我深思。 既然如何, 在这种困难的抉择下,本人思来想去,寝食难安。 检测垃圾的发生,到底需要如何做到,不检测垃圾的发生,又会如何产生。 马云曾经说过,最大的挑战和突破在于用人,而用人最大的突破在于信任人。这句话语虽然很短,但令我浮想联翩。

  • 相关阅读:
    网课自动暂停解决方法、挂课后台播放方法、解决继续教育自动暂停
    今年快30岁的我,还是选择了裸辞···
    java基础---缓冲流、数据流、打印流
    阿里云CentOS 安装 Nginx
    [附源码]java毕业设计高校学生疫情防控信息管理系统
    Methods and Interfaces Part3
    vue3-admin-element框架登录如何修改?
    【视频】复杂网络分析CNA简介与R语言对婚礼数据聚类(社区检测)和可视化|数据分享
    数据结构·单链表
    7-zip
  • 原文地址:https://blog.csdn.net/qq_37437983/article/details/126212626