• 1.8.C++项目:仿muduo库实现并发服务器之eventloop模块的设计


    项目完整在:

    一、eventloop模块:进行事件监控,以及事件处理的模块

    在这里插入图片描述

    • 进行事件监控管理的模块
    • 这个模块就是我们所说的One thread one loop 中的loop,也就是我们所说的Reactor
    • 这个模块必定是一个模块对于一个线程

    二、提供的功能

    这个模块和线程是一一对应的!
    监听了一个链接,如果这个连接一旦就绪,就要进行事件处理!
    但是如果这个描述符,在多个线程中都触发了了事件,进行处理,就会存在线程安全问题
    因此我们需要将一个链接的事件监控, 以及连接事件处理,以及其他操作都放在同一个线程中!
    如何保证一个连接的所有操作都在eventloop对应的线程中!
    给eventLOOP模块中,都添加一个任务队列!
    对连接的所有操作,都进行一次封装,将对连接的操作当作任务都添加到任务队列中!

    三、实现思想

    (一)功能

    1. 在线程中对描述符进行事件监控!
    2. 有描述符就绪则对描述符进行事件处理,(如何保证处理回调函数中的操作都在线程中)
    3. 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行! 这样能够保证对于所有链接的所有操作,都是在一个线程中进行的,不涉及线程安全问题!
      但是对于任务队列中的操作有线程安全的问题,只需要给task的操作架一把锁即可!

    (二)意义

    对于服务器的所有事件都是由EventLoop模块来完成
    每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都要放到同一个线程中进行!

    (三)功能设计

    1. 事件监控
      使用Poller模块
      有事件就绪则进行事件处理!
    2. 执行任务队列中的任务!
      注意点:
      因为有可能因为等待描述符IO事件就绪,执行流流程阻塞,这个时候任务对立中的任务得不到执行!
      因此得有一个事件通知的东西,能够唤醒事件监控的阻塞!
      当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作!
      这些操作必须要在Eventloop对应的线程中进行,保证对连接的各项操作都是线程安全的。
    3. 如果执行的操作就在本线程中,不需要将操作压入队列了,可以直接执行!
    4. 如果执行的操作不在线程中,才需要加入任务池,等到事件处理完了之后就行执行任务!

    四、框架

     class Eventloop {
    private:
            std::thread::id _thread_id; // 线程ID
            int _event_fd // eventfd 唤醒IO事件监控有可能的阻塞!!!
            Poller _poller; // 进行所有描述符的事件监控
            using Functor = std::function<void()>;
            std::vector<Functor> _task; // 任务池
            std::mutex _mutex; // 实现任务池操作的线程安全!!!
    public:
            void runAllTask();
    public:
            Eventloop();
            void runInLoop(const Functor&cb); // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
            void queueInLoop(const Functor&cb);  // 将操作压入任务池!
            bool isInLoop(); //永远判断当前线程是否是EventLoop所对应的线程
            void updateEvent(Channel* channel); // 添加/修改描述符的事件监控
            void removeEvent(Channel* channel);  // 移除描述符的监控
            void Start(); // 任务监控完毕进行处理任务! 三步走:事件监控-》就绪事件处理-》执行任务
    
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    五、代码

    
    class EventLoop {
    private:
            using Functor = std::function<void()>;
            std::thread::id _thread_id; // 线程ID
            int _event_fd; // eventfd 唤醒IO事件监控有可能的阻塞!!!
            std::unique_ptr<Channel> _event_channel; 
             Poller _poller;//进行所有描述符的事件监控
            
            std::vector<Functor> _tasks; // 任务池
            std::mutex _mutex; // 实现任务池操作的线程安全!!!
            TimerWheel _timer_wheel;//定时器模块
    public: 
            // 执行任务池中的所有任务!!
            void runAllTask() {
                    std::vector<Functor> functor; {
                            std::unique_lock<std::mutex> _lock(_mutex); // 出了作用域,锁就会被解开!!
                            _tasks.swap(functor);
                    }
                    for (auto &f : functor) {
                            f();
                    }
                    return ;
            }
            static int createEventFd() {
                    int efd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);
                    if (efd < 0) {
                            ERR_LOG("CREATE ENVENTED FAILED !!!");
                            abort();
                    }
                    return efd;
            }
            void readEventfd() {
                    uint64_t res = 0;
                    int ret = read(_event_fd,&res,sizeof(res));
                    if (ret < 0) {
                            if (errno == EINTR || errno == EAGAIN) {
                            return;
                    }
                            ERR_LOG("READ EVENTFD FAILED!");
                            abort();
                    }
                    return ;
            }
            void weakEventFd() {
                    uint64_t val = 1;
                    int ret = write(_event_fd,&val,sizeof(val));
                    if (ret < 0) {
                    if (errno == EINTR) {
                        return;
                    }
                    ERR_LOG("READ EVENTFD FAILED!");
                    abort();
                }
                return ;
    
            }
    public:
            EventLoop():_thread_id(std::this_thread::get_id()), 
                        _event_fd(createEventFd()), 
                        _event_channel(new Channel(this, _event_fd)),
                        _timer_wheel(this) {
                //给eventfd添加可读事件回调函数,读取eventfd事件通知次数
                _event_channel->setReadCallback(std::bind(&EventLoop::readEventfd, this));
                //启动eventfd的读事件监控
                _event_channel->enableRead();
            }
            void runInLoop(const Functor&cb) { // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
                    if (isInLoop()) {
                            return cb();
                    }
            }
            void queueInLoop(const Functor&cb) { // 将操作压入任务池!
                    std::unique_lock<std::mutex> _lock(_mutex);
                    //唤醒有可能因为没有事件就绪,而导致的epoll阻塞;
                    //其实就是给eventfd写入一个数据,eventfd就会触发可读事件
                    _tasks.push_back(cb);
                    weakEventFd();
            }
            bool isInLoop() { //永远判断当前线程是否是EventLoop所对应的线程
                    return (_thread_id == std::this_thread::get_id());
            }
            void updateEvent(Channel* channel) {// 添加/修改描述符的事件监控
                    return _poller.UpdateEvent(channel); 
            }
            void removeEvent(Channel* channel) {   // 移除描述符的监控
                    return _poller.removeEvent(channel);
            }
            void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
            void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
            void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
            bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
            void Start() { // 任务监控完毕进行处理任务! 
                    // 三步走:事件监控-》就绪事件处理-》执行任务
                    std::vector<Channel*> actives;
                    _poller.Poll(&actives);
    
                    for (auto &channel : actives) {
                            channel -> handleEvent();
                    }
                    runAllTask();
            }
    
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
  • 相关阅读:
    node通过node-java库调用java
    Hash Table Mock
    .so文件【Linux下的程序函数库,即编译好的可以供其他程序使用的代码和数据】
    Socket套接字
    line-height用了这么久,你真的了解他么
    Java 复习 final和静态类不能被继承
    只有正规才有机会,CTF/AWD竞赛标准参考书来了
    Nignx服务器,项目部署和Yapi,Swagger工具
    3D图表有效提升数据大屏档次
    python实战故障诊断之CWRU数据集(五):线性判别模型及二次判别模型的应用
  • 原文地址:https://blog.csdn.net/weixin_54447296/article/details/133498144