• Linux 64位 C++协程池原理分析及代码实现


    导语

    本文介绍了协程的作用、结构、原理,并使用C++和汇编实现了64位系统下的协程池。文章内容避免了协程晦涩难懂的部分,用大量图文来分析原理,适合新手阅读学习。

    GitHub源码

    1. Web服务器问题

    现代分布式Web后台服务逻辑通常由一系列RPC请求组成,若串行则耗时比较长。

    此时一般都会使用线程池并行运行RPC请求,如图中GetData函数

    假设请求数据包不大,那么可假设GetData耗时组成如下图所示。在非阻塞读情况下,CPU将在Wait环节空转浪费资源(不断地read,得到返回码-1)。

    2. 协程的引入

    有没有办法只用一个线程并行执行GetData呢?答案是:可以!我们假设有3个并行的GetData任务,下图线程1通过跳转控制流,减少CPU资源浪费。执行流为①~⑦,在Wait阶段则跳到其他任务如①~⑤。运行结束后也跳到其他任务如⑥~⑦。通过这种方式,3个GetData能用一个线程以52ms的耗时并行执行。

    如果GetData任务可以被这样分配,则可以减少线程切换的消耗。因为协程的调度是线程内用户态执行的,CPU消耗非常小。

    相关视频推荐

    协程!协程!协程!给你一个吊打面试官的机会!

    用ntyco来解决,大块数据传输,连续包处理接收

    协程在 reactor 网络模型中的应

    免费学习地址:c/c++ linux服务器开发/后台架构师

    需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

    3. 协程的原理

    从上文可知,协程之间的切换本质是函数的跳转,即如何让正在执行的函数跳转到另一个新的函数上,以及下次如何又跳转回来。如下面代码所示:

    1. void func1() {
    2. printf("① 跳转到func2");
    3. Coroutine::CoYield(); // 通过该函数跳到func2
    4. printf("③ func2跳转回func1");
    5. }
    6. void func2() {
    7. printf("② func2执行完毕");
    8. }

    要实现这种能力,需要结合汇编知识。首先研究如下简单函数的汇编语言

    1. #include <iostream>
    2. using namespace std;
    3. class Object {
    4. public:
    5. int val[12];
    6. };
    7. int func(Object *pObj1, Object *pObj2) {
    8. pObj1->val[0] = 1;
    9. pObj1->val[11] = 11;
    10. pObj2->val[0] = 2;
    11. pObj2->val[11] = 12;
    12. int arr[100];
    13. arr[0] = 3;
    14. arr[99] = 99;
    15. return pObj1->val[0];
    16. }
    17. int main() {
    18. Object obj, obj2;
    19. int a = func(&obj, &obj2);
    20. return a;
    21. }

    下面看看在64位系统汇编中,func函数是如何执行的。

    push %rbp是进入func函数执行的第一个指令,作用是把rbp的地址压到栈顶。因为rsp始终指向栈顶,所以压栈后,rsp的地址下移8字节。rdi和rsi相差48个字节,该空间被class Object内的int val[12]占用。

    前两个指令让rbp指向rsp往下296字节的位置。后面两个指令把rdi和rsi地址保存在最下面。

    为什么rsp下移296字节?首先,上述代码使用了临时变量int arr[100],需要有400个字节的栈空间;其次,x64系统存有128字节的红色区域可使用;最后,rdi和rsi地址共占16字节。因此,rbp到红色区域底部的空间一共是 288 + 8 + 104 + 8 + 8 = 416字节。

    接下来才开始执行func函数第一行代码,给val[0]赋值。

    然后分别给pObj1和pObj2的成员变量赋值

    接下来给临时变量arr赋值

    最后让eax指向返回值,恢复函数栈的栈底和栈顶。

    4. 协程的结构

    从前面我们知道,每个函数在内存中都有栈顶rsp和栈底rbp。这两个值决定了函数可操作的内存范围,如下图所示

    既然协程切换是从一个函数切换到另一个函数,那么就需要知道两个函数的rbp和rsp。然而,函数的rbp和rsp是执行时设定的,代码层面难以获得。
    既然如此,我们可以实现腾出空间,让函数在预期的rbp和rsp内。定义一个类如下:

    1. class Coroutine {
    2. void* m_pRegister[14];
    3. char m_pStack[1024];
    4. std::function<void()> m_func;
    5. };

    那么在内存模型中,该类的布局如下所示

    这样的协程在能被使用前需要做初始化,如下图所示

    在其他协程切换过来时,cpu寄存器可按m_pRegister预设的地址赋值,开始执行DoWork函数,函数代码如下:

    1. static void Coroutine::DoWork(Coroutine *pThis) {
    2. pThis->m_func();
    3. pThis->Yield(); // 转让控制流给同线程的其他协程
    4. }

    由于是静态函数,需令参数pThis为协程地址。所以,初始化时需要设置m_pRegister中的rdi为this。上述第二行代码执行时,rbp会设为this。所以执行m_func时,如下图所示:

    5. 协程间的切换

    下面以Coroutine1切换到Coroutine2为例。主要分为两步:
    1. 保存Coroutine1的上下文

    2. 加载Coroutine2的上下文

    切换代码可见源代码Coroutine::Switch

    6. 协程池的实现

    本文实现协程池比较简单,初始化创建线程并设置thread_local变量以保存协程队列状态。并且,每个线程额外创建一个main协程用作Guard。

    在执行时,每个线程通过轮询的方式切换协程,若协程无任务则尝试CAS获取Job,否则直接执行已有Job。当Job执行完或主动CoYield时,切换到下一个协程。

    为了避免CAS空转,在没有任务时会阻塞休眠。当任务来临时则Notify所有线程的协程。

    7. 源代码

    example.cpp

    1. /**
    2. * @file example.cpp
    3. * @author souma
    4. * @brief 使用协程池的示例,编译命令如下
    5. * g++ example.cpp coroutine.cpp -lpthread -O3
    6. * @version 0.1
    7. * @date 2023-06-06
    8. *
    9. * @copyright Copyright (c) 2023
    10. *
    11. */
    12. #include <iostream>
    13. #include <array>
    14. #include "coroutine.h"
    15. using namespace std;
    16. using namespace comm;
    17. void func(const string &sTaskName, uint32_t uWaitSeconds) {
    18. printf("[%ld] [%s start], wait seconds[%u]\n", time(nullptr), sTaskName.c_str(), uWaitSeconds);
    19. time_t iStartSec = time(nullptr);
    20. // 默认可用65535字节的栈内存,具体可看CO_STACK_SIZE
    21. uint32_t uArrSize = 65535/4;
    22. int arr[uArrSize];
    23. while (time(nullptr) - iStartSec < uWaitSeconds) {
    24. // 操作栈内存
    25. for (int i = 0; i < uArrSize; ++i) {
    26. arr[i] = i;
    27. }
    28. // 切换控制流
    29. printf("[%ld] [%s] -> [协程池]\n", time(nullptr), sTaskName.c_str());
    30. usleep(100);
    31. Coroutine::CoYield(); // 只需这一个函数即可切换控制流
    32. printf("[%ld] [协程池] -> [%s]\n", time(nullptr), sTaskName.c_str());
    33. }
    34. // 检查栈内存是否正确
    35. for (int i = 0; i < uArrSize; ++i) {
    36. if (arr[i] != i) {
    37. printf("栈内存错误\n");
    38. exit(-1);
    39. }
    40. }
    41. printf("[%ld] [%s end], expect_timecost[%d], real_timecost[%ld]\n", time(nullptr), sTaskName.c_str(), uWaitSeconds, time(nullptr) - iStartSec);
    42. }
    43. int main() {
    44. // 如果想当线程池用,可以令第一个参数为线程数,第二个参数为1
    45. // 在该场景下,使用小线程大协程不仅CPU消耗低,整体耗时也很低,可以自行测试。
    46. CoroutinePool oPool(2, 300);
    47. oPool.Run();
    48. time_t iStartTime = time(nullptr);
    49. const int iTaskCnt = 400;
    50. vector<shared_ptr<Future>> vecFuture;
    51. for (int i = 0; i < iTaskCnt; ++i) {
    52. // 模拟GetData中的Wait环节, 1 ~ 5秒等待
    53. shared_ptr<Future> pFuture = oPool.Submit([i](){func("Task" + to_string(i), random() % 5 + 1);});
    54. if (pFuture != nullptr) {
    55. vecFuture.emplace_back(pFuture);
    56. }
    57. }
    58. // 阻塞等待所有Task完成
    59. for (auto it = vecFuture.begin(); it != vecFuture.end(); ++it) {
    60. (*it)->Get();
    61. }
    62. printf("demo's finished, time cost[%ld]\n", time(nullptr) - iStartTime);
    63. return 0;
    64. }

    coroutine.h

    1. /**
    2. * @file coroutine.h
    3. * @author souma
    4. * @brief 多线程无栈式协程池,请不要用-O0编译否则会产生coredump
    5. * @version 0.1
    6. * @date 2023-06-06
    7. *
    8. * @copyright Copyright (c) 2023
    9. *
    10. */
    11. #pragma once
    12. #include <functional>
    13. #include <memory>
    14. #include <vector>
    15. #include <queue>
    16. #include <thread>
    17. #include <mutex>
    18. #include <signal.h>
    19. #include <pthread.h>
    20. #include <condition_variable>
    21. #include <unistd.h>
    22. namespace comm {
    23. class Future;
    24. class CoroutinePool;
    25. class Coroutine;
    26. struct CoroutinePoolCtx;
    27. struct CoroutineTaskCtx;
    28. struct CoroutinePoolCtx {
    29. std::vector<std::shared_ptr<Coroutine>> m_vecCoroutine;
    30. std::shared_ptr<Coroutine> m_pMainCoroutine;
    31. uint32_t m_uCursor;
    32. uint32_t m_uWorkCnt;
    33. };
    34. struct CoroutineTaskCtx {
    35. std::function<void()> m_userFunc;
    36. std::shared_ptr<Future> m_pFuture;
    37. };
    38. // class ArraySyncQueue start
    39. template <class T>
    40. class ArraySyncQueue {
    41. public:
    42. ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs = 100, uint32_t uRetryTimes = 3);
    43. bool Push(T *pObj);
    44. T* Pop();
    45. inline bool IsFull() const { return m_uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && m_uPushCursor == m_vecQueue.size() - 1); }
    46. bool IsEmpty() const { return m_uPopCursor == m_uPushCursor; }
    47. ~ArraySyncQueue();
    48. private:
    49. uint32_t GetNextCursor(uint32_t uCursor);
    50. private:
    51. std::vector<T*> m_vecQueue;
    52. uint32_t m_uPushCursor = 0;
    53. uint32_t m_uPopCursor = 0;
    54. uint32_t m_uSleepUs;
    55. uint32_t m_uRetryTimes;
    56. };
    57. // class ArraySyncQueue end
    58. // class Coroutine start
    59. class Coroutine {
    60. public:
    61. friend class CoroutinePool;
    62. /**
    63. * @brief 调用该函数将执行流交给其他协程,仅在协程池环境下有效
    64. *
    65. * @return true:协程切换成功, false:不在协程池环境中运行
    66. */
    67. static bool CoYield();
    68. Coroutine(const Coroutine &) = delete;
    69. Coroutine(Coroutine &&) = delete;
    70. Coroutine & operator=(const Coroutine &) = delete;
    71. Coroutine & operator=(Coroutine &&) = delete;
    72. private:
    73. // 4096是预留给库使用的栈内存大小,后者是留给用户使用的栈内存大小
    74. constexpr static uint32_t CO_STACK_SIZE = 4096 + 65535;
    75. Coroutine();
    76. /**
    77. * @brief 当前协程是否绑定了任务
    78. *
    79. * @return true:是
    80. */
    81. inline bool HasTask() const { return m_pTaskCtx != nullptr; }
    82. /**
    83. * @brief 两个协程切换,从pPrev切换到pNext
    84. */
    85. static void Switch(Coroutine *pPrev, Coroutine *pNext);
    86. /**
    87. * @brief 将控制流转给同线程的其他协程
    88. */
    89. void Yield();
    90. /**
    91. * @brief 这个是给main协程用的
    92. */
    93. void Register();
    94. /**
    95. * @brief 这个是给执行用户任务的协程用的
    96. */
    97. void Register(std::shared_ptr<CoroutineTaskCtx> pTaskCtx);
    98. /**
    99. * @return CoroutinePoolCtx& 当前线程的协程上下文
    100. */
    101. static CoroutinePoolCtx & GetCtx();
    102. /**
    103. * @brief 让当前线程的cursor往后移,轮询协程
    104. */
    105. static void MoveCursor();
    106. /**
    107. * @brief 协程包一层的函数
    108. */
    109. static void DoWork(Coroutine *pThis);
    110. /**
    111. *
    112. * @return void* 获得自建rsp地址
    113. */
    114. void* GetRsp();
    115. /**
    116. * 保存寄存器的值到m_pStack中
    117. */
    118. void SaveReg();
    119. private:
    120. void* m_pRegister[14];
    121. char m_pStack[CO_STACK_SIZE];
    122. std::shared_ptr<CoroutineTaskCtx> m_pTaskCtx;
    123. };
    124. // class Coroutine end
    125. // class CoroutinePool start
    126. class CoroutinePool {
    127. public:
    128. friend class Coroutine;
    129. /**
    130. * @brief 建立一个多线程协程池,即创建uThreadCnt个线程,每个线程含有uCoroutineCnt个协程
    131. 调用Run开始运行,调用Stop或直接析构结束
    132. * @param uThreadCnt 线程数,小于1则为1
    133. * @param uCoroutineCnt 每个线程的协程数,小于1则为1
    134. * @param uJobQueueSize 总任务队列大小,小于1则为1
    135. */
    136. CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize = 1024000);
    137. /**
    138. * @brief 线程安全,可重入
    139. * @return true:正常
    140. */
    141. bool Run();
    142. /**
    143. * @brief 停止协程池 (会先保证池中任务完成再停止),线程安全可重入
    144. *
    145. */
    146. void Stop();
    147. /**
    148. * @param userFunc 用户函数
    149. * @return std::shared_ptr<Future> nullptr:协程池队列满了,提交不了
    150. */
    151. std::shared_ptr<Future> Submit(const std::function<void()> &userFunc);
    152. ~CoroutinePool();
    153. CoroutinePool(const CoroutinePool &) = delete;
    154. CoroutinePool(CoroutinePool &&) = delete;
    155. CoroutinePool & operator=(const CoroutinePool &) = delete;
    156. CoroutinePool & operator=(CoroutinePool &&) = delete;
    157. private:
    158. static void LoopWork(CoroutinePool &oPool);
    159. private:
    160. bool m_bStarted;
    161. uint32_t m_uThreadCnt;
    162. uint32_t m_uRoutineCnt;
    163. ArraySyncQueue<CoroutineTaskCtx> m_queueJob;
    164. std::vector<std::shared_ptr<std::thread>> m_vecThread;
    165. std::mutex m_oMutex;
    166. std::condition_variable m_oCondition;
    167. };
    168. // class CoroutinePool end
    169. // class Future start
    170. class Future {
    171. public:
    172. /**
    173. * @brief 阻塞获得结果
    174. *
    175. * @param uTimeoutMs 超时时间
    176. * @return true:成功, false:超时
    177. */
    178. bool Get(uint32_t uTimeoutMs = -1);
    179. /**
    180. * @brief 设置状态为完成
    181. */
    182. void SetFinished();
    183. Future();
    184. Future(const Future&) = delete;
    185. Future(Future&&) = delete;
    186. Future & operator=(const Future&) = delete;
    187. Future & operator=(Future&&) = delete;
    188. private:
    189. std::mutex m_oMutex;
    190. std::condition_variable m_oCondition;
    191. bool m_bFinished;
    192. };
    193. // class Future end
    194. }

    coroutine.cpp

    1. /**
    2. * @file coroutine.cpp
    3. * @author souma
    4. * @brief 协程池的具体实现
    5. * @version 0.1
    6. * @date 2023-06-06
    7. *
    8. * @copyright Copyright (c) 2023
    9. *
    10. */
    11. #include "coroutine.h"
    12. #include <cstring>
    13. using namespace std;
    14. namespace comm {
    15. // class Coroutine start
    16. Coroutine::Coroutine() {
    17. m_pTaskCtx = nullptr;
    18. }
    19. void Coroutine::Register() {
    20. m_pTaskCtx = make_shared<CoroutineTaskCtx>();
    21. m_pTaskCtx->m_userFunc = [](){};
    22. m_pTaskCtx->m_pFuture = nullptr;
    23. SaveReg();
    24. }
    25. void Coroutine::Register(shared_ptr<CoroutineTaskCtx> pTaskCtx) {
    26. m_pTaskCtx = pTaskCtx;
    27. SaveReg();
    28. }
    29. inline void Coroutine::Yield() {
    30. Coroutine::Switch(this, Coroutine::GetCtx().m_pMainCoroutine.get());
    31. }
    32. bool Coroutine::CoYield() {
    33. if (GetCtx().m_vecCoroutine.size() == 0) {
    34. return false;
    35. }
    36. GetCtx().m_vecCoroutine[GetCtx().m_uCursor]->Yield();
    37. return true;
    38. }
    39. CoroutinePoolCtx & Coroutine::GetCtx() {
    40. thread_local CoroutinePoolCtx coroutinePoolCtx;
    41. return coroutinePoolCtx;
    42. }
    43. void Coroutine::MoveCursor() {
    44. GetCtx().m_uCursor = GetCtx().m_uCursor == GetCtx().m_vecCoroutine.size() - 1 ? 0 : GetCtx().m_uCursor + 1;
    45. }
    46. extern "C" __attribute__((noinline, weak))
    47. void Coroutine::Switch(Coroutine *pPrev, Coroutine *pNext) {
    48. // 1.保存pPrev协程的上下文, rdi和pPrev同指向
    49. // 2.加载pNext协程的上下文, rsi和pNext同指向
    50. asm volatile(R"(
    51. movq %rsp, %rax
    52. movq %rbp, 104(%rdi)
    53. movq %rax, 96(%rdi)
    54. movq %rbx, 88(%rdi)
    55. movq %rcx, 80(%rdi)
    56. movq %rdx, 72(%rdi)
    57. movq 0(%rax), %rax
    58. movq %rax, 64(%rdi)
    59. movq %rsi, 56(%rdi)
    60. movq %rdi, 48(%rdi)
    61. movq %r8, 40(%rdi)
    62. movq %r9, 32(%rdi)
    63. movq %r12, 24(%rdi)
    64. movq %r13, 16(%rdi)
    65. movq %r14, 8(%rdi)
    66. movq %r15, (%rdi)
    67. movq (%rsi), %r15
    68. movq 8(%rsi), %r14
    69. movq 16(%rsi), %r13
    70. movq 24(%rsi), %r12
    71. movq 32(%rsi), %r9
    72. movq 40(%rsi), %r8
    73. movq 48(%rsi), %rdi
    74. movq 64(%rsi), %rax
    75. movq 72(%rsi), %rdx
    76. movq 80(%rsi), %rcx
    77. movq 88(%rsi), %rbx
    78. movq 96(%rsi), %rsp
    79. movq 104(%rsi), %rbp
    80. movq 56(%rsi), %rsi
    81. movq %rax, (%rsp)
    82. xorq %rax, %rax
    83. )");
    84. }
    85. void Coroutine::DoWork(Coroutine *pThis) {
    86. pThis->m_pTaskCtx->m_userFunc();
    87. pThis->m_pTaskCtx->m_pFuture->SetFinished();
    88. pThis->m_pTaskCtx.reset();
    89. Coroutine::GetCtx().m_uWorkCnt--;
    90. pThis->Yield();
    91. }
    92. void* Coroutine::GetRsp() {
    93. // m_pRegister和m_pStack中间预留一个指针空间
    94. auto sp = std::end(m_pStack) - sizeof(void*);
    95. // 预定Rsp的地址保证能够整除8字节
    96. sp = decltype(sp)(reinterpret_cast<size_t>(sp) & (~0xF));
    97. return sp;
    98. }
    99. void Coroutine::SaveReg() {
    100. void *pStack = GetRsp();
    101. memset(m_pRegister, 0, sizeof m_pRegister);
    102. void **pRax = (void**)pStack;
    103. *pRax = (void*) DoWork;
    104. // rsp
    105. m_pRegister[12] = pStack;
    106. // rax
    107. m_pRegister[8] = *pRax;
    108. // rdi
    109. m_pRegister[6] = this;
    110. }
    111. // class Coroutine end
    112. // class CoroutinePool start
    113. CoroutinePool::CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize) : m_queueJob(uJobQueueSize) {
    114. m_bStarted = false;
    115. m_uThreadCnt = max(uThreadCnt, 1u);
    116. m_uRoutineCnt = max(uCoroutineCnt, 1u);
    117. }
    118. bool CoroutinePool::Run() {
    119. if (!__sync_bool_compare_and_swap(&m_bStarted, false, true)) {
    120. return false;
    121. }
    122. for (decltype(m_uThreadCnt) i = 0; i < m_uThreadCnt; ++i) {
    123. m_vecThread.emplace_back(make_shared<thread>(CoroutinePool::LoopWork, ref(*this)));
    124. }
    125. return true;
    126. }
    127. void CoroutinePool::Stop() {
    128. if (!__sync_bool_compare_and_swap(&m_bStarted, true, false)) {
    129. return;
    130. }
    131. m_oCondition.notify_all();
    132. for (auto it = m_vecThread.begin(); it != m_vecThread.end(); ++it) {
    133. (*it)->join();
    134. }
    135. m_vecThread.clear();
    136. }
    137. shared_ptr<Future> CoroutinePool::Submit(const function<void()> &userFunc) {
    138. shared_ptr<Future> pNewFuture = make_shared<Future>();
    139. CoroutineTaskCtx *pTaskCtx = new CoroutineTaskCtx;
    140. pTaskCtx->m_pFuture = pNewFuture;
    141. pTaskCtx->m_userFunc = userFunc;
    142. if (!m_queueJob.Push(pTaskCtx)) {
    143. delete pTaskCtx, pTaskCtx = nullptr;
    144. return nullptr;
    145. }
    146. m_oCondition.notify_all();
    147. return pNewFuture;
    148. }
    149. CoroutinePool::~CoroutinePool() {
    150. Stop();
    151. }
    152. void CoroutinePool::LoopWork(CoroutinePool &oPool) {
    153. Coroutine::GetCtx().m_uCursor = 0;
    154. Coroutine::GetCtx().m_uWorkCnt = 0;
    155. Coroutine::GetCtx().m_pMainCoroutine = shared_ptr<Coroutine>(new Coroutine);
    156. Coroutine::GetCtx().m_pMainCoroutine->Register();
    157. Coroutine::GetCtx().m_vecCoroutine.clear();
    158. for (decltype(oPool.m_uRoutineCnt) i = 0; i < oPool.m_uRoutineCnt; ++i) {
    159. Coroutine::GetCtx().m_vecCoroutine.emplace_back(shared_ptr<Coroutine>(new Coroutine));
    160. }
    161. Coroutine *pMainCoroutine, *pCurCoroutine;
    162. while (oPool.m_bStarted || Coroutine::GetCtx().m_uWorkCnt > 0 || !oPool.m_queueJob.IsEmpty()) {
    163. pMainCoroutine = Coroutine::GetCtx().m_pMainCoroutine.get();
    164. pCurCoroutine = Coroutine::GetCtx().m_vecCoroutine[Coroutine::GetCtx().m_uCursor].get();
    165. if (pCurCoroutine->HasTask()) {
    166. Coroutine::Switch(pMainCoroutine, pCurCoroutine);
    167. Coroutine::MoveCursor();
    168. continue;
    169. }
    170. CoroutineTaskCtx *pTaskCtx = oPool.m_queueJob.Pop();
    171. if (pTaskCtx == nullptr) {
    172. if (Coroutine::GetCtx().m_uWorkCnt > 0) {
    173. Coroutine::MoveCursor();
    174. continue;
    175. }
    176. unique_lock<mutex> oLock(oPool.m_oMutex);
    177. oPool.m_oCondition.wait(oLock);
    178. continue;
    179. }
    180. pCurCoroutine->Register(shared_ptr<CoroutineTaskCtx>(pTaskCtx));
    181. ++Coroutine::GetCtx().m_uWorkCnt;
    182. Coroutine::Switch(pMainCoroutine, pCurCoroutine);
    183. Coroutine::MoveCursor();
    184. }
    185. }
    186. // class CoroutinePool end
    187. // class Future start
    188. Future::Future() {
    189. m_bFinished = false;
    190. }
    191. bool Future::Get(uint32_t uTimeoutMs) {
    192. unique_lock<mutex> oLock(m_oMutex);
    193. if (m_bFinished) {
    194. return true;
    195. }
    196. return m_oCondition.wait_for(oLock, chrono::milliseconds(uTimeoutMs)) == cv_status::no_timeout;
    197. }
    198. void Future::SetFinished() {
    199. {
    200. unique_lock<mutex> oLock(m_oMutex);
    201. m_bFinished = true;
    202. }
    203. m_oCondition.notify_all();
    204. }
    205. // class Future end
    206. // class ArraySyncQueue start
    207. template <class T>
    208. ArraySyncQueue<T>::ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs, uint32_t uRetryTimes) {
    209. for (uint32_t i = 0; i < std::max(uCapacity, 1u); ++i) {
    210. m_vecQueue.emplace_back(nullptr);
    211. }
    212. m_uSleepUs = uSleepUs;
    213. m_uRetryTimes = uRetryTimes;
    214. }
    215. template <class T>
    216. bool ArraySyncQueue<T>::Push(T *pObj) {
    217. if (pObj == nullptr) {
    218. return false;
    219. }
    220. uint32_t uRetryTimes = 0;
    221. while (uRetryTimes <= m_uRetryTimes) {
    222. uint32_t uPushCursor = m_uPushCursor;
    223. if (uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && uPushCursor == m_vecQueue.size() - 1)) {
    224. // 队列满了
    225. return false;
    226. }
    227. if (!__sync_bool_compare_and_swap(&m_vecQueue[uPushCursor], nullptr, pObj)) {
    228. uRetryTimes++;
    229. usleep(m_uSleepUs);
    230. continue;
    231. }
    232. m_uPushCursor = GetNextCursor(uPushCursor);
    233. return true;
    234. }
    235. // 竞争失败
    236. return false;
    237. }
    238. template <class T>
    239. T* ArraySyncQueue::Pop() {
    240. uint32_t uRetryTimes = 0;
    241. while (uRetryTimes <= m_uRetryTimes) {
    242. uint32_t uPopCursor = m_uPopCursor;
    243. if (uPopCursor == m_uPushCursor) {
    244. return nullptr;
    245. }
    246. T* pToReturn = m_vecQueue[uPopCursor];
    247. if (pToReturn == nullptr || !__sync_bool_compare_and_swap(&m_vecQueue[uPopCursor], pToReturn, nullptr)) {
    248. usleep(m_uSleepUs);
    249. uRetryTimes++;
    250. continue;
    251. }
    252. m_uPopCursor = GetNextCursor(uPopCursor);
    253. return pToReturn;
    254. }
    255. return nullptr;
    256. }
    257. template <class T>
    258. uint32_t ArraySyncQueue<T>::GetNextCursor(uint32_t uCursor) {
    259. if (uCursor == m_vecQueue.size() - 1) {
    260. return 0;
    261. }
    262. return uCursor + 1;
    263. }
    264. template <class T>
    265. ArraySyncQueue<T>::~ArraySyncQueue() {
    266. m_uRetryTimes = -1;
    267. do {
    268. T *pObj = Pop();
    269. if (pObj == nullptr) {
    270. return;
    271. }
    272. delete pObj, pObj = nullptr;
    273. } while (true);
    274. }
    275. // class ArraySyncQueue end
    276. }

    8. 补充说明

    8.1. 为什么不能-O0编译?

    在-O0的情况下,编译器会给函数(coroutine.cpp:57)Coroutine::Switch包一层汇编指令,导致实际执行汇编指令不是期望的。具体可以分别用-O0和-O3在GDB下disassemble看到差异。

    8.2. 如果函数使用栈很大怎么办?

    源码中定义的协程栈为CO_STACK_SIZE=4096 + 65535KB,若用户函数使用的栈超过该范围会产生coredump。简单可行的解法是:1.尽量使用堆变量;2.改大CO_STACK_SIZE。

  • 相关阅读:
    SpringCloud微服务实战——搭建企业级开发框架(三十六):使用Spring Cloud Stream实现可灵活配置消息中间件的功能
    检查OpenGL的版本
    14:00面试,14:06就出来了,问的问题有点变态。。。
    用C#实现简单的线性回归
    业内专业人士揭秘:双11即将来临,挑选SSD硬盘避坑指南
    基于云的 LDAP 目录服务和本地 LDAP 相比有哪些优势?
    Numpy-01(安装、Ndarray对象介绍创建及使用)
    无法更新chrome
    js书写规范
    项目实战-智慧监督下的合同预付款控制策略-物料价格下行-智慧监督-合同预付款预警推送大数据
  • 原文地址:https://blog.csdn.net/qq_40989769/article/details/133827752