• muduo源码剖析之AsyncLogging异步日志类


    简介

    AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。

    多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
    后端也有2个BufferPtr,分别为newBuffer1和newBuffer2,还有一个BufferVector(buffersToWrite)。后端线程在收到前端通知之后,利用buffersToWrite和buffers_进行交换,并且用newBuffer1和newBuffer2归还给前端的currentBuffer_和nextBuffer_,然后把日志写入文件

    muduo日志文件只提供写入本地文件

    后端线程写入条件:

    1. 前端线程缓冲区写完后通过条件变量通知后端线程写入
    2. 超时,muduo设置的是默认时间是3秒(AsyncLogging构造函数第三个参数flushInterval_),

    源码剖析

    AsyncLogging.h

    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
    //
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
    
    #ifndef MUDUO_BASE_ASYNCLOGGING_H
    #define MUDUO_BASE_ASYNCLOGGING_H
    
    #include "muduo/base/BlockingQueue.h"
    #include "muduo/base/BoundedBlockingQueue.h"
    #include "muduo/base/CountDownLatch.h"
    #include "muduo/base/Mutex.h"
    #include "muduo/base/Thread.h"
    #include "muduo/base/LogStream.h"
    
    #include 
    #include 
    
    namespace muduo
    {
    
    class AsyncLogging : noncopyable
    {
     public:
    
      AsyncLogging(const string& basename,
                   off_t rollSize,
                   int flushInterval = 3);
    
      ~AsyncLogging()
      {
        if (running_)
        {
          stop();
        }
      }
      
      void append(const char* logline, int len);
    
      void start()
      {
        running_ = true;
        thread_.start();
        latch_.wait();//保证后端线程已经开始运行
      }
    
      void stop() NO_THREAD_SAFETY_ANALYSIS
      {
        running_ = false;
        cond_.notify();
        thread_.join();
      }
    
     private:
    
      void threadFunc();
    
      typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
      typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
      typedef BufferVector::value_type BufferPtr;
    
      //存储超时时间变量
      const int flushInterval_;		
      //后端线程启动标志
      std::atomic<bool> running_;
      //日志文件名
      const string basename_;
      //预留的日志大小
      const off_t rollSize_;
      //调用后端写入线程
      muduo::Thread thread_;
      //该变量保证后端日志写入线程已经运行
      muduo::CountDownLatch latch_;
      
      //条件变量与互斥锁,用来保证前端线程与后端线程的线程同步
      muduo::MutexLock mutex_;
      muduo::Condition cond_ GUARDED_BY(mutex_);
        
      //前端线程当前写入缓冲区
      BufferPtr currentBuffer_ GUARDED_BY(mutex_);
      //前端线程下一个备用缓冲区
      BufferPtr nextBuffer_ GUARDED_BY(mutex_);
      //带写入文件已填满的缓冲区队列
      BufferVector buffers_ GUARDED_BY(mutex_);
    };
    
    }  // namespace muduo
    
    #endif  // MUDUO_BASE_ASYNCLOGGING_H
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    AsyncLogging.cc

    前端线程

    前端在生成一条日志消息的时候会调用AsyncLogging::append()。在这个函数中,如果当前缓冲(currentBuff_)剩余的空间足够大,则会直接把日志消息拷贝(追加)到当前缓冲中,这是最常见的情况。
    这里拷贝一条日志消息并不会带来多大开销。前后端代码的其余部分都没有拷贝,而是简单的指针交换。否则,说明当前缓冲已经写满,就把它送入(移入)buffers,并试图把预备好的另一块缓冲(nextBuffer_)移用(move)为当前缓冲,然后追加日志消息并通知(唤醒)后端开始写入日志数据。
    以上两种情况在临界区之内都没有耗时的操作,运行时间为常数。
    如果前端写入速度太快,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作为当前缓冲,这是极少发生的情况

    后端线程

    首先准备好两块空闲的buffer,以备在临界区内交换。
    在临界区内,等待条件触发,这里的条件有两个:其一是超时,其二是前端写满了一个或多个buffer。
    注意这里是非常规的conditionvariable用法,它没有使用while循环,而且等待时间有上限。当“条件”满足时,先将当前缓冲(currentBuffe_)移入buffers_,并立刻将空闲的newBuffer1移为当前缓冲。
    注意这整段代码位于临界区之内,因此不会有任何race condition。接下来将buffer_与buffersToWrite交换,后面的代码可以在临界区之外安全地访问buffersToWrite,将其中的日志数据写入文件。
    临界区里最后干的一件事情是用newBuffer2替换nextBuffer,这样前端始终有一个预备buffer可供调配。nextBuffer_可以减少前端临界区分配内存的概率,缩短前端临界区长度。注意到后端临界区内也没有耗时的操作,运行时间为常数。会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2,这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
    最后,这四个缓冲在程序启动的时候会全部填充为0,这样可以避免程序热身时page fault引发性能不稳定。

    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
    //
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
     
    #include "muduo/base/AsyncLogging.h"
    #include "muduo/base/LogFile.h"
    #include "muduo/base/Timestamp.h"
     
    #include 
     
    using namespace muduo;
     
    //异步日志
    AsyncLogging::AsyncLogging(const string& basename,
                               off_t rollSize,
                               int flushInterval)
      : flushInterval_(flushInterval),//后端线程超时时间变量,默认为3s
        running_(false),//后端线程启动标志
        basename_(basename),//设置输出日志文件名
        rollSize_(rollSize),// 预留的日志大小
        thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),// 执行该异步日志记录器的线程
        latch_(1),//该变量作用是保证后端线程已进入
        mutex_(),
        cond_(mutex_),
        currentBuffer_(new Buffer), //当前缓冲区
        nextBuffer_(new Buffer),//预备缓冲区
        buffers_()//缓冲区队列
    {
      currentBuffer_->bzero(); //清空
      nextBuffer_->bzero();//清空
      buffers_.reserve(16);//设置缓冲区队列大小为16  
    }
     
    //所有LOG_*最终都会调用append函数
    void AsyncLogging::append(const char* logline, int len)
    {
      muduo::MutexLockGuard lock(mutex_);
      if (currentBuffer_->avail() > len)     // 如果当前buffer还有空间,就添加到当前日志
      {
        currentBuffer_->append(logline, len);//调用vector的append
      }
      else
      {
        //将使用完后的buffer添加到 buffer vector后
        buffers_.push_back(std::move(currentBuffer_));
     
        if (nextBuffer_)    // 重新设置当前buffer
        {
          currentBuffer_ = std::move(nextBuffer_);
        }
        else
        {
          currentBuffer_.reset(new Buffer); // Rarely happens
          //如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况
        }
        currentBuffer_->append(logline, len);
     
        // 通知日志线程,有数据可写
        cond_.notify();
      }
    }
     
    void AsyncLogging::threadFunc()             // 线程调用的函数,主要用于周期性的flush数据到日志文件中
    {
      assert(running_ == true);
      latch_.countDown();
      LogFile output(basename_, rollSize_, false);//打开日志文件
      BufferPtr newBuffer1(new Buffer);           //这两个是后台线程的buffer 
      BufferPtr newBuffer2(new Buffer);
      newBuffer1->bzero();//clear
      newBuffer2->bzero();//clear
      BufferVector buffersToWrite;			//用来和前台线程的buffers_进行swap
      buffersToWrite.reserve(16);			//预留空间
      while (running_)
      {
        assert(newBuffer1 && newBuffer1->length() == 0);
        assert(newBuffer2 && newBuffer2->length() == 0);
        assert(buffersToWrite.empty());
     
        {
          muduo::MutexLockGuard lock(mutex_);
    	  //如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(默认三秒)
          if (buffers_.empty())  // unusual usage!
          {
            cond_.waitForSeconds(flushInterval_);
          }
     
    	  //无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。  
          //如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。  
          //如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中  
          //了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,  
          //因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)
          buffers_.push_back(std::move(currentBuffer_));   //currentBuffer_是当前缓冲区
     
    	  /*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区
          currentBuffer_ = std::move(newBuffer1);
    	  //使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入LogFile中
          buffersToWrite.swap(buffers_);//内部指针交换,而非复制
          if (!nextBuffer_)
          {
            nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/
          }
        }
     
        assert(!buffersToWrite.empty());
     
    	// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除  
        // 消息堆积
        //前端陷入死循环,拼命发送日志消息,超过后端的处理能力
        //这是典型的生产速度超过消费速度,会造成数据在内存中的堆积
        //严重时引发性能问题(可用内存不足),
        //或程序崩溃(分配内存失败)
        if (buffersToWrite.size() > 25)
        {
          char buf[256];
          snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
                   Timestamp::now().toFormattedString().c_str(),
                   buffersToWrite.size()-2);
          fputs(buf, stderr);
          output.append(buf, static_cast<int>(strlen(buf)));
     
    	  // 丢掉多余日志,以腾出内存,仅保留两块缓冲区
          buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
        }
     
    	 // 将buffersToWrite的数据写入到日志中
        for (const auto& buffer : buffersToWrite)
        {
          // FIXME: use unbuffered stdio FILE ? or use ::writev ?
          output.append(buffer->data(), buffer->length());
        }
     
    	// 重新调整buffersToWrite的大小 
        if (buffersToWrite.size() > 2)
        {
          // drop non-bzero-ed buffers, avoid trashing
          buffersToWrite.resize(2);
        }
     
        if (!newBuffer1)
        {
          assert(!buffersToWrite.empty());
    	  // 从buffersToWrite中弹出一个作为newBuffer1
          newBuffer1 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer1->reset();// 清理newBuffer1
        }
     
    	//前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer
        if (!newBuffer2)
        {
          assert(!buffersToWrite.empty());
          newBuffer2 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer2->reset();
        }
     
        buffersToWrite.clear();
        output.flush();//刷新日志文件(写入)
      }
      output.flush();
    }
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164

    如果日志消息堆积怎么办

    万一前端陷入死循环,拼命发送日志消息,超过后端的处理(输出)能力,会导致什么后果?对于同步日志来说,这不是问题,因为阻塞IO自然就限制了前端的写入速度,起到了节流阀的作用。但是对于异步日志来说,这就是典型的生产速度高于消费速度问题,会造成数据在内存中堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。
    muduo日志库处理日志堆积的方法很简单:直接丢掉多余的日志buffer,以腾出内存,见这样可以防止日志库本身引起程序故障,是一种自我保护措施。

  • 相关阅读:
    笔试强训 Day6
    Jetson Orin平台多路 FPDlink Ⅲ相机采集套装推荐
    AUTOSAR之网络管理API定义
    网络基础入门(网络基础概念详解)
    Linux内核--链表结构
    Python进阶复习-自带库
    Ego-Swarm编译与运行
    [Redis]Redis客户端
    vue3+ts组件练习(defineExpose defineEmits defineProps)
    DataGridXL 2.0 for JavaScript Crack
  • 原文地址:https://blog.csdn.net/weixin_50448879/article/details/133976344