
这个模块和线程是一一对应的!
监听了一个链接,如果这个连接一旦就绪,就要进行事件处理!
但是如果这个描述符,在多个线程中都触发了了事件,进行处理,就会存在线程安全问题!
因此我们需要将一个链接的事件监控, 以及连接事件处理,以及其他操作都放在同一个线程中!
如何保证一个连接的所有操作都在eventloop对应的线程中!
给eventLOOP模块中,都添加一个任务队列!
对连接的所有操作,都进行一次封装,将对连接的操作当作任务都添加到任务队列中!
对于服务器的所有事件都是由EventLoop模块来完成
每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都要放到同一个线程中进行!
class Eventloop {
private:
std::thread::id _thread_id; // 线程ID
int _event_fd // eventfd 唤醒IO事件监控有可能的阻塞!!!
Poller _poller; // 进行所有描述符的事件监控
using Functor = std::function<void()>;
std::vector<Functor> _task; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全!!!
public:
void runAllTask();
public:
Eventloop();
void runInLoop(const Functor&cb); // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
void queueInLoop(const Functor&cb); // 将操作压入任务池!
bool isInLoop(); //永远判断当前线程是否是EventLoop所对应的线程
void updateEvent(Channel* channel); // 添加/修改描述符的事件监控
void removeEvent(Channel* channel); // 移除描述符的监控
void Start(); // 任务监控完毕进行处理任务! 三步走:事件监控-》就绪事件处理-》执行任务
};
class EventLoop {
private:
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd 唤醒IO事件监控有可能的阻塞!!!
std::unique_ptr<Channel> _event_channel;
Poller _poller;//进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全!!!
TimerWheel _timer_wheel;//定时器模块
public:
// 执行任务池中的所有任务!!
void runAllTask() {
std::vector<Functor> functor; {
std::unique_lock<std::mutex> _lock(_mutex); // 出了作用域,锁就会被解开!!
_tasks.swap(functor);
}
for (auto &f : functor) {
f();
}
return ;
}
static int createEventFd() {
int efd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
ERR_LOG("CREATE ENVENTED FAILED !!!");
abort();
}
return efd;
}
void readEventfd() {
uint64_t res = 0;
int ret = read(_event_fd,&res,sizeof(res));
if (ret < 0) {
if (errno == EINTR || errno == EAGAIN) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
void weakEventFd() {
uint64_t val = 1;
int ret = write(_event_fd,&val,sizeof(val));
if (ret < 0) {
if (errno == EINTR) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
public:
EventLoop():_thread_id(std::this_thread::get_id()),
_event_fd(createEventFd()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this) {
//给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel->setReadCallback(std::bind(&EventLoop::readEventfd, this));
//启动eventfd的读事件监控
_event_channel->enableRead();
}
void runInLoop(const Functor&cb) { // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
if (isInLoop()) {
return cb();
}
}
void queueInLoop(const Functor&cb) { // 将操作压入任务池!
std::unique_lock<std::mutex> _lock(_mutex);
//唤醒有可能因为没有事件就绪,而导致的epoll阻塞;
//其实就是给eventfd写入一个数据,eventfd就会触发可读事件
_tasks.push_back(cb);
weakEventFd();
}
bool isInLoop() { //永远判断当前线程是否是EventLoop所对应的线程
return (_thread_id == std::this_thread::get_id());
}
void updateEvent(Channel* channel) {// 添加/修改描述符的事件监控
return _poller.UpdateEvent(channel);
}
void removeEvent(Channel* channel) { // 移除描述符的监控
return _poller.removeEvent(channel);
}
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
void Start() { // 任务监控完毕进行处理任务!
// 三步走:事件监控-》就绪事件处理-》执行任务
std::vector<Channel*> actives;
_poller.Poll(&actives);
for (auto &channel : actives) {
channel -> handleEvent();
}
runAllTask();
}
};