• 基于inotify实现落盘文件的跨进程实时读写交互


    本文是在基于kafka原理单机高性能微秒级别队列改造_Sweet_Oranges的博客-CSDN博客基础上进行部分完善的。因为该文提供的代码不完整,无法使用。

    场景:
    线上业务需要一款拥有超低延迟(us),支持多消费者,并且能够处理海量的消息积压的消息队列。

    调研:

    • kafka是我们日常生活中比较常见的消息队列,非常适合做消息的离线处理。但是在一些实时性要求比较高的场景下,消息自带的延迟是不可忍受的,测试发现一条消息转发大概需要200ms的耗时,实际情况可能有所出入,但肯定是毫秒级别的。kafka从设计上就是倾向于面向大众,满足大部分需求。当然满足这些要求的成本就是通过牺牲了性能。所以说kafka适合做离线处理。而不是做一些非常实时的应用。
    • zeromq就是针对实时应用的一款消息队列,提供了各个拓扑结构的链接方式,性能不错,但不足的是当消息积压有可能会写满内存。
    • 市面上目前的消息队列都与我们的设计目标不符。

    设计:
    要想拥有微秒级别的延迟,

    • 不能走网络,数据必须放本地
    • 用磁盘以及顺序io来保证写入读取性能。生产者将数据以换行符(\n)append的形式写入文件(顺序写),消费者getline一直到文件尾(顺序读)
    • 使用inotify等待新数据的产生
    • 不适用get_line, fgets等库函数,减少数据拷贝,自己实现拆包逻辑
    • 读取的buffer_size不能设置的过小,由于下游消费通常存在一定的耗时,我们尽量一次多读取一些,否则系统调用read的成本很高
    • 当消息产生积压的情况,我们采用water_mark机制来自动调节每次数据读取的最大字节

    实现:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #define EVENT_SIZE (sizeof(struct inotify_event))
    8. #define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
    9. using namespace std;
    10. void cb_(char * line)//消费者的数据处理函数,回调函数
    11. {
    12. printf("%s",line);
    13. }
    14. int main()
    15. {
    16. int wartermark = 1024 * 1024;//用于控制一次读取数据的大小
    17. int max_buffer_size = wartermark * 6;
    18. std::string path_ = "/tmp/writter.txt";//生产者产生的数据的落盘文件
    19. char *line_ = new char[10240]; // 每行缓冲区
    20. char *buffer_ = new char[max_buffer_size]; // 文件缓冲区
    21. char *buffer2_ = new char[BUF_LEN]; // inotify事件缓冲区
    22. int read_ = -1;
    23. // 注册监听文件变化
    24. int ifd_ = inotify_init();
    25. if (ifd_ < 0)
    26. {
    27. return;
    28. }
    29. inotify_add_watch(ifd_, path_.c_str(), IN_MODIFY | IN_CREATE | IN_DELETE);
    30. // 打开文件
    31. int fd_ = open(path_.c_str(), O_RDONLY);
    32. if (fd_ < 0)
    33. {
    34. return;
    35. }
    36. // 读取文件
    37. register long int buffer_size = wartermark;
    38. register char *cs;
    39. register int i = 0;
    40. cs = line_;
    41. bool ok_ = true;
    42. while (ok_)
    43. {
    44. while ((read_ = read(fd_, buffer_, buffer_size)) != 0)
    45. {
    46. for (; i < read_; i++)
    47. {
    48. if ((*cs++ = buffer_[i]) != '\n')
    49. {
    50. continue;
    51. }
    52. //remove last \n
    53. *(--cs) = '\0';
    54. cb_(line_);
    55. if (!ok_)
    56. break;
    57. cs = line_;
    58. }
    59. if (!ok_)
    60. break;
    61. if (read_ == wartermark)
    62. {
    63. buffer_size = max_buffer_size;
    64. }
    65. else
    66. {
    67. buffer_size = wartermark;
    68. }
    69. i = 0;
    70. }
    71. read(ifd_, buffer2_, BUF_LEN);//当没有事件时,程序阻塞在此处,直到有事件到来。
    72. }
    73. return 0;
    74. }

    消费者只需要将自己业务入口注册为cb_,就可以实现消费。

    总结:

    实际测试发现从消息生产到cb_入口消息延迟大概在100个us以内。以上代码虽然实现起来很简单,但是正是由于其简单才保证了超高的性能。

    参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/105428875

  • 相关阅读:
    Docker 安装部署与基础操作
    Spring Security OAuth Client配置加载源码分析
    Linux下安装 SkyWalking 分布式追踪系统
    Nomad系列-Nomad网络模式
    Gartner发布2024年十大战略技术趋势
    Spring(完整版)
    如何实现MQTT的Java代码
    Java(一)(引用类型的参数在传递,方法重载,面向对象编程基础)
    113.Impala ODBC驱动的安装及配置
    计算机毕设(附源码)JAVA-SSM基于Java学生宿舍管理系统
  • 原文地址:https://blog.csdn.net/jinking01/article/details/126228732