EventLoop相当于Reactor模型中的Reactor组件,是Event事件和Demultiplex多路分发器的桥梁。EventLoop的执行时序图如下:
loop()的处理流程:
对mainLoop来说,Poller上只有一个fd就是listenfd,pool()即开始监听listenfd上到来的新连接,handleEvent()做的就是accept()得到connfd,并打包成Channel,然后通过eventfd唤醒subLoop把这个Channel添加到它的Poller中。如果只有一个loop,mainLoop就要把这个loop加入到自己的Poller上。
对subLoop来说,刚开始Poller上只监听了一个fd就是eventfd,随着mainLoop触发eventfd上的事件,subLoop上监听的fd除了eventfd外就是和accept(listenfd)得到的connfd了。接下来除了除了eventfd触发的新连接事件,还有就是connfd上触发的各种事件了。
重写EventLoop.h:
#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include
#include
#include
#include
#include
class Channel;
class Poller;
//时间循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable
{
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
//开启事件循环
void loop();
//结束事件循环
void quit();
Timestamp pollReturnTime() const{ return pollReturnTime_; }
//在当前Loop中执行
void runInLoop(Functor cb);
//把cb放入队列中,唤醒loop所在的线程,执行cb
void queueInLoop(Functor cb);
//用来唤醒loop所在线程的,往eventfd写一个字节
void wakeup();
//EventLoop的方法 =》 Poller的方法
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
void hasChannel(Channel* channel);
//判断EventLoop对象是否在自己的线程里面
bool isInLoopThread() const {return threadId_ == CurrentThread::tid(); }
private:
void handleRead(); // wake up, eventfd被触发
void doPendingFunctors(); //执行回调
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_;
std::atomic_bool quit_;
const pid_t threadId_; //基类当前loop所在线程的id
Timestamp pollReturnTime_; //poller返回发生事件的channels的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_; //主要作用:当mainLoop获取新用户的channel,通过轮询算法选择一个subLoop,通过该成员唤醒subLoop处理channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
std::atomic_bool callingPendingFunctors_; //标识当前线程释放有需要执行的回调操作
std::vector<Functor> pendingFunctors_; //存储loop需要执行的所有回调操作,线程不安全,因为poll()返回后会从这取回调,别的线程会往这放回调
std::mutex mutex_; //用来保护上面vector容器的线程安全操作
};
mainLoop开启循环后,一次循环的执行流程如下:
EventLoop中的接口大部分都是提供给subLoop使用的,subLoop一次循环的执行流程:
重写EventLoop.cc:
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include
#include
#include
#include
#include
//防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = 0;
//定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;
//用来创建wakeupfd,用来唤醒subReactor处理新来的channel
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd < 0)
{
LOG_FATAL("eventfd error:%d \n",errno);
}
return evtfd;
}
EventLoop::EventLoop()
:looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrentThread::tid())
,poller_(Poller::newDefaultPoller(this))
,wakeupFd_(createEventfd())
,wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if(t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
//设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
//每个EventLoop都将监听wakeupChannel的EPOLLIN读事件
wakeupChannel_->enableReading();
}
//开启事件循环
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
//LOG_INFO("EventLoop %p start looping. \n", this);
while(!quit_)
{
activeChannels_.clear();
//监听两类fd 一类是clientfd,另一种是wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for(Channel* channel : activeChannels_)
{
//Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前EventLoop事件循环需要处理的回调操作
/**
* IO线程 mainloop accept fd 打包 =》 channel 分发给 subloop
* mainLoop事先注册一个回调cb(需要subLoop来执行) wakeup subLoop后,执行下面的方法,执行之前mainLoop注册的cb操作
*/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping. \n", this);
}
//结束事件循环 1、loop在自己的线程调用quit 2、在非loop的线程中,调用loop的quit
void EventLoop::quit()
{
quit_ = true;
if(!isInLoopThread())
{
wakeup();
}
}
//在当前Loop中执行
void EventLoop::runInLoop(Functor cb)
{
if(isInLoopThread()) //在当前loop线程中执行cb
{
cb();
}
else //在非当前loop线程中执行cb,需要唤醒loop所在线程,执行cb
{
queueInLoop(cb);
}
}
//把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
//唤醒相应的,需要执行上面回调操作的loop线程
// ||callingPendingFunctors_表示当前loop正在执行回调,但是loop又有了新的回调,为了防止有回调未处理时阻塞等待
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
//用来唤醒loop所在线程的 向wakeupfd_写一个数据
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() write %lu bytes instead of 8 \n", n);
}
}
//EventLoop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel* channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
void EventLoop::hasChannel(Channel* channel)
{
poller_->hasChannel(channel);
}
void EventLoop::doPendingFunctors() //执行回调
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor functor : functors)
{
functor(); //执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR ("EventLoop::handleRead() reads %ld bytes instead of 8", n);
}
}
EventLoop::~EventLoop()
{
wakeupChannel_->disableALL();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}