• muduo源码剖析之EventLoop事件循环类


    简介

    EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。

    成员及属性解析

    一个事件循环,注意,一个创建了EventLoop对象的线程是workloop线程

    主要接口

    loop

    死循环,阻塞在Poller的poll函数,等待唤醒唤醒后执行ChannelList中每个Channel的回调最后执行任务队列中的Functor

    runInLoop

    在IO线程中执行用户回调Functor,若调用者非IO线程,则会调用queueInLoop

    queueInLoop

    当调用者并非当前EventLoop所在线程时,将Functor存入EventLoop的任务队列从而保证Functor由IO线程执行,这是线程安全的保证之一

    updateChannel与removeChannel

    核心中的核心,通过这个公有接口建立起Channel和Poller沟通的桥梁Channel通过这个接口向Poller注册或者移除自己的fd实现了Poller和Channel两端的解耦

    核心实现:handleEvent

    遍历所有的activeChannelList_,并依次执行这些Channel中注册的回调函数这个环节非常非常关键,是一切事件派发机制中回调执行的地方

    主要成员

    wakeupchannel_

    通过eventfd唤醒的channel

    EventLoop可以通过这个Channel唤醒自己执行定时任务

    activeChannelList_

    通过一次poll获得的所有发生事件的Channel指针列表

    pendingFunctors_

    所有非IO线程调用的用户回调都会存放在这个队列中,通过mutex互斥量保护

    poller_

    一个多路复用实例

    源码剖析

    EventLoop.h

    #ifndef MUDUO_NET_EVENTLOOP_H
    #define MUDUO_NET_EVENTLOOP_H
    
    #include 
    #include 
    #include 
    
    #include 
    
    #include "muduo/base/Mutex.h"
    #include "muduo/base/CurrentThread.h"
    #include "muduo/base/Timestamp.h"
    #include "muduo/net/Callbacks.h"
    #include "muduo/net/TimerId.h"
    
    namespace muduo
    {
    namespace net
    {
    
    class Channel;
    class Poller;
    class TimerQueue;
    
    ///
    /// Reactor, at most one per thread.
    ///
    /// This is an interface class, so don't expose too much details.
    class EventLoop : noncopyable
    {
     public:
      typedef std::function<void()> Functor;
    
      EventLoop();
      ~EventLoop();  // force out-line dtor, for std::unique_ptr members.
    
      //开启事件循环
      void loop();
    
      //退出事件循环
      void quit();
    
      //轮询返回的时间,通常意味着数据到达。
      Timestamp pollReturnTime() const { return pollReturnTime_; }
    
      int64_t iteration() const { return iteration_; }
    
      /// Runs callback immediately in the loop thread.
      /// It wakes up the loop, and run the cb.
      /// If in the same loop thread, cb is run within the function.
      /// Safe to call from other threads.
      ///在当前loop中执行cb
      void runInLoop(Functor cb);
      /// Queues callback in the loop thread.
      /// Runs after finish pooling.
      /// Safe to call from other threads.
      ///将cb放入队列中,唤醒loop所在的线程执行
      void queueInLoop(Functor cb);
    
      size_t queueSize() const;
    
      // timers
    
      ///
      /// Runs callback at 'time'.
      /// Safe to call from other threads.
      ///
      TimerId runAt(Timestamp time, TimerCallback cb);
      ///
      /// Runs callback after @c delay seconds.
      /// Safe to call from other threads.
      ///
      TimerId runAfter(double delay, TimerCallback cb);
      ///
      /// Runs callback every @c interval seconds.
      /// Safe to call from other threads.
      ///
      TimerId runEvery(double interval, TimerCallback cb);
      ///
      /// Cancels the timer.
      /// Safe to call from other threads.
      ///
      void cancel(TimerId timerId);
    
      // internal usage
      //唤醒loop所在的线程
      void wakeup();
        
      //调用poller的方法
      void updateChannel(Channel* channel);
      void removeChannel(Channel* channel);
      bool hasChannel(Channel* channel);
    
      // pid_t threadId() const { return threadId_; }
      void assertInLoopThread()
      {
        if (!isInLoopThread())
        {
          abortNotInLoopThread();
        }
      }
        //判断eventloop对象是否在自己的线程
      bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
      // bool callingPendingFunctors() const { return callingPendingFunctors_; }
      bool eventHandling() const { return eventHandling_; }
    
      void setContext(const boost::any& context)
      { context_ = context; }
    
      const boost::any& getContext() const
      { return context_; }
    
      boost::any* getMutableContext()
      { return &context_; }
    
      static EventLoop* getEventLoopOfCurrentThread();
    
     private:
      void abortNotInLoopThread();
      void handleRead();  // waked up
      void doPendingFunctors();//在loop一次后执行pendingFunctors_中的所有方法(会清空队列)
    
      void printActiveChannels() const; // DEBUG
    
      typedef std::vector<Channel*> ChannelList;
    
      bool looping_; /* atomic */
      std::atomic<bool> quit_;//标识loop的退出
      bool eventHandling_; /* atomic */
      //标识当前loop是否需要有执行的回调操作
      bool callingPendingFunctors_; /* atomic */
      int64_t iteration_;
      const pid_t threadId_;//记录thread所在的线程pid
      Timestamp pollReturnTime_;
      std::unique_ptr<Poller> poller_;
      std::unique_ptr<TimerQueue> timerQueue_;
        
      //主要作用,当mainLoop获取到一个accept新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理,使用eventfd
      int wakeupFd_;
      // unlike in TimerQueue, which is an internal class,
      // we don't expose Channel to client.
      std::unique_ptr<Channel> wakeupChannel_;
      boost::any context_;
    
      // scratch variables
      ChannelList activeChannels_;
      Channel* currentActiveChannel_;
    
      mutable MutexLock mutex_;//保证pendingFunctors_的线程安全操作
      std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);//存储loop需要执行的所有操作
    };
    
    }  // namespace net
    }  // namespace muduo
    
    #endif  // MUDUO_NET_EVENTLOOP_H
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156

    eventloop.cc

    // Copyright 2010, Shuo Chen.  All rights reserved.
    // http://code.google.com/p/muduo/
    //
    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
     
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
     
    #include "muduo/net/EventLoop.h"
     
    #include "muduo/base/Logging.h"
    #include "muduo/base/Mutex.h"
    #include "muduo/net/Channel.h"
    #include "muduo/net/Poller.h"
    #include "muduo/net/SocketsOps.h"
    #include "muduo/net/TimerQueue.h"
     
    #include 
     
    #include 
    #include 
    #include 
     
    using namespace muduo;
    using namespace muduo::net;
     
    namespace
    {
    //保证一个线程只有一个loop
    __thread EventLoop* t_loopInThisThread = 0;
    //poll超时时间
    const int kPollTimeMs = 10000;
     
    int createEventfd()
    {
      int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
      if (evtfd < 0)
      {
        LOG_SYSERR << "Failed in eventfd";
        abort();
      }
      return evtfd;
    }
     
    #pragma GCC diagnostic ignored "-Wold-style-cast"
    class IgnoreSigPipe
    {
     public:
      IgnoreSigPipe()
      {
        ::signal(SIGPIPE, SIG_IGN);
        // LOG_TRACE << "Ignore SIGPIPE";
      }
    };
    #pragma GCC diagnostic error "-Wold-style-cast"
     
    IgnoreSigPipe initObj;
    }  // namespace
     
    EventLoop* EventLoop::getEventLoopOfCurrentThread()
    {
      return t_loopInThisThread;
    }
     
    //创建了EventLoop对象的线程称为IO线程
    EventLoop::EventLoop()
      : looping_(false),                                     //判断是否在loop
        quit_(false),                                        //判断是否退出的标志
        eventHandling_(false),                               //处理handevent的标志
        callingPendingFunctors_(false),                      //判断当前是不是在执行方法队列
        iteration_(0),
        threadId_(CurrentThread::tid()),                     //当前线程ID
        poller_(Poller::newDefaultPoller(this)),             //创建一个 poll 或 epoll 对象
        timerQueue_(new TimerQueue(this)),                   //创建一个计时器
        wakeupFd_(createEventfd()),                          //发送唤醒loop消息的描述符,随便写点消息即可唤醒
        wakeupChannel_(new Channel(this, wakeupFd_)),        //wakeupChannel_用来自己给自己通知的一个通道,该通道会纳入到poller来管理
        currentActiveChannel_(NULL)                          //当前活跃的channel链表指针
    {
      LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
      if (t_loopInThisThread)                                //判断是否是本线程的loop,是一个loop类型的指针
      {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                  << " exists in this thread " << threadId_;        //用LOG_FATAL终止abort它
      }
      else
      {
        t_loopInThisThread = this; //this赋给线程局部数据指针
      }
     
      //设定wakeupChannel的回调函数,即EventLoop自己的的handleRead函数
      wakeupChannel_->setReadCallback(
          std::bind(&EventLoop::handleRead, this));          //channel->handleEventWithGuard会调用到handleRead
      // we are always reading the wakeupfd
      wakeupChannel_->enableReading();    //注册wakeupFd_到poller
    }
     
    EventLoop::~EventLoop()
    {
      LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
                << " destructs in thread " << CurrentThread::tid();
      wakeupChannel_->disableAll(); //从监听队列fd里移除
      wakeupChannel_->remove();  //移除epoll里面的channel
      ::close(wakeupFd_);
      t_loopInThisThread = NULL;
    }
     
    void EventLoop::loop()
    {
      assert(!looping_);
      assertInLoopThread(); //事件循环必须在IO线程中,即创建该evenloop的线程
      looping_ = true; //是否正在循环
      quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
      LOG_TRACE << "EventLoop " << this << " start looping";
     
      while (!quit_)
      {
        activeChannels_.clear();                            //activeChannels_是一个vector
        //等待io复用函数返回
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //调用poll返回活动的事件,有可能是唤醒返回的
        ++iteration_;
          
        //根据设置的日志等级打印跟踪信息
        if (Logger::logLevel() <= Logger::TRACE)
        {
          printActiveChannels();
        }
        // TODO sort channel by priority  按优先级排序
        //处理IO事件
        eventHandling_ = true;
        for (Channel* channel : activeChannels_)            //遍历通道来进行处理
        {
          currentActiveChannel_ = channel;
          currentActiveChannel_->handleEvent(pollReturnTime_);  //pollReturnTime_是poll返回的时刻
        }
        currentActiveChannel_ = NULL;                       //处理完了赋空
        eventHandling_ = false;
     
        //执行方法队列中的方法[方法队列functors,我们可以跨线程的往里面添加新的方法,这些方法会在处理完io事件后执行]
        doPendingFunctors();                                //这个设计也能够进行计算任务
      }
     
      LOG_TRACE << "EventLoop " << this << " stop looping";
      looping_ = false;
    }
     
    void EventLoop::quit()
    {
      quit_ = true;  //设置退出标志
      // There is a chance that loop() just executes while(!quit_) and exits,
      // then EventLoop destructs, then we are accessing an invalid object.
      // Can be fixed using mutex_ in both places.
      if (!isInLoopThread())
      {
        wakeup(); //唤醒
      }
    }
     
    //在I/O线程中调用某个函数
    //实际上就是如果是I/O线程主动调用该函数想要执行,那就同步执行该函数。如果是其他线程施加给I/O线程的任务,那么其他线程就需要把回调函数加入I/O线程的队列,等待异步执行
    void EventLoop::runInLoop(Functor cb) 
    {
      if (isInLoopThread())  //判断是否是本线程的loop
      {
        cb();
      }
      else
      {
        queueInLoop(std::move(cb)); 
      }
    }
     
    void EventLoop::queueInLoop(Functor cb)//把方法添加到队列中,该方法会出现在多个线程中,操作要加锁
    {
      {
      MutexLockGuard lock(mutex_);
      pendingFunctors_.push_back(std::move(cb));//std::function支持移动初始化,所以这里用move提升性能。(减少一次拷贝)
      }
     
      if (!isInLoopThread() || callingPendingFunctors_)//如果调用的queneInLoop的线程不是IO线程,那么唤醒
      {//如果在IO线程调用queueInLoop(),而此时正在调用pending functor,由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这是queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了
        wakeup();
      }
    }
     
    size_t EventLoop::queueSize() const
    {
      MutexLockGuard lock(mutex_);
      return pendingFunctors_.size();
    }
     
    TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)//在指定的时间调用callback
    {
      return timerQueue_->addTimer(std::move(cb), time, 0.0);
    }
     
    TimerId EventLoop::runAfter(double delay, TimerCallback cb)//等一段时间调用callback
    {
      Timestamp time(addTime(Timestamp::now(), delay));//微妙
      return runAt(time, std::move(cb));
    }
     
    TimerId EventLoop::runEvery(double interval, TimerCallback cb)//以固定的间隔反复的调用callback
    {
      Timestamp time(addTime(Timestamp::now(), interval));
      return timerQueue_->addTimer(std::move(cb), time, interval);
    }
     
    void EventLoop::cancel(TimerId timerId) //取消timer
    {
      return timerQueue_->cancel(timerId);
    }
     
    void EventLoop::updateChannel(Channel* channel)   //更新通道,用epoll_ctl更新fd
    {
      assert(channel->ownerLoop() == this);  //判断channel的loop是不是当前loop
      assertInLoopThread();  
      poller_->updateChannel(channel);
    }
     
    void EventLoop::removeChannel(Channel* channel) //移除通道,将channel从ChannelMap移除并EPOLL_CTL_DEL掉fd
    {
      assert(channel->ownerLoop() == this);  //表示当前的loop
      assertInLoopThread();
      if (eventHandling_)  //正在处理channel
      {
        assert(currentActiveChannel_ == channel ||   //当前的channel或不是活跃的channel
            std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
      }
      poller_->removeChannel(channel);
    }
     
    bool EventLoop::hasChannel(Channel* channel)//查找事件分发器是否在channels_中
    {
      assert(channel->ownerLoop() == this);
      assertInLoopThread();
      return poller_->hasChannel(channel);
    }
     
    void EventLoop::abortNotInLoopThread()
    {
      LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
                << " was created in threadId_ = " << threadId_
                << ", current thread id = " <<  CurrentThread::tid();
    }
     
    void EventLoop::wakeup()
    {
      uint64_t one = 1;
      ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);   //随便写点数据进去就唤醒了
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
      }
    }
     
    void EventLoop::handleRead()      //读取唤醒的数据
    {
      uint64_t one = 1;
      ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
      }
    }
     
    // 1. 不是简单的在临界区内依次调用functor,而是把回调列表swap到functors中,这一方面减小了
    //临界区的长度,意味着不会阻塞其他线程的queueInLoop(),另一方面也避免了死锁(因为Functor可能再次调用quueInLoop)
    // 2. 由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这是queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了
    // 3. muduo没有反复执行doPendingFunctors()直到pendingFunctors为空,这是有意的,否则I/O线程可能陷入死循环,无法处理I/O事件
    void EventLoop::doPendingFunctors()
    {
      std::vector<Functor> functors;
      callingPendingFunctors_ = true;
     
      //注意这里的临界区,这里使用了一个栈上变量functors和pendingFunctors交换
      {
      MutexLockGuard lock(mutex_);
      functors.swap(pendingFunctors_);  //pendingFunctors_是存放Functor的vector
      }
      //此处其它线程就可以往pendingFunctors添加任务
     
      for (const Functor& functor : functors)
      {
        functor();
      }
      callingPendingFunctors_ = false;
    }
     
    void EventLoop::printActiveChannels() const
    {
      for (const Channel* channel : activeChannels_)
      {
        LOG_TRACE << "{" << channel->reventsToString() << "} ";
      }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
  • 相关阅读:
    网络路由详解
    C语言之变量和数据类型
    MySQL索引原理之索引与约束
    Python网络爬虫(一):HTML/CSS/JavaScript介绍
    Java中网络的基本介绍。网络通信,网络,ip地址,域名,端口,网络通信协议,TCP/IP传输过程,网络通信协议模型,TCP协议,UDP协议
    Linux高性能服务器编程 学习笔记 第七章 Linux服务器程序规范
    Go 原生的 git 实现库:go-git
    java毕业设计开题报告javaweb户籍管理系统|户口
    1025 PAT Ranking
    PHP安全问题:远程溢出、DoS、safe_mode绕过漏洞
  • 原文地址:https://blog.csdn.net/weixin_50448879/article/details/133997417