• 【Linux多线程服务端编程】| 【05】高效的多线程日志


    索引

    在这里插入图片描述

    1 简介

    【C++模块实现】| 【01】日志系统实现
    【日志类型】

    • 【诊断日志】:log4j、logback、log4cxx、log4cpp等;
    • 【交易日志】:数据库的write-ahead log、文件系统的journaling等,通过回放日志可逐步恢复每一次修改后的状态;

    【日志功能】
    日志通常用来故障诊断和追踪、性能分析等;

    • 对于关键线程,需要记录:
      • 收到每条内部消息的id等;
      • 收到的每条外部消息的全文;
      • 收到的每条消息的全文,每条消息都有全局唯一的id;
      • 关键内部状态的变更,等;

    【日志库】:可分为前、后端;

    • 前端供应用程序使用的接口,并生成日志消息;
    • 后端负责把日志消息到目的地;
    • 一般为singleton;

    【如何将数据高效地传输到后端】

    • 前端(生产者),尽量做到低延迟、低CPU开销、无阻塞;
    • 后端(消费者),要有足够大的吞吐量,并占用较少的资源;

    【前端API风格】

    • 使用printf()风格;
    • 使用stream<<风格;
    • stream的优势:当输出的日志级别高于语句的日志级别,则打印日志是个空操作,开销接近0,而printf无法做到这一点;

    2 功能需求

    • 日志级别,TRACE、DEBUG、INFO、WARN、ERROR、FATAL,在运行时可调,在不同场景下设置不同的级别,一次来看到更多的日志信息,或更少的;
    • 日志消息可能有多个目的地,文件、socket等,一般不往网络写日志,防止加大网络的消耗,而对于日志文件的滚动时必须的;
      • rolling根据文件的大小和时间;
      • 文件名可根据进程名字,创建时间、机器名称、进程id、.log后缀;
    • 日志消息格式可配置;
    • 可设置运行时过滤器,控制不同组件的日志消息的级别的目的地;
    • 上述中,第一项必须,其余……;

    日志文件压缩与归档

    • 应该交给专门的脚本去做,可随意更换压缩算法或归档策略;

    如何解决崩溃,日志丢失

    • 【1】定期3s将缓冲区内的日志消息flush到硬盘;
    • 【2】每条内存张的日志都带有cookie,为某函数地址,可通过core dump文件中查找cookie来找到未写入磁盘的消息;

    日志格式

    在这里插入图片描述

    • 不需要运行时配置,一般不会改变,可节省每条日志解析的开销;
    • 尽量每条日志占一行,容易用awk、sed、grep等分析日志;
    • 可用gettimeofday该函数不是系统调用,开销小;
    • 始终使用GMT时区,可省去本地时区转换麻烦,更易追查事件的顺序;
    • 打印线程id,便于分析多线程的时序,可检测死锁;
    • 打印日志级别,查看是否有ERROR;
    • 打印源码文件和行号;
    • 每行字段都以4个空格分割,便于解析;

    建议
    运行时的日志过滤器,控制不同的部件的输出日志级别,但可用放到编译器做,让整个程序有一个整体的输出级别即可;

    3 性能需求

    • 每秒写几千上万条日志没有显示性能损失;
    • 能应对一个进程产生大量日志数据到场景,如1G/min;
    • 不阻塞正常的执行流程;
    • 在多线程中,没有争用;
    • 日志库可瞬时写满带宽;
    • 若每条日志消息平均长度为110字节,则1秒要写100万条;

    【优化】

    • 时间戳字符串中日期和时间两部分时缓存的,1s内只需格式化微秒部分;
    • 日志消息前4个字段定长,避免在运行期求字符串长度不会反复调用strlen;
    • 线程id预先格式化为str,输出时只需简单拷贝几个字节;
    • 每行日志消息的src文件名采用编译器计算获取;

    4 多线程异步日志

    • 使用一个背景线程负责收集日志消息,写入日志文件,其他业务线程往日志线程发送日志;
    • 由于在one loop per thread在非阻塞中应彻底避免磁盘IO;
    • 我们使用队列将日志前端数据传到后端;

    【直接写日志】

    • 在网络IO或业务线程中写入可能会阻塞几秒,由此导致请求超时,误发心跳包等;

    muduo双缓冲技术

    • 准备两块buffer:A,B;
    • 前端往A填数据,后端将B的数据写入文件;
    • 当A写满后,交换A、B;
    • 批处理,减少线程唤醒的频度,降低开销;
    • 为了及时将消息写入文件,若A未满,也将3s写入一次;

      上述中LargeBuffer大小为4M,auto_type类似unique_ptr,buffers为后端写入的buffer;
      【发送方】
      在这里插入图片描述
    • 其中currentBuffer_为当前缓冲区,nextBuffer_为下一个缓冲区,BufferVector记录写满的缓冲区;
    • 若当前缓冲区装得下消息,则直接添加;
    • 若当前缓冲区装不下消息,则先将currentBuffer_加入到BufferVector中等待写入,并尝试nextBuffer_是否可用;
      • 若可用则将nextBuffer_作为currentBuffer_;
      • 若不可用则将currentBuffer_重新申请内存;
      • 在将数据添加到currentBuffer_;
      • 并通知后端写入;

    【接受方】
    在这里插入图片描述

    • 后端先申请两个Buffer,以及一个写入的BufferVector;
    • 先等待buffers_是否满足,将currentBuffer_加入buffers_,将newBuffer1给currentBuffer_;
    • 在将buffersToWrite和buffers_交换,若nextBuffer_为空,则将newBuffer2给它;

    re-fill newBufer1和newBuffer2会被buffer重新填充,便于替换前端的当前缓冲和预备缓冲,四个缓冲会在启动时填为0,避免出现page fault;

    4.1 前后端交互情况

    前后端各有两个缓冲区以及一个缓冲区数组;

    【前端写日志频度不高,后端3s后超时写入文件】
    在这里插入图片描述
    【3s超时前写满了缓冲区,唤醒后端线程开始写入文件】
    在这里插入图片描述
    【前端需要分配新buffer的情况】

    • 该情况下在短时间内密集写入日志消息,用完两个buf并重新分配新的buf;
      在这里插入图片描述
    • A,B写满,但后端并没有马上写,导致新分配了E,但在1.8+后,后端获取控制权开始写;
    • 【为什么释放E,继续使用A、B呢?】使用A、B(在对象开始就分配)不会造成page fault;

    【文件写入速度慢,导致前端耗尽了两个缓冲区,并分配新的缓冲区】
    在这里插入图片描述

    • 后续将C,D的内存交给new1,new2,E的内存释放;
    4.2 改进措施
    • 可将空闲的缓冲区该成缓冲区数组,在初始化时,给空闲缓冲数组放入足够多的缓冲区;

    在这里插入图片描述

    4.3 若日志消息堆积,该如何处理

    即前端陷入死循环,发送的日志,超过后端处理;

    • 对于同步日志,阻塞IO回限制前端写入速度,起到节流阀的作用;
    • 而对于异步日志,将会在内存中不断堆积,引发性能问题,程序奔溃;
    • 出现该问题,我们可用直接丢弃多余日志;
    4.4 部分源码
    // 往队列增加数据
    void AsyncLogging::append(const char* logline, int len)
    {
      muduo::MutexLockGuard lock(mutex_);
      // 若缓冲区的长度满足加入数据的长度,则直接添加
      if (currentBuffer_->avail() > len)
      {
        currentBuffer_->append(logline, len);
      }
      
      else
      {
      	// 将当前缓冲区添加到写入队列中
        buffers_.push_back(std::move(currentBuffer_));
    
    	// 判断下一个缓冲区是否有效,若有效,则将该缓冲区分配给当前缓冲区
        if (nextBuffer_)
        {
          currentBuffer_ = std::move(nextBuffer_);
        }
        else
        {
        	// 若没有空间,则重新申请
          currentBuffer_.reset(new Buffer); // Rarely happens
        }
    	// 将添加的数据追加的当前缓冲区中
        currentBuffer_->append(logline, len);
    	// 提醒线程写入
        cond_.notify();
      }
    }
    
    // 执行线程函数:写入磁盘
    void AsyncLogging::threadFunc()
    {
      assert(running_ == true);
      latch_.countDown();	// 数量减一
      LogFile output(basename_, rollSize_, false);	// 打开logFile
      BufferPtr newBuffer1(new Buffer);		// 缓冲区1
      BufferPtr newBuffer2(new Buffer);		// 缓冲区2
      newBuffer1->bzero();
      newBuffer2->bzero();					// 初始化为0
      BufferVector buffersToWrite;			// 写入队列
      buffersToWrite.reserve(16);			// 设置大小为16
      
      while (running_)
      {
        assert(newBuffer1 && newBuffer1->length() == 0);
        assert(newBuffer2 && newBuffer2->length() == 0);
        assert(buffersToWrite.empty());
    
        {
          muduo::MutexLockGuard lock(mutex_);
    	  // 等待秒数到达
          if (buffers_.empty())  // unusual usage!
          {
            cond_.waitForSeconds(flushInterval_);
          }
    	  // 将当前缓冲区移动到所有数据都添加到写入队列中
          buffers_.push_back(std::move(currentBuffer_));
    	  // 将buf1的缓冲区交给当当前缓冲区
          currentBuffer_ = std::move(newBuffer1);
    	  // 将队列转移到写入队列
          buffersToWrite.swap(buffers_);
    		// 若该缓冲区为空,则将buf2给它
    	  if (!nextBuffer_)
          {
            nextBuffer_ = std::move(newBuffer2);
          }
        }
    
        assert(!buffersToWrite.empty());
    
    	// 限制大小,若数据堆积太多,则直接删除
        if (buffersToWrite.size() > 25)
        {
          char buf[256];
          snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
                   Timestamp::now().toFormattedString().c_str(),
                   buffersToWrite.size()-2);
          fputs(buf, stderr);
          output.append(buf, static_cast<int>(strlen(buf)));	// 追加
          buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
        }
    
    	// 遍历队列
        for (const auto& buffer : buffersToWrite)
        {
          // FIXME: use unbuffered stdio FILE ? or use ::writev ?
          output.append(buffer->data(), buffer->length());
        }
    
    	// 删除其他无关的数据,只保留2个
        if (buffersToWrite.size() > 2)
        {
          // drop non-bzero-ed buffers, avoid trashing
          buffersToWrite.resize(2);
        }
    
    	// 若buf1为空
        if (!newBuffer1)
        {
          assert(!buffersToWrite.empty());
    		// 将最后一个队列给它
    	  newBuffer1 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer1->reset();
        }
    
    	// 若buf2为空
        if (!newBuffer2)
        {
          assert(!buffersToWrite.empty());
    	  // 将最后一个队列给它
          newBuffer2 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer2->reset();
        }
    
        buffersToWrite.clear();	// 清空
        output.flush();		// 
      }
      output.flush();
    }
    
    

    5 其他方案

    【使用队列】
    高效的前后端消息传递可使用BlockingQueue/BoundedBlockingQueue,但需要每条日志都分配内存,后端线程需要将其释放;

    【更改core dump文件名】
    通过sysctl设置kernel.core_pattern参数或修改/proc/sys/kernel/core_pattern,让core dump都产生不同的文件;

  • 相关阅读:
    阶段总结(技术向)
    [附源码]java毕业设计网上手机商城
    信息学奥赛一本通:1159:斐波那契数列
    【尚品汇】项目笔记
    golang 通过案列感受下内存分析
    【wpf】wpf中的那些模板之深度解析
    神经网络分类任务
    算法萌新闯力扣:存在重复元素II
    看一遍学会Vue结合axios使用mockjs
    持续提升信息安全运维保障服务能力,天玑科技助力企业快速实现数字化转型
  • 原文地址:https://blog.csdn.net/weixin_45926547/article/details/127079978