本文是在基于kafka原理单机高性能微秒级别队列改造_Sweet_Oranges的博客-CSDN博客基础上进行部分完善的。因为该文提供的代码不完整,无法使用。
场景:
线上业务需要一款拥有超低延迟(us),支持多消费者,并且能够处理海量的消息积压的消息队列。
调研:
设计:
要想拥有微秒级别的延迟,
实现:
- #include
- #include
- #include
- #include
- #include
- #include
-
- #define EVENT_SIZE (sizeof(struct inotify_event))
- #define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
- using namespace std;
-
- void cb_(char * line)//消费者的数据处理函数,回调函数
- {
- printf("%s",line);
- }
-
- int main()
- {
- int wartermark = 1024 * 1024;//用于控制一次读取数据的大小
- int max_buffer_size = wartermark * 6;
-
- std::string path_ = "/tmp/writter.txt";//生产者产生的数据的落盘文件
- char *line_ = new char[10240]; // 每行缓冲区
- char *buffer_ = new char[max_buffer_size]; // 文件缓冲区
- char *buffer2_ = new char[BUF_LEN]; // inotify事件缓冲区
-
- int read_ = -1;
- // 注册监听文件变化
- int ifd_ = inotify_init();
- if (ifd_ < 0)
- {
- return;
- }
- inotify_add_watch(ifd_, path_.c_str(), IN_MODIFY | IN_CREATE | IN_DELETE);
- // 打开文件
- int fd_ = open(path_.c_str(), O_RDONLY);
- if (fd_ < 0)
- {
- return;
- }
- // 读取文件
- register long int buffer_size = wartermark;
- register char *cs;
- register int i = 0;
- cs = line_;
- bool ok_ = true;
- while (ok_)
- {
- while ((read_ = read(fd_, buffer_, buffer_size)) != 0)
- {
- for (; i < read_; i++)
- {
- if ((*cs++ = buffer_[i]) != '\n')
- {
- continue;
- }
- //remove last \n
- *(--cs) = '\0';
- cb_(line_);
- if (!ok_)
- break;
- cs = line_;
- }
- if (!ok_)
- break;
- if (read_ == wartermark)
- {
- buffer_size = max_buffer_size;
- }
- else
- {
- buffer_size = wartermark;
- }
- i = 0;
- }
- read(ifd_, buffer2_, BUF_LEN);//当没有事件时,程序阻塞在此处,直到有事件到来。
- }
-
- return 0;
- }
消费者只需要将自己业务入口注册为cb_,就可以实现消费。
实际测试发现从消息生产到cb_入口消息延迟大概在100个us以内。以上代码虽然实现起来很简单,但是正是由于其简单才保证了超高的性能。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/105428875