• C++异步:asio的scheduler实现!

    前面也提到过,libunifex的scheduler实现离实用级其实还有一些差距。对比asio相关的实现,处理细节和完备度上都有较大落差,基于总览篇提到的整体实践思路,我们将更多使用asio的scheduler来作为execution的底层调度器。所以从本篇开始,我们将详细介绍asio相关的实现,本篇主要介绍asio传统的lambda post调度器。



    ​特定情况下,任务与任务之间存在依赖关系,这点上asio本身提供的strand 的支持,利用strand,我们可以在业务层尽可能少的使用锁等同步原语的情况下,对一个流水线式的组合任务进行编码,如图所示:

    ​这种多part的流水线式的任务,是很适合使用strand进行封装,得到预期的结果的。使用asio作为通用的并发框架,肯定是一种可行的方式,实际上网易不少项目就是这么做的,最早是他们的服务器使用asio作为底层的并发框架,后来国内知名度较高的messiah引擎,也借鉴并发扬了这种方式,使用asio作为底层基础的并发框架。本文我们也是更多的从源码着手,集中在asio scheduler这部分的实现代码上,来深入了解它的实现和特点。



    1. asio::static_thread_pool pool(1);
    2. auto ex1 = ctx.get_executor();
    3. // Get the number of available threads in the pool.
    4. std::size_t n = asio::query(ex1, asio::execution::occupancy);
    5. // Require an executor with blocking.never property.
    6. auto ex2 = asio::require(ex1, asio::execution::blocking.never);
    7. asio::execution::execute(ex2, []{ /*...*/ });
    8. // Prefer an executor that uses a custom allocator.
    9. auto ex3 = asio::prefer(ex2, asio::execution::allocator(my_allocator));
    10. asio::execution::execute(ex3, []{ /*...*/ });


    • query(): 查询某属性的值。

    • require(): 获取满足对应属性的对象。

    • prefer(): 获取包含定制内容的对象。


    但其实对于库本身的实现来说,我们也容易看到,利用property对多种并发泛式进行约束的方式,本身就具备一定的复杂度,实际上并不那么成熟。对于库的构建来说,很难说它提供的是一个简单易扩展的机制。这个其实tag_invoke机制本身也有跟property相关的对比,个人认为,同样是对库的定制和对泛型的支持的目的,基于cpo的tag invoke本身应该是更值得选择的,而property本身我感觉就比cpo的理解成本要高,用于构建库代码,也会导致库代码本身的复杂度变高,在它没有成为C++标准的一部分之前,这种复杂度的引入肯定是不那么合适的。

    这种复杂度的增加我们从当前asio 1.22代码仓库可以比较容易看出,主体功能变化不大(对比1.16版本),但引入了相当多的代码用于在兼容低版本c++的情况下对property等基础功能进行支持,导致整体代码复杂度剧增, 但实际带来的便利性基本看不到。如果抛开对新特性的实验本身,这些调整对asio的版本迭代来说,绝对跟优雅本身相去甚远。

    对比向早期execution的靠拢,asio对c++20 coroutine的支持还是可圈可点的,这个从作者近期的实例代码讲解中也能感受到,像awaitable的“||” “&&”等支持,很好的扩展了协程中多任务处理的语义,更容易用更少的代码实现出简单易理解,易维护的异步代码。





    此处我们需要注意的是,真正比较完整的实现了高效的操作系统级的AysncIO,并被大家接受使用的,也就只有Windows平台的IOCP,当然,这种情况最近几年得到了改善,linux平台的新秀io_uring,也被越来越多的人关注和使用起来,不过此处我们选的是1.16的版本,并未包含io_uring的实现,我们先暂时不考虑它的存在。操作系统级的async io实现制约了asio本身Proactor模型的跨平台实现,相关的异步任务调度,也自然的分裂成了两套实现:

    • 对于windows来说,因为IOCP的存在,asio的Proactor模型本身可以完全使用IOCP本身来实现,而对于其他平台,asio就只能选用妥协的方式,使用Reactor+外围Scheduler的模式,来模拟Proactor模型,最终实现一个业务层与IOCP使用体验完全一致的跨平台的Proactor模型。 

    • 对于我们当前的项目来说,因为优先选择的是跨平台的一致性和维护的简洁性,所以我们当前阶段,主要使用的是第2种方法中的scheduler,这也是本文分析的重点。而且Reactor本身的实现也跟scheduler的工作是解耦的,所以我们分析中可以直接略过reactor部分,只关注scheduler整体机制的实现了。


    asio的arbitrary task的投递是通过post来完成的,我们也会以此作为起点,来分析一个函数对象,是如何被asio进行处理最终存储起来的。



    1. asio::io_context ctx{};
    2. auto wg = asio::make_work_guard(ctx);
    3. std::thread tmp_thread([&ctx] { ctx.run(); });
    4. std::allocator<void> alloc;
    5. ctx.get_executor().post([] {
    6. std::cout << "task run!" << std::endl;
    7. }, alloc);
    8. std::this_thread::sleep_for(1s);


    • io_context->scheduler的传递过程

    1. template <typename Function, typename Allocator>
    2. void io_context::executor_type::post(Function&& f, const Allocator& a) const
    3. {
    4. typedef typename std::decay<Function>::type function_type;
    5. // Allocate and construct an operation to wrap the function.
    6. typedef detail::executor_op<function_type, Allocator, detail::operation> op;
    7. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
    8. p.p = new (p.v) op(static_cast<Function&&>(f), a);
    9. ASIO_HANDLER_CREATION((this->context(), *p.p,
    10. "io_context", &this->context(), 0, "post"));
    11. io_context_.impl_.post_immediate_completion(p.p, false);
    12. p.v = p.p = 0;
    13. }



    1. struct ptr
    2. {
    3. const Alloc* a;
    4. void* v;
    5. op* p;
    6. ~ptr()
    7. {
    8. reset();
    9. }
    10. static op* allocate(const Alloc& a)
    11. {
    12. typedef typename ::asio::detail::get_recycling_allocator<
    13. Alloc, purpose>::type recycling_allocator_type;
    14. ASIO_REBIND_ALLOC(recycling_allocator_type, op) a1(
    15. ::asio::detail::get_recycling_allocator<
    16. Alloc, purpose>::get(a));
    17. return a1.allocate(1);
    18. }
    19. void reset()
    20. {
    21. if (p)
    22. {
    23. p->~op();
    24. p = 0;
    25. }
    26. if (v)
    27. {
    28. typedef typename ::asio::detail::get_recycling_allocator<
    29. Alloc, purpose>::type recycling_allocator_type;
    30. ASIO_REBIND_ALLOC(recycling_allocator_type, op) a1(
    31. ::asio::detail::get_recycling_allocator<
    32. Alloc, purpose>::get(*a));
    33. a1.deallocate(static_cast<op*>(v), 1);
    34. v = 0;
    35. }
    36. }
    37. }


    • std::allocator_traits

    • 以及它的member alias templates : rebind_alloc




    1. typedef typename std::decay<Function>::type function_type;
    2. // Allocate and construct an operation to wrap the function.
    3. typedef detail::executor_op<function_type, Allocator, detail::operation> op;
    4. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
    5. p.p = new (p.v) op(static_cast<Function&&>(f), a);
    6. io_context_.impl_.post_immediate_completion(p.p, false);
    7. p.v = p.p = 0;

    一开始我们在类型为op::ptr的临时变量p初始化的时候,传入了分配器的指针,分配好的一段内存到其中,然后我们再利用replacement new对它进行初始化,注意这个地方不要被实现绕晕了,p.p的类型就是executor_op的指针,然后executor_op负责持有函数对象,这样定制了executor_op的内存分配,就间接的把包含其中的函数对象的内存分配也一并处理了。剩下的部分就是将executor_op的指针丢给io_context.impl,其实就是scheduler,因为我们并不希望相关scope结束的时候,刚申请的executor_op被马上释放,所以这里需要将p.v和p.p都置成空。到目前为止executor_op的定义和scheduler::post_immediate_completion(),我们接下来继续展开相关的实现。

    • executor_op

    1. template <typename Handler, typename Alloc,
    2. typename Operation = scheduler_operation>
    3. class executor_op : public Operation
    4. {
    5. public:
    7. template <typename H>
    8. executor_op(H&& h, const Alloc& allocator)
    9. : Operation(&executor_op::do_complete),
    10. handler_(static_cast(h)),
    11. allocator_(allocator)
    12. {
    13. }
    14. static void do_complete(void* owner, Operation* base,
    15. const asio::error_code& /*ec*/,
    16. std::size_t /*bytes_transferred*/)
    17. {
    18. // Take ownership of the handler object.
    19. executor_op* o(static_cast(base));
    20. Alloc allocator(o->allocator_);
    21. ptr p = { detail::addressof(allocator), o, o };
    23. // Make a copy of the handler so that the memory can be deallocated before
    24. // the upcall is made. Even if we're not about to make an upcall, a
    25. // sub-object of the handler may be the true owner of the memory associated
    26. // with the handler. Consequently, a local copy of the handler is required
    27. // to ensure that any owning sub-object remains valid until after we have
    28. // deallocated the memory here.
    29. Handler handler(static_cast(o->handler_));
    30. p.reset();
    31. // Make the upcall if required.
    32. if (owner)
    33. {
    34. fenced_block b(fenced_block::half);
    36. asio_handler_invoke_helpers::invoke(handler, handler);
    37. std::invoke(handler);
    39. }
    40. }
    41. private:
    42. Handler handler_;
    43. Alloc allocator_;
    44. };


    1. // Base class for all operations. A function pointer is used instead of virtual
    2. // functions to avoid the associated overhead.
    3. class scheduler_operation ASIO_INHERIT_TRACKED_HANDLER
    4. {
    5. public:
    6. using operation_type = scheduler_operation;
    7. void complete(void* owner, const asio::error_code& ec,
    8. std::size_t bytes_transferred)
    9. {
    10. func_(owner, this, ec, bytes_transferred);
    11. }
    12. void destroy()
    13. {
    14. func_(nullptr, this, asio::error_code(), 0);
    15. }
    16. protected:
    17. using func_type = void (*)(void*, scheduler_operation*, const asio::error_code&, std::size_t);
    18. scheduler_operation(func_type func)
    19. : next_(nullptr),
    20. func_(func),
    21. task_result_(0)
    22. {
    23. }
    24. // Prevents deletion through this type.
    25. ~scheduler_operation()
    26. {
    27. }
    28. private:
    29. friend class op_queue_access;
    30. scheduler_operation* next_;
    31. func_type func_;
    32. protected:
    33. friend class scheduler;
    34. unsigned int task_result_; // Passed into bytes transferred.
    35. };


    这个类型, 也就是executor_op::do_complete()的类型,这样对于scheduler层面,把相应的任务看成都是scheduler_operation,并且都可以按照func_type的形式来调用就好了。执行的细节我们暂且先搁下,聊到operation的执行的时候再一并来解析。

    • scheduler::post_immediate_completion()

    1. void scheduler::post_immediate_completion(
    2. scheduler::operation* op, bool is_continuation)
    3. {
    4. #if defined(ASIO_HAS_THREADS)
    5. if (one_thread_ || is_continuation)
    6. {
    7. if (thread_info_base* this_thread = thread_call_stack::contains(this))
    8. {
    9. ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
    10. static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
    11. return;
    12. }
    13. }
    14. #else // defined(ASIO_HAS_THREADS)
    15. (void)is_continuation;
    16. #endif // defined(ASIO_HAS_THREADS)
    17. work_started();
    18. mutex::scoped_lock lock(mutex_);
    19. op_queue_.push(op);
    20. wake_one_thread_and_unlock(lock);
    21. }

    这部分就比较简单了,主要是将对应的operation存储到scheduler上的op_queue中,op_queue是一个operation的链表实现,用作一个FIFO队列,相关的代码也比较简单,大家可以自己查阅。比较特殊的是is_continuation参数,如果为true,或者scheduler工作在单线程run()模式下,则会判断当前正在执行scheudler::run()的线程是不是当前线程, 如果是当前线程,则直接无锁方式将任务推送到线程的op_queue上,算是一个fast path实现了。

    • post()过程小结




    • scheduler::do_run_one()

    1. std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    2. scheduler::thread_info& this_thread,
    3. const asio::error_code& ec)
    4. {
    5. while (!stopped_)
    6. {
    7. if (!op_queue_.empty())
    8. {
    9. // Prepare to execute first handler from queue.
    10. operation* o = op_queue_.front();
    11. op_queue_.pop();
    12. bool more_handlers = (!op_queue_.empty());
    13. std::size_t task_result = o->task_result_;
    14. if (more_handlers && !one_thread_)
    15. wake_one_thread_and_unlock(lock);
    16. else
    17. lock.unlock();
    18. // Ensure the count of outstanding work is decremented on block exit.
    19. work_cleanup on_exit = { this, &lock, &this_thread };
    20. (void)on_exit;
    21. // Complete the operation. May throw an exception. Deletes the object.
    22. o->complete(this, ec, task_result);
    23. return 1;
    24. }
    25. else
    26. {
    27. wakeup_event_.clear(lock);
    28. wakeup_event_.wait(lock);
    29. }
    30. }
    31. return 0;
    32. }



    1. static void do_complete(void* owner, Operation* base,
    2. const asio::error_code& /*ec*/,
    3. std::size_t /*bytes_transferred*/)
    4. {
    5. // Take ownership of the handler object.
    6. executor_op* o(static_cast(base));
    7. Alloc allocator(o->allocator_);
    8. ptr p = { detail::addressof(allocator), o, o };
    10. // Make a copy of the handler so that the memory can be deallocated before
    11. // the upcall is made. Even if we're not about to make an upcall, a
    12. // sub-object of the handler may be the true owner of the memory associated
    13. // with the handler. Consequently, a local copy of the handler is required
    14. // to ensure that any owning sub-object remains valid until after we have
    15. // deallocated the memory here.
    16. Handler handler(static_cast(o->handler_));
    17. p.reset();
    18. // Make the upcall if required.
    19. if (owner)
    20. {
    21. fenced_block b(fenced_block::half);
    23. asio_handler_invoke_helpers::invoke(handler, handler);
    24. std::invoke(handler);
    26. }
    27. }


    • scheduler::run()

    1. std::size_t scheduler::run(asio::error_code& ec)
    2. {
    3. ec = asio::error_code();
    4. if (outstanding_work_ == 0)
    5. {
    6. stop();
    7. return 0;
    8. }
    9. thread_info this_thread;
    10. this_thread.private_outstanding_work = 0;
    11. thread_call_stack::context ctx(this, this_thread);
    12. mutex::scoped_lock lock(mutex_);
    13. std::size_t n = 0;
    14. for (; do_run_one(lock, this_thread, ec); lock.lock())
    15. if (n != (std::numeric_limits<std::size_t>::max)())
    16. ++n;
    17. return n;
    18. }

    注意开始处对outstanding_work_数量的判断,如果为0,则run会马上执行scheduler的stop(),并退出,这也是为什么我们之前的测试代码中会创建一个work_guard对象的原因,这样保证outstanding_work_至少是1, 不会在op_queue_为空的情况下就结束scheduler的执行。一般开启独立的工作线程,如下所示:

    std::thread tmp_thread([&ctx] { ctx.run(); });


    • scheduler::run_one()

    1. std::size_t scheduler::run_one(asio::error_code& ec)
    2. {
    3. ec = asio::error_code();
    4. if (outstanding_work_ == 0)
    5. {
    6. stop();
    7. return 0;
    8. }
    9. thread_info this_thread;
    10. this_thread.private_outstanding_work = 0;
    11. thread_call_stack::context ctx(this, this_thread);
    12. mutex::scoped_lock lock(mutex_);
    13. return do_run_one(lock, this_thread, ec);
    14. }




    • poll(): 非阻塞的执行当前op_queue_已有的任务,执行完退出。

    • poll_one(): 尝试非阻塞的执行一个任务。

    • wait_one(): 与run_one()类似, 只是多了一个对time_out的判断。  


    1. while(!stop_) {
    2. //Add some logic jobs here
    3. //...
    4. ctx.poll();
    5. std::this_thread::sleep(1ms);
    6. }


    • run()过程总结



    前面我们的示例中,只有一个线程在执行scheduler::run(),所以相关任务是严格按照post()的先后顺序来执行的。那么如果我们更多的利用多核, 使用多个线程执行同一个context的run(),那必然任务被哪个线程调度到并执行,会变成一个不可预测的事情,这种情况下,如果任务之间存在依赖,我们又不希望在业务侧过多的使用同步原语,那应该怎么做呢? 我们通过两段示例代码来展开这个问题的处理方案。



    1. asio::io_context ctx{};
    2. auto wg = asio::make_work_guard(ctx);
    3. std::thread tmp_thread([&ctx] { ctx.run(); });
    4. std::thread tmp_thread1([&ctx] { ctx.run(); });
    5. std::thread tmp_thread2([&ctx] { ctx.run(); });
    6. std::thread tmp_thread3([&ctx] { ctx.run(); });
    7. std::allocator<void> alloc;
    8. char buf[256] = {0};
    9. for (int i = 0; i < 10; i++) {
    10. sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
    11. std::string tmpstr(buf);
    12. ctx.get_executor().post([tmpstr] {
    13. std::cout << tmpstr.c_str() << std::endl;
    14. }, alloc);
    15. }
    16. std::this_thread::sleep_for(5s);


    1. 能够相象得到,在多核电脑上,我们得到的输出必然不是一个整齐的从09的输出:
    2. task id: 0 run!task id: 2 run!
    3. task id: 1 run!
    4. task id: 3 run!
    5. task id: 5 run!
    6. task id: 7 run!
    7. task id: 6 run!
    8. task id: 4 run!
    9. task id: 8 run!
    10. task id: 9 run!

    那么如何才能保证所有task是按照post顺序依次执行的呢,答案就是本节的主角: strand   


    1. asio::io_context ctx{};
    2. auto wg = asio::make_work_guard(ctx);
    3. std::thread tmp_thread([&ctx] { ctx.run(); });
    4. std::thread tmp_thread1([&ctx] { ctx.run(); });
    5. std::thread tmp_thread2([&ctx] { ctx.run(); });
    6. std::thread tmp_thread3([&ctx] { ctx.run(); });
    7. std::allocator<void> alloc;
    8. asio::io_context::strand strand(ctx);
    9. char buf[256] = {0};
    10. for (int i = 0; i < 10; i++) {
    11. sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
    12. std::string tmpstr(buf);
    13. strand.post([tmpstr] {
    14. std::cout << tmpstr.c_str() << std::endl;
    15. }, alloc);
    16. }
    17. std::this_thread::sleep_for(5s);


    1. task id: 0 run!
    2. task id: 1 run!
    3. task id: 2 run!
    4. task id: 3 run!
    5. task id: 4 run!
    6. task id: 5 run!
    7. task id: 6 run!
    8. task id: 7 run!
    9. task id: 8 run!
    10. task id: 9 run!

    我们发现所有task已经按照post顺序逐一打印了,这是如何做到的呢? 接下来我们将对strand的实现进行分析。



    • strand相关的operation定义

    • strand上的task的投递

    • strand上的task的执行


    1. // The underlying implementation of a strand.
    2. class strand_impl
    3. : public operation
    4. {
    5. public:
    6. strand_impl(): operation(&strand_service::do_complete), locked_(false)
    7. private:
    8. // Only this service will have access to the internal values.
    9. friend class strand_service;
    10. friend struct on_do_complete_exit;
    11. friend struct on_dispatch_exit;
    12. // Mutex to protect access to internal data.
    13. asio::detail::mutex mutex_;
    14. // Indicates whether the strand is currently "locked" by a handler. This
    15. // means that there is a handler upcall in progress, or that the strand
    16. // itself has been scheduled in order to invoke some pending handlers.
    17. bool locked_;
    18. // The handlers that are waiting on the strand but should not be run until
    19. // after the next time the strand is scheduled. This queue must only be
    20. // modified while the mutex is locked.
    21. op_queue<operation> waiting_queue_;
    22. // The handlers that are ready to be run. Logically speaking, these are the
    23. // handlers that hold the strand's lock. The ready queue is only modified
    24. // from within the strand and so may be accessed without locking the mutex.
    25. op_queue ready_queue_;
    26. };

    这部分代码本身注释比较多, 我们主要关注几点:

    • 构造函数处, 我们将strand_impl的complete关联到了strand_service::do_complete()处。

    • 首先strand的operation本身是带锁的,后面也会提到,相关的锁粒度非常小。

    • strand的operation包含两个队列,一个ready_queue_和一个waiting_queue_。

    • 一个locked_标志,这些共同配合,使得strand能够达成最小粒度锁的实现。

    • 注释比较详细,结合相关的post和complete过程理解更佳。

    • strand上的task投递


    • strand::post()开始执行。

    • 内部会触发strand_service::post()的执行。

    • 会继续触发strand_service::do_post()的执行。


    level 1: strand::post():

    1. template <typename Function, typename Allocator>
    2. void post(Function&& f, const Allocator& a) const
    3. {
    4. typename std::decay::type tmp(static_cast(f));
    5. service_.post(impl_, tmp);
    6. (void)a;
    7. }


    level 2: strand_service::post():

    1. template <typename Handler>
    2. void strand_service::post(strand_service::implementation_type& impl,
    3. Handler& handler)
    4. {
    5. bool is_continuation =
    6. asio_handler_cont_helpers::is_continuation(handler);
    7. // Allocate and construct an operation to wrap the handler.
    8. typedef completion_handler<Handler> op;
    9. typename op::ptr p = { asio::detail::addressof(handler),
    10. op::ptr::allocate(handler), 0 };
    11. p.p = new (p.v) op(handler);
    12. ASIO_HANDLER_CREATION((this->context(),
    13. *p.p, "strand", impl, 0, "post"));
    14. do_post(impl, p.p, is_continuation);
    15. p.v = p.p = 0;
    16. }

    这部分代码基本就是我们之前分析过的scheduler::post()的翻版了,略有差异的地方是此处使用的不是execution_op,而是使用了另外一个类型 completion_handler\,该类型的实现与execution_op基本没有太大的差别,除了completion_handler本身不保存外部传入的Alloc这点外。它本身也是完成对传入的Function的类型擦除,提供一个统一类型的do_completion()接口,方便scheduler侧对相应的task进行延迟调用,相关的实现对比之前讲述的post()部分可以比较好的理解,这里不再赘述了。

    level 3: strand_service::do_post():

    1. void strand_service::do_post(implementation_type& impl,
    2. operation* op, bool is_continuation)
    3. {
    4. impl->mutex_.lock();
    5. if (impl->locked_)
    6. {
    7. // Some other handler already holds the strand lock. Enqueue for later.
    8. impl->waiting_queue_.push(op);
    9. impl->mutex_.unlock();
    10. }
    11. else
    12. {
    13. // The handler is acquiring the strand lock and so is responsible for
    14. // scheduling the strand.
    15. impl->locked_ = true;
    16. impl->mutex_.unlock();
    17. impl->ready_queue_.push(op);
    18. io_context_.post_immediate_completion(impl, is_continuation);
    19. }
    20. }

    这部分也有很详细的注释, 详细解释了分支的处理情况:

    • impl(就是strand独有的strand_impl这个operation) locked_标识为true,则将任务推送至waiting_queue_。

    • impl locked_标识为false,则将任务推送至read_queue_,并将locked_标识置为true。


    • strand上的task执行void strand_service::do_complete(void* owner, operation* base,


    1. void strand_service::do_complete(void* owner, operation* base,
    2. const asio::error_code& ec, std::size_t /*bytes_transferred*/)
    3. {
    4. if (owner)
    5. {
    6. strand_impl* impl = static_cast<strand_impl*>(base);
    7. // Indicate that this strand is executing on the current thread.
    8. call_stack<strand_impl>::context ctx(impl);
    9. // Ensure the next handler, if any, is scheduled on block exit.
    10. on_do_complete_exit on_exit;
    11. on_exit.owner_ = static_cast<io_context_impl*>(owner);
    12. on_exit.impl_ = impl;
    13. // Run all ready handlers. No lock is required since the ready queue is
    14. // accessed only within the strand.
    15. while (operation* o = impl->ready_queue_.front())
    16. {
    17. impl->ready_queue_.pop();
    18. o->complete(owner, ec, 0);
    19. }
    20. }
    21. }


    1. struct strand_service::on_do_complete_exit
    2. {
    3. io_context_impl* owner_;
    4. strand_impl* impl_;
    5. ~on_do_complete_exit()
    6. {
    7. impl_->mutex_.lock();
    8. impl_->ready_queue_.push(impl_->waiting_queue_);
    9. bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty();
    10. impl_->mutex_.unlock();
    11. if (more_handlers)
    12. owner_->post_immediate_completion(impl_, true);
    13. }
    14. };


    整体结构分为三层: 从下到上依次是:

    • Core部分的timer_queue实现

    • 中间的timer调度器实现(多选一)

    • 业务层使用的service和timer实现




    • min-heap实现简述


    • 其父节点为(i-1)/2

    • 左儿子为 2*(i+1) - 1

    • 右儿子为 2*(i+1)





    • 在队列尾部先加入相关元素。

    • 根据当前元素的大小,逐步执行shift-up操作,直到找到一个合适的位置。(满足min-heap约束)





    • 先将队尾元素与根节点交换。

    • 然后执行shift-down操作, 直到找到合适的位置。

    接上面的例子,我们删除0号节点,则有如下的情况 :



    • asio相关的代码实现

    1. template <typename Time_Traits>
    2. class timer_queue
    3. : public timer_queue_base
    4. {
    5. public:
    6. // The time type.
    7. using time_type = typename Time_Traits::time_type;
    8. // The duration type.
    9. using duration_type = typename Time_Traits::duration_type;
    10. // Per-timer data.
    11. class per_timer_data
    12. {
    13. public:
    14. per_timer_data() :
    15. heap_index_((std::numeric_limits<std::size_t>::max)()),
    16. next_(0), prev_(0)
    17. {
    18. }
    19. private:
    20. friend class timer_queue;
    21. // The operations waiting on the timer.
    22. op_queue<wait_op> op_queue_;
    23. // The index of the timer in the heap.
    24. std::size_t heap_index_;
    25. // Pointers to adjacent timers in a linked list.
    26. per_timer_data* next_;
    27. per_timer_data* prev_;
    28. };
    29. private:
    30. // The head of a linked list of all active timers.
    31. per_timer_data* timers_;
    32. struct heap_entry
    33. {
    34. // The time when the timer should fire.
    35. time_type time_;
    36. // The associated timer with enqueued operations.
    37. per_timer_data* timer_;
    38. };
    39. // The heap of timers, with the earliest timer at the front.
    40. std::vector<heap_entry> heap_;
    41. };


    1. // Move the item at the given index up the heap to its correct position.
    2. void up_heap(std::size_t index)
    3. {
    4. while (index > 0)
    5. {
    6. std::size_t parent = (index - 1) / 2;
    7. if (!Time_Traits::less_than(heap_[index].time_, heap_[parent].time_))
    8. break;
    9. swap_heap(index, parent);
    10. index = parent;
    11. }
    12. }
    13. // Move the item at the given index down the heap to its correct position.
    14. void down_heap(std::size_t index)
    15. {
    16. std::size_t child = index * 2 + 1;
    17. while (child < heap_.size())
    18. {
    19. std::size_t min_child = (child + 1 == heap_.size()
    20. || Time_Traits::less_than(
    21. heap_[child].time_, heap_[child + 1].time_))
    22. ? child : child + 1;
    23. if (Time_Traits::less_than(heap_[index].time_, heap_[min_child].time_))
    24. break;
    25. swap_heap(index, min_child);
    26. index = min_child;
    27. child = index * 2 + 1;
    28. }
    29. }
    30. // Swap two entries in the heap.
    31. void swap_heap(std::size_t index1, std::size_t index2)
    32. {
    33. heap_entry tmp = heap_[index1];
    34. heap_[index1] = heap_[index2];
    35. heap_[index2] = tmp;
    36. heap_[index1].timer_->heap_index_ = index1;
    37. heap_[index2].timer_->heap_index_ = index2;
    38. }


    1. // Add a new timer to the queue. Returns true if this is the timer that is
    2. // earliest in the queue, in which case the reactor's event demultiplexing
    3. // function call may need to be interrupted and restarted.
    4. bool enqueue_timer(const time_type& time, per_timer_data& timer, wait_op* op)
    5. {
    6. // Enqueue the timer object.
    7. if (timer.prev_ == 0 && &timer != timers_)
    8. {
    9. if (this->is_positive_infinity(time))
    10. {
    11. // No heap entry is required for timers that never expire.
    12. timer.heap_index_ = (std::numeric_limits::max)();
    13. }
    14. else
    15. {
    16. // Put the new timer at the correct position in the heap. This is done
    17. // first since push_back() can throw due to allocation failure.
    18. timer.heap_index_ = heap_.size();
    19. heap_entry entry = { time, &timer };
    20. heap_.push_back(entry);
    21. up_heap(heap_.size() - 1);
    22. }
    23. // Insert the new timer into the linked list of active timers.
    24. timer.next_ = timers_;
    25. timer.prev_ = 0;
    26. if (timers_)
    27. timers_->prev_ = &timer;
    28. timers_ = &timer;
    29. }
    30. // Enqueue the individual timer operation.
    31. timer.op_queue_.push(op);
    32. // Interrupt reactor only if newly added timer is first to expire.
    33. return timer.heap_index_ == 0 && timer.op_queue_.front() == op;
    34. }


    • time为positive_infinity的情况,直接不创建heap_entry,仅将heap_index_设置为最大的size_t。

    • 正常情况则如前面示例中提到的, 创建新的heap_entry并加入到数组尾部。

    • 使用up_heap()调整heap_entry到合适的位置。

    • timer链表的处理。

    • 返回新的timer是否是根节点(如果是根节点, 则表示整个timer_queue的最小expired时间有调整, 外围的Timer Scheduler需要做额外的处理)。


    当然,只有一个timer_queue,肯定是不够的,我们还得有地方驱动timer_queue的执行,对应的operation超时后才会得到处理。这就是下一部分Timer Scheduler涉及的内容。

    (二)Timer Scheduler-winrt_timer_scheduler实现


    ​我们能看到,asio自带的timer scheduler实现有很多,我们直接打开timer_scheduler.h文件也能看到:

    1. #if defined(ASIO_WINDOWS_RUNTIME)
    2. # include "asio/detail/winrt_timer_schedupp"
    3. #elif defined(ASIO_HAS_EPOLL)
    4. # include "asio/detail/epoll_reactor.hpp"
    5. #elif defined(ASIO_HAS_KQUEUE)
    6. # include "asio/detail/kqueue_reactor.hpp"
    7. #elif defined(ASIO_HAS_DEV_POLL)
    8. # include "asio/detail/dev_poll_reactor.hpp"
    9. #else
    10. # include "asio/detail/select_reactor.hpp"
    11. #endif


    • Windows下一般是直接使用iocp context作为timer scheduler。

    • linux下是使用epoll_reactor作为timer scheduler。

    • mac和ios下一般是kqueue_reactor。

    • 其他情况是select_reactor。

    • winrt_timer_scheduler其实是个cross platform的实现,不依赖任何特定平台的特性。


    如上节提到的,asio默认有好些timer scheduler实现,那我们为什么偏好于使用比较冷门的winrt_timer_scheduler呢? 主要是以下几个原因:

    • 跨平台实现,不依赖特定平台的特殊Api,所有平台表现一致。

    • 定制性,像游戏类的业务,一般都会有自己的虚拟时间,直接选择绑定系统时间的操作系统级实现,不一定是好的选择。

    • 如果我们仅用asio scheduler部分的能力,与reactor等实现共用Api并不是一个很好的选择。


    • winrt_timer_scheduler实现


    1. void winrt_timer_scheduler::run_thread()
    2. {
    3. asio::detail::mutex::scoped_lock lock(mutex_);
    4. while (!stop_thread_)
    5. {
    6. const long max_wait_duration = 5 * 60 * 1000000;
    7. long wait_duration = timer_queues_.wait_duration_usec(max_wait_duration);
    8. event_.wait_for_usec(lock, wait_duration);
    9. event_.clear(lock);
    10. op_queue<operation> ops;
    11. timer_queues_.get_ready_timers(ops);
    12. if (!ops.empty())
    13. {
    14. lock.unlock();
    15. scheduler_.post_deferred_completions(ops);
    16. lock.lock();
    17. }
    18. }
    19. }


    • operation最终是回归context::run()的线程进行执行的。

    • 此处的timer_queues是一个timer_queue_set,仅仅是一个多timer_queue的容器,存在的作用是外围多个特化的deadline_timer_service时,每个service会创建一个timer_queue。

    • 循环开始处的等待,我们前面已经知道min-heap根节点的特性,所以此处取出根节点就能知道最大的等待时间,大部分时候都能保证这个专有的timer线程不会空耗cpu。


    1. template <typename Time_Traits>
    2. void winrt_timer_scheduler::schedule_timer(timer_queue<Time_Traits>& queue, const typename Time_Traits::time_type& time, typename timer_queue<Time_Traits>::per_timer_data& timer, wait_op* op) {
    3. asio::detail::mutex::scoped_lock lock(mutex_);
    4. if (shutdown_) {
    5. scheduler_.post_immediate_completion(op, false);
    6. return;
    7. }
    8. bool earliest = queue.enqueue_timer(time, timer, op);
    9. scheduler_.work_started();
    10. if (earliest) event_.signal(lock);
    11. }


    这里也能体现出min-heap实现对定时器场合的适用性,操作和获取根节点的成本都比较低,这样就为我们在外围实现高效的timer scheduler提供了便利。

    • epoll_reactor timer部分支持浅析


    当我们使用epoll_reactor作为timer scheduler的时候,整体系统的工作流程图如下:


    1. // Add the timer descriptor to epoll.
    2. if (timer_fd_ != -1)
    3. {
    4. ev.events = EPOLLIN | EPOLLERR;
    5. ev.data.ptr = &timer_fd_;
    6. epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);


    1. void epoll_reactor::run(long usec, op_queue<operation>& ops)
    2. {
    3. int timeout;
    4. if (usec == 0)
    5. timeout = 0;
    6. else
    7. {
    8. timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    9. }
    10. bool check_timers = false
    11. // Block on the epoll descriptor.
    12. epoll_event events[128];
    13. int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
    14. // Dispatch the waiting events.
    15. for (int i = 0; i < num_events; ++i)
    16. {
    17. void* ptr = events[i].data.ptr;
    18. if (ptr == &timer_fd_)
    19. {
    20. check_timers = true;
    21. }
    22. }
    23. if (check_timers)
    24. {
    25. mutex::scoped_lock common_lock(mutex_);
    26. timer_queues_.get_ready_timers(ops);
    27. itimerspec new_timeout;
    28. itimerspec old_timeout;
    29. int flags = get_timeout(new_timeout);
    30. timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    31. }
    32. }

    最后我们还是利用timer_queue来获取所有已经超时的任务,但相比较独立的线程驱动,此处共用io_context内部持有的reactor即可完成所有操作了,另外因为只是多出一个timer_fd,对应的开销基本也是可以忽略的。需要注意的是每次timer_fd触发后,我们需要重新对timer_fd设置超时时间。 使用像epoll_reactor这种来驱动超时,优缺点都很明显:

    优点: 高性能。


    • 特定系统专供实现。

    • 定制性差,时间强行跟系统进行绑定了,不方便支持虚拟时间等概念。


    这部分是业务使用时直接能接触到的部分,内部实现主要是处理operation的传递,以及对上面介绍的Time Scheduler的一层Wrapper,这部分我们在前面展开的已经比较多了,大量实现是比较类同的,感兴趣的读者可以自行阅读相关代码,有前面的基础,这部分代码理解起来也是比较快的,我们这里就不逐一展开讲述了。asio预定义的几种定时器:

    1. using system_timer = basic_waitable_timer<std::chrono::system_clock>;
    2. using steady_timer = basic_waitable_timer<std::chrono::steady_clock>;
    3. using high_resolution_timer = basic_waitable_timer<
    4. std::chrono::high_resolution_clock>;


    1. asio::steady_timer t(ctx);
    2. t.expires_after(1s);
    3. t.async_wait([](asio::error_code ec) {
    4. std::cout << "timeout!" << std::endl;
    5. });





    Asio作为广为人知的网络库,单就的scheduler部分来说,使用比较现代化的c++特性,整体围绕operation进行组织,提供了可以执行任意任务的 scheduler,又在此基础上实现了可以在业务层尽量避免直接使用同步原语的strand,然后又提供了相对高效的timer实现,整体的性能,易用性,完成度可以说都达到了一个比较完美的程度,首先它自己对各种网络api的支持,其次上面说到的一些工业项目对其scheduler部分的成功使用,都说明它是一个成熟度相当高,泛用性非常好的一个库,很长一段时间应该都会被广泛使用了。




