• Muduo网络库之Channel、EPollPoller与EventLoop类【深度解析】



    前言

    重新梳理一遍muduo网络库的类与知识点。
    Channel、EPollPoller与EventLoop类是muduo库最重要的基础, 他们三类为一体,在底层负责事件循环。EventLoop包含Channel,Poller,EventLoop负责轮询访问Poller,得到激活Channel列表,使Channel自己根据自身情况调用相应回调。

    一、Channel类

    Channel类是对文件描述符(fd)以及其相关事件的封装。

    在TCP网络编程中,想要IO多路复用监听某个文件描述符,就要把这个fd和该fd感兴趣的事件通过epoll_ctl注册到IO多路复用模块上。当事件监听器监听到该fd发生了某个事件。事件监听器返回 [发生事件的fd集合]以及[每个fd都发生了什么事件]

    Channel类则封装了一个 [fd] 和这个 [fd感兴趣事件] 以及事件监听器监听到 [该fd实际发生的事件]。Channel类还提供了设置该fd的感兴趣事件,以及将该fd及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该fd的每种事件对应的处理函数

    1、主要成员变量以及函数

    以下是对Channel类的各个成员变量和成员函数的解析:

    • 成员变量:

      • loop_:指向所属的事件循环EventLoop)对象的指针。

      • fd_:表示该Channel对象关联的文件描述符。

      • events_:表示该Channel对象感兴趣的事件类型(如EPOLLINEPOLLOUT等)。

      • revents_:表示由事件分发器(Poller)返回的具体发生的事件类型。

      • index_:用于记录该Channel对象在事件分发器中的位置或状态。

      • readCallback_writeCallback_closeCallback_errorCallback_:各种回调函数,用于处理不同事件的回调操作。

    • 成员函数:

      • 构造函数和析构函数:初始化和销毁Channel对象。

      • update:在Channel对象所属的事件循环中,通过调用事件分发器的相应方法,注册或更新文件描述符的事件类型。

      • remove:在Channel对象所属的事件循环中,将当前的Channel对象从事件分发器中移除。

      • handleEvent:处理文件描述符上发生的具体事件,根据事件类型调用相应的回调函数。

      • handleEventWithGuard:在保护锁的作用下执行处理事件的具体逻辑。

    2、实现原理

    void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
    void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
    void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
    void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);} 
    
    • 1
    • 2
    • 3
    • 4

    一个文件描述符会发生可读、可写、关闭、错误事件。当发生这些事件后,就需要调用相应的处理函数来处理。外部通过调用上面这四个函数可以将事件处理函数放进Channel类中,当需要调用的时候就可以直接拿出来调用了。

        // 设置fd相应的事件状态
        void enableReading() { events_ |= kReadEvent; update(); }
        void disableReading() { events_ &= ~kReadEvent; update(); }
        void enableWriting() { events_ |= kWriteEvent; update(); }
        void disableWriting() { events_ &= ~kWriteEvent; update(); }
        void disableAll() { events_ = kNoneEvent; update(); }
        void Channel::update()
    {
        // 通过channel所属的EventLoop,调用poller的相应方法,注册fd的events事件
        loop_->updateChannel(this);
    }
    
    // 在channel所属的EventLoop中, 把当前的channel删除掉
    void Channel::remove()
    {
        loop_->removeChannel(this);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    相应的事件状态,启用或禁用读写。 当改变channel所表示fd的events事件后,update负责在poller里面更改fd相应的事件epoll_ctl

    // fd得到poller通知以后,处理事件的
    void Channel::handleEvent(Timestamp receiveTime)
    {
        if (tied_)
        {
            std::shared_ptr<void> guard = tie_.lock();
            if (guard)
            {
                handleEventWithGuard(receiveTime);
            }
        }
        else
        {
            handleEventWithGuard(receiveTime);
        }
    }
    
    // 根据poller通知的channel发生的具体事件, 由channel负责调用具体的回调操作
    void Channel::handleEventWithGuard(Timestamp receiveTime)
    {
        LOG_INFO("channel handleEvent revents:%d\n", revents_);
    
        if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
        {
            if (closeCallback_)
            {
                closeCallback_();
            }
        }
    
        if (revents_ & EPOLLERR)
        {
            if (errorCallback_)
            {
                errorCallback_();
            }
        }
    
        if (revents_ & (EPOLLIN | EPOLLPRI))
        {
            if (readCallback_)
            {
                readCallback_(receiveTime);
            }
        }
    
        if (revents_ & EPOLLOUT)
        {
            if (writeCallback_)
            {
                writeCallback_();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    当调用epoll_wait()后,可以得知事件监听器上哪些Channel(文件描述符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。 Channel::HandleEvent,让每个发生了事件的Channel调用自己保管的事件处理函数。每个Channel会根据自己文件描述符实际发生的事件(通过Channel中的revents_变量得知)和感兴趣的事件(通过Channel中的events_变量得知)来选择调用read_callback_和/或write_callback_和/或close_callback_和/或error_callback_。

    二、EPollPoller类

    EPollPoller类是对epoll的一个封装,EpollPoller是封装了用epoll方法实现的与事件监听有关的各种方法,

    1、实现原理

    EPollPoller::EPollPoller(EventLoop *loop)
        : Poller(loop)
        , epollfd_(::epoll_create1(EPOLL_CLOEXEC))
        , events_(kInitEventListSize)  // vector
    {
        if (epollfd_ < 0)
        {
            LOG_FATAL("epoll_create error:%d \n", errno);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    构造函数 EPollPoller::EPollPoller(EventLoop *loop),其中调用了 epoll_create1 创建了一个 epoll 描述符,并将初始事件列表 events_ 初始化为一定大小的 vector

    
    Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
    {
        // 实际上应该用LOG_DEBUG输出日志更为合理
        // LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, channels_.size());
    
        int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
        int saveErrno = errno;
        Timestamp now(Timestamp::now());
    
        if (numEvents > 0)
        {
            LOG_INFO("%d events happened \n", numEvents);
            fillActiveChannels(numEvents, activeChannels);
            if (numEvents == events_.size())
            {
                events_.resize(events_.size() * 2);
            }
        }
        else if (numEvents == 0)
        {
            LOG_DEBUG("%s timeout! \n", __FUNCTION__);
        }
        else
        {
            if (saveErrno != EINTR)
            {
                errno = saveErrno;
                LOG_ERROR("EPollPoller::poll() err!");
            }
        }
        return now;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这个函数可以说是Poller的核心了,当外部调用poll方法的时候,该方法底层其实是通过epoll_wait获取这个事件监听器上发生事件的fd及其对应发生的事件,我们知道每个fd都是由一个Channel封装的,通过哈希表channels_可以根据fd找到封装这个fd的Channel。将事件监听器监听到该fd发生的事件写进这个Channel中的revents成员变量中。然后把这个Channel装进activeChannels中(它是一个vector>)。这样,当外界调用完poll之后就能拿到事件监听器的监听结果也就是被激活的Channe(activeChannels_),这个activeChannels就是事件监听器监听到的发生事件的fd,以及每个fd都发生了什么事件。*

    
    // 填写活跃的连接
    void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
    {
        for (int i=0; i < numEvents; ++i)
        {
            Channel *channel = static_cast<Channel*>(events_[i].data.ptr);
            channel->set_revents(events_[i].events);
            activeChannels->push_back(channel); // EventLoop就拿到了它的poller给它返回的所有发生事件的channel列表了
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    通过遍历 events_ 列表,其中存储了通过 epoll_wait 系统调用返回的发生事件的信息。对于每一个发生事件的索引,它首先取出对应的 Channel 对象(events_[i].data.ptr),并将该事件的事件类型(events_[i].events)设置给该通道的 revents 成员。这个函数的作用是在事件触发后,将触发事件的通道添加到活跃通道列表中,以便上层的 EventLoop 使用这个列表进行进一步的处理。

    
    // 更新channel通道 epoll_ctl add/mod/del
    void EPollPoller::update(int operation, Channel *channel)
    {
        epoll_event event;
        bzero(&event, sizeof event);
        
        int fd = channel->fd();
    
        event.events = channel->events();
        event.data.fd = fd; 
        event.data.ptr = channel;
        
        if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
        {
            if (operation == EPOLL_CTL_DEL)
            {
                LOG_ERROR("epoll_ctl del error:%d\n", errno);
            }
            else
            {
                LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    封装epoll_ctl,通过cpoll_ctl对指定的socket进行操作,如add/mod/del (muduo中由Epoller类对Channel类进行操作)

    二、EventLoop类

    Poller是封装了和事件监听有关的方法和成员,调用一次Poller::poll方法它就能给你返回事件监听器的监听结果(activeChannels)(发生事件的fd 及其 发生的事件)。作为一个网络服务器,需要有持续监听、持续获取监听结果、持续处理监听结果对应的事件的能力,也就是我们需要循环的去 【调用Poller:poll方法获取实际发生事件的Channel集合,然后调用这些Channel里面保管的不同类型事件的处理函数(调用Channel::HandlerEvent方法)。
    在这里插入图片描述

    1、功能实现

    
    // 开启事件循环
    void EventLoop::loop()
    {
        looping_ = true;
        quit_ = false;
    
        LOG_INFO("EventLoop %p start looping \n", this);
    
        while(!quit_)
        {
            activeChannels_.clear();
            // 监听两类fd   一种是client的fd,一种wakeupfd
            pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
            for (Channel *channel : activeChannels_)
            {
                // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
                channel->handleEvent(pollReturnTime_);
                // std::thread t([&] {
                //     channel->handleEvent(pollReturnTime_);
                // });
                // 其他操作...
                // t.join(); 
                // t.detach(); 
                // Task task(OnMessage, num);
                // // cout << "addTask " << endl;
                // threadPool.addTask(task);
            }
            // 执行当前EventLoop事件循环需要处理的回调操作
            /**
             * IO线程 mainLoop accept fd《=channel subloop
             * mainLoop 事先注册一个回调cb(需要subloop来执行)    wakeup subloop后,执行下面的方法,执行之前mainloop注册的cb操作
             */ 
            doPendingFunctors();
        }
    
        LOG_INFO("EventLoop %p stop looping. \n", this);
        looping_ = false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    这个线程其实就在一直执行这个函数里面的while循环,这个while循环的大致逻辑比较简单。就是调用Poller::poll方法获取事件监听器上的监听结果。接下来在loop里面就会调用监听结果中每一个Channel的处理函数HandlerEvent( )。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。(比如一个Channel发生了可读事件,可写事件,则这个Channel的HandlerEvent( )就会调用提前注册在这个Channel的可读事件和可写事件处理函数,又比如另一个Channel只发生了可读事件,那么HandlerEvent( )就只会调用提前注册在这个Channel中的可读事件处理函数)

    SubReactorde的唤醒操作

    我们先来看看eventloop的loop是如何执行的。循环体中进行这几件事情:
    1、使用poller_对象进行轮询,等待事件发生;
    2、将触发事件的Channel对象存储在activeChannels_容器中。
    3、遍历activeChannels_容器中的每个Channel对象;处理活跃的Channel事件:
    4、调用doPendingFunctors()方法,执行当前EventLoop事件循环需要处理的回调操作。
    这些回调操作通常是由其他线程通过runInLoop()函数添加到当前EventLoop的任务队列中的。
    doPendingFunctors中包含了添加channel或者其他的一些操作,但是
    如果没有唤醒操作,下面poll这一步就会永远阻塞,没有活跃的channel就执行不了下面的doPendingFunctors回调函数列表。

    所以eventloop要进行添加channel或者其他的一些事件循环需要处理的回调操作的时候就需要唤醒,然后才能执行doPendingFunctors回调

    这一部分之前已经详细介绍过了
    深度解析Muduo库中的SubReatcor唤醒操作【万字解读】

  • 相关阅读:
    虹科分享 | 网络仿真器 | 预测云中对象存储系统的实际性能
    计算机毕业设计springboot+vue基本微信小程序的乐旋乒乓球课程管理系统 uniapp 小程序
    AI:业余时间打比赛—挣它个小小目标—【阿里安全×ICDM 2022】大规模电商图上的风险商品检测比赛
    【NOI模拟赛】刮痧(动态规划)
    LeetCode二叉树系列——110.平衡二叉树
    详解设计模式:桥接模式
    Sectigo https证书
    动态改标题
    Scratch软件编程等级考试三级——20201219
    第四十五篇,STL标准模板库常见算法
  • 原文地址:https://blog.csdn.net/weixin_44545838/article/details/133416396