• zlMediaKit 2 event-poller模块--reactor+管道回调执行异步任务队列+红黑树执行定时任务


    EventPoller.h

    WorkThreadPool.h

    EventPoller

    UML

    在这里插入图片描述

    继承自TaskExecutor类,最重要的是实现async方法

    结构

        std::thread::id _loop_thread_id //执行事件循环的线程id
    
    //async相关
    	//内部事件管道
        PipeWrap _pipe;
        //从其他线程切换过来的任务
        std::mutex _mtx_task;
        List<Task::Ptr> _list_task;
    
    //epoll相关
        int _epoll_fd = -1;
        unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;
    
    //定时器相关
        std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    async/async_l

    同步任务&&本对象的轮询线程调用 直接执行

    异步任务 加入poller管理的任务队列,写入管道通知主线程

    Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) {
        TimeTicker();
        if (may_sync && isCurrentThread()) {
            task();
            return nullptr;
        }
    
        auto ret = std::make_shared<Task>(std::move(task));
        {
            lock_guard<mutex> lck(_mtx_task);
            if (first) {
                _list_task.emplace_front(ret);
            } else {
                _list_task.emplace_back(ret);
            }
        }
        //写数据到管道,唤醒主线程
        _pipe.write("", 1);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    构造函数

    自己维护一个红黑树,初始化时就加入对**管道事件(执行异步任务)**的监听

    _loop_thread_id,后面会看到构造函数只会被TaskExecutorGetterImp::addPoller调用,且此时绑定的线程id并非执行epoll_wait的线程,在构造之后,会在在执行runLoop时分配一个新线程再次绑定_loop_thread_id

    EventPoller::EventPoller(ThreadPool::Priority priority) {
        _priority = priority;
        SockUtil::setNoBlocked(_pipe.readFD());
        SockUtil::setNoBlocked(_pipe.writeFD());
    
    #if defined(HAS_EPOLL)
        _epoll_fd = epoll_create(EPOLL_SIZE);
        if (_epoll_fd == -1) {
            throw runtime_error(StrPrinter << "创建epoll文件描述符失败:" << get_uv_errmsg());
        }
        SockUtil::setCloExec(_epoll_fd);
    #endif //HAS_EPOLL
        _logger = Logger::Instance().shared_from_this();
        _loop_thread_id = this_thread::get_id();
    
        //添加内部管道事件
        if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
            throw std::runtime_error("epoll添加管道失败");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    onPipeEvent

    没错,管道事件回调即把异步任务队列中的任务执行个编

    inline void EventPoller::onPipeEvent() {
        char buf[1024];
        int err = 0;
        do {
            if (_pipe.read(buf, sizeof(buf)) > 0) {
                continue;
            }
            err = get_uv_error(true);
        } while (err != UV_EAGAIN);
    
        decltype(_list_task) _list_swap;
        {
            lock_guard<mutex> lck(_mtx_task);
            _list_swap.swap(_list_task);
        }
    
        _list_swap.for_each([&](const Task::Ptr &task) {
            try {
                (*task)();
            } catch (ExitException &) {
                _exit_flag = true;
            } catch (std::exception &ex) {
                ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what();
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    epoll_ctl

    socket中可以看到,对应的回调函数都有一个socket实例的弱指针,回调中可以操作socket的所有

    EPOLL_CTL_ADD
    typedef union epoll_data
    {
      void *ptr;
      int fd;
      uint32_t u32;
      uint64_t u64;
    } epoll_data_t;
    
    struct epoll_event
    {
      uint32_t events;	/* Epoll events */
      epoll_data_t data;	/* User data variable */
    } __EPOLL_PACKED;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    并没有使用epoll_event.data.ptr来存放回调的数据,而是_event_map保存对应的fd对应的回调

    int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
        TimeTicker();
        if (!cb) {
            WarnL << "PollEventCB 为空!";
            return -1;
        }
    
        if (isCurrentThread()) {
    #if defined(HAS_EPOLL)
            struct epoll_event ev = {0};
            ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
            ev.data.fd = fd;
            int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
            if (ret == 0) {
                _event_map.emplace(fd, std::make_shared(std::move(cb)));
            }
            return ret;
    #endif //HAS_EPOLL
        }
    
        async([this, fd, event, cb]() {
            addEvent(fd, event, std::move(const_cast(cb)));
        });
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    看到这里回顾之前的socket部分,socket连接之后进入工作状态的回调onRead|onWrite|emitErr,其实也是同理,事件来了,来了做什么,socket对象自己是知道的。实际上socket对象真正对应的读写等事件的cb,是我们根据session具体类型指定的。

    这就是大多数网络框架留出来的让用户自定义的onXXX回调部分,即处理业务逻辑的部分

    bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) {
        weak_ptr<Socket> weak_self = shared_from_this();
        weak_ptr<SockFD> weak_sock = sock;
        _enable_recv = true;
        _read_buffer = _poller->getSharedBuffer();
        int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) {
            auto strong_self = weak_self.lock();
            auto strong_sock = weak_sock.lock();
            if (!strong_self || !strong_sock) {
                return;
            }
    
            if (event & EventPoller::Event_Read) {
                strong_self->onRead(strong_sock, is_udp);
            }
            if (event & EventPoller::Event_Write) {
                strong_self->onWriteAble(strong_sock);
            }
            if (event & EventPoller::Event_Error) {
                strong_self->emitErr(getSockErr(strong_sock));
            }
        });
    
        return -1 != result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    EPOLL_CTL_DEL/EPOLL_CTL_MOD

    和epoll_ctl_add相同,只不过没有del的回调没有存,没必要。mod对应的也只是修改event类型,没有对_event_map进行操作

    runLoop

    跑不了reactor模型的范式,在就绪事件里调用对应的回调(回调放在了_event_map中,没有给内核event.data.ptr传指针,自己管理自己维护)

    特别的是,blocked设置调用是否阻塞,master线程调用不能阻塞,则会else中新开一个线程,开好之后,通过信号量通知进度

    void EventPoller::runLoop(bool blocked, bool ref_self) {
        if (blocked) {
            ThreadPool::setPriority(_priority);
            lock_guard<mutex> lck(_mtx_running);
            _loop_thread_id = this_thread::get_id();
            if (ref_self) {
                s_current_poller = shared_from_this();
            }
            _sem_run_started.post();
            _exit_flag = false;
            uint64_t minDelay;
    #if defined(HAS_EPOLL)
            struct epoll_event events[EPOLL_SIZE];
            while (!_exit_flag) {
                minDelay = getMinDelay();
                startSleep();//用于统计当前线程负载情况
                int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);
                sleepWakeUp();//用于统计当前线程负载情况
                if (ret <= 0) {
                    //超时或被打断
                    continue;
                }
                for (int i = 0; i < ret; ++i) {  //reactor范式
                    struct epoll_event &ev = events[i];
                    int fd = ev.data.fd;
                    auto it = _event_map.find(fd);
                    if (it == _event_map.end()) {
                        epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                        continue;
                    }
                    auto cb = it->second;
                    try {
                        (*cb)(toPoller(ev.events));
                    } catch (std::exception &ex) {
                        ErrorL << "EventPoller执行事件回调捕获到异常:" << ex.what();
                    }
                }
            }
        } else {
            _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);
            _sem_run_started.wait();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    static thread_local std::weak_ptr s_current_poller; ref_self是否引用自己,给线程中执行的开了一个口子,可以操作当前的线程的poller.

    thread_local 关键字修饰的变量具有线程(thread)周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例。

    C++ 11 关键字:thread_local

    EventPoller::Ptr EventPoller::getCurrentPoller() {
        return s_current_poller.lock();
    }
    
    • 1
    • 2
    • 3

    getMinDelay 执行定时任务/设定epoll的超时等待时间

    flushDelayTask

    执行到期的的定时任务,如果需要循环执行,再次加入一个定时任务

    距离下一个定时任务还有多久设计为epoll_wait的超时等待时间

    uint64_t EventPoller::flushDelayTask(uint64_t now_time) {
        decltype(_delay_task_map) task_copy;
        task_copy.swap(_delay_task_map);
    
        for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) {
            //已到期的任务
            try {
                auto next_delay = (*(it->second))();
                if (next_delay) {
                    //可重复任务,更新时间截止线
                    _delay_task_map.emplace(next_delay + now_time, std::move(it->second));
                }
            } catch (std::exception &ex) {
                ErrorL << "EventPoller执行延时任务捕获到异常:" << ex.what();
            }
        }
    
        task_copy.insert(_delay_task_map.begin(), _delay_task_map.end());
        task_copy.swap(_delay_task_map);
    
        auto it = _delay_task_map.begin();
        if (it == _delay_task_map.end()) {
            //没有剩余的定时器了
            return 0;
        }
        //最近一个定时器的执行延时
        return it->first - now_time;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    添加定时任务,会异步执行,引起管道事件,即往定时列表中添加定时任务。poller一直在runloop怎么添加的,所以异步执行时,会判断是否是当前poller线程,不是就写管道通知来活了

    EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) {
        DelayTask::Ptr ret = std::make_shared<DelayTask>(std::move(task));
        auto time_line = getCurrentMillisecond() + delay_ms;
        async_first([time_line, ret, this]() {
            //异步执行的目的是刷新select或epoll的休眠时间
            _delay_task_map.emplace(time_line, ret);
        });
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Any/AnyStorage

    任意对象void*类型的问题在于,当delete时,能不能获得对应的类型,正确的执行析构。shared_ptr的第二个参数允许我们传入deletor,解决这个问题

    //可以保存任意的对象
    class Any{
    public:
        using Ptr = std::shared_ptr<Any>;
    
        Any() = default;
        ~Any() = default;
    
        template <typename C,typename ...ArgsType>
        void set(ArgsType &&...args){
            _data.reset(new C(std::forward<ArgsType>(args)...),[](void *ptr){
                delete (C*) ptr;
            });
        }
        template <typename C>
        C& get(){
            if(!_data){
                throw std::invalid_argument("Any is empty");
            }
            C *ptr = (C *)_data.get();
            return *ptr;
        }
    
        operator bool() {
            return _data.operator bool (); 
            //std::shared_ptr::operator bool  true if *this stores a pointer, false otherwise.
        }
        bool empty(){
            return !bool();
        }
    private:
        std::shared_ptr<void> _data;
    };
    
    //用于保存一些外加属性
    class AnyStorage : public std::unordered_map<std::string,Any>{
    public:
        AnyStorage() = default;
        ~AnyStorage() = default;
        using Ptr = std::shared_ptr<AnyStorage>;
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    std::shared_ptr的工作原理

    shared_ptr确实会记录传入的类型,因此析构时能够正确执行。

    但是如果传入的是父类指针,就体现了析构函数虚函数的重要性

    #include 
    #include 
    #include 
    
    class test {
    public:
      test() {
        std::cout << "Test created" << std::endl;
      }
      virtual ~test() {
        std::cout << "Test destroyed" << std::endl;
      }
    };
    
    class test1: public test
    {
    public:
    	test1()
    	{
    		std::cout << "Test1 created" << std::endl;
    	}
    	~test1()
    	{
    		std::cout<< "Test1 destroyed"<< std::endl;
    	}
    };
    
    int main() {
      std::cout << "At begin of main.\ncreating std::vector>" 
                << std::endl;
      std::vector<std::shared_ptr<void>> v;
      {
        std::cout << "Creating test" << std::endl;
        v.push_back( std::shared_ptr<test>( (test*)new test1() ) );//传入的是test类型的指针,如果没有virtual不会执行test1的析构
        v.push_back( std::shared_ptr<test>( new test1() ) ); //传入的是test1类型的指针,即使没有virtual也能正确调用
        std::cout << "Leaving scope" << std::endl;
      }
      std::cout << "Leaving main" << std::endl;
      return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    EventPollerPool

    UML

    在这里插入图片描述

    INSTANCE_IMP(EventPollerPool)
        
    #define INSTANCE_IMP(class_name, ...) \
    class_name &class_name::Instance() { \
        static std::shared_ptr s_instance(new class_name(__VA_ARGS__)); \
        static class_name &s_instance_ref = *s_instance; \
        return s_instance_ref; \
    }
    
    // 声明一个单例,智能指针管理生命周期
    EventPollerPool &EventPollerPool::Instance() 
    { 
        static std::shared_ptr s_instance(new EventPollerPool()); 
        static EventPollerPool &s_instance_ref = *s_instance; 
        return s_instance_ref; 
    }
    
    EventPollerPool::EventPollerPool() {
        auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);
        InfoL << "创建EventPoller个数:" << size;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在池单例的构造函数中,调用TaskExecutorGetter的addPoller方法

    EventPoller::Ptr EventPollerPool::getFirstPoller() {
        return dynamic_pointer_cast<EventPoller>(_threads.front());
    }
    
    //根据负载情况获取轻负载的实例 如果优先返回当前线程,那么会返回当前线程 返回当前线程的目的是为了提高线程安全性
    EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) {
        auto poller = EventPoller::getCurrentPoller();
        if (prefer_current_thread && _prefer_current_thread && poller) {
            return poller;
        }
        return dynamic_pointer_cast<EventPoller>(getExecutor());
    }
    
    /**
     * 设置 getPoller() 是否优先返回当前线程
     * 在批量创建Socket对象时,如果优先返回当前线程,
     * 那么将导致负载不够均衡,所以可以暂时关闭然后再开启
     * @param flag 是否优先返回当前线程
     */
    void EventPollerPool::preferCurrentThread(bool flag) {
        _prefer_current_thread = flag;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    WorkThreadPool

    长得和EventPoller一样…唯一不同在于线程优先级不同 PRIORITY_LOWEST ,EventPollerPool对应的线程优先级PRIORITY_HIGHEST

    总结

    • 如何实现一个可以存储任意类型Any结构,对应的资源释放怎么管理,shared_ptr, 析构函数虚函数的重要性

    • thread_local 具有线程周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例

    • 对于异步任务执行的实现,pipe事件监听+带锁的异步任务队列

    • 如果添加定时任务,如何在epoll_wait的循环中不错过定时任务,如何处理定时任务

    • 多个poller线程的负载均衡怎么做的 (没有什么高大上,记录wait和run的时间,选择空闲比最小的)

  • 相关阅读:
    【数字IC设计】DC自动添加门控时钟
    【FPGA】FPGA实现UART串口通信回环
    【校招VIP】前端算法考察之链表算法
    JavaScript的DOM操作(二)
    Strings数据类型
    代码配置仓库GitLab安装部署
    Vue中linq的应用及语句示例
    Python编程陷阱(七)
    发现学习的新契机——广东开放大学电大搜题服务
    基于FPGA的PSRAM接口设计与实现
  • 原文地址:https://blog.csdn.net/qq_41565920/article/details/127717673