以下是从sylar服务器中学的,对其的复习;
参考资料
函数只有两个行为:调用与返回。一旦函数返回后,它在栈上所拥有的状态将被销毁。协程相比函数多了两个动作:挂起与恢复。当协程主动挂起时,它的控制权将转交给另一个协程,这时它所拥有的状态仍被保留着,另一个协程获取控制权后,在将来某个时间点也可能选择挂起,从而使原协程的控制权得以恢复,一旦协程像函数一样返回,它所拥有的状态将被销毁。
协程是能暂停执行以在之后恢复的函数。
同样是单线程环境下,协程的yield和resume一定是同步进行的,一个协程的yield,必然对应另一个协程的resume,因为线程不可能没有执行主体。并且,协程的yield和resume是完全由应用程序来控制的。与线程不同,线程创建之后,线程的运行和调度也是由操作系统自动完成的,但协程创建后,协程的运行和调度都要由应用程序来完成,就和调用函数一样,所以协程也被称为用户态线程。
所谓创建协程,其实就是把一个函数包装成一个协程对象,然后再用协程的方式把这个函数跑起来;所谓协程调度,其实就是创建一批的协程对象,然后再创建一个调度协程,通过调度协程把这些协程对象一个一个消化掉(协程可以在被调度时继续向调度器添加新的调度任务);所谓IO协程调度,其实就是在调度协程时,如果发现这个协程在等待IO就绪,那就先让这个协程让出执行权,等对应的IO就绪后再重新恢复这个协程的运行;所谓定时器,就是给调度协程预设一个协程对象,等定时时间到了就恢复预设的协程对象。
ucontext_t接口
// 上下文结构体定义
typedef struct ucontext_t {
// 当前上下文结束后,下一个激活的上下文对象的指针,只在当前上下文是由makecontext创建时有效
struct ucontext_t *uc_link;
// 当前上下文的信号屏蔽掩码
sigset_t uc_sigmask;
// 当前上下文使用的栈内存空间,只在当前上下文是由makecontext创建时有效
stack_t uc_stack;
// 平台相关的上下文具体内容,包含寄存器的值
mcontext_t uc_mcontext;
...
} ucontext_t;
// 获取当前的上下文
int getcontext(ucontext_t *ucp);
// 恢复ucp指向的上下文,这个函数不会返回,而是会跳转到ucp上下文对应的函数中执行,相当于变相调用了函数
int setcontext(const ucontext_t *ucp);
// 修改由getcontext获取到的上下文指针ucp,将其与一个函数func进行绑定,支持指定func运行时的参数,
// 在调用makecontext之前,必须手动给ucp分配一段内存空间,存储在ucp->uc_stack中,这段内存空间将作为func函数运行时的栈空间,
// 同时也可以指定ucp->uc_link,表示函数运行结束后恢复uc_link指向的上下文,
// 如果不赋值uc_link,那func函数结束时必须调用setcontext或swapcontext以重新指定一个有效的上下文,否则程序就跑飞了
// makecontext执行完后,ucp就与函数func绑定了,调用setcontext或swapcontext激活ucp时,func就会被运行
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
// 恢复ucp指向的上下文,同时将当前的上下文存储到oucp中,
// 和setcontext一样,swapcontext也不会返回,而是会跳转到ucp上下文对应的函数中执行,相当于调用了函数
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);
sylar使用非对称协程模型,也就是子协程只能和线程主协程切换,而不能和另一个子协程切换,并且在程序结束时,一定要再切回主协程,以保证程序能正常结束
sylar对每个协程,设计了6种状态
/**
* @brief 协程状态
*/
enum State {
/// 初始化状态
INIT,
/// 暂停状态
HOLD,
/// 执行中状态
EXEC,
/// 结束状态
TERM,
/// 可执行状态
READY,
/// 异常状态
EXCEPT
};
fiber类包含以下成员变量
/// 协程id
uint64_t m_id = 0;
/// 协程栈大小
uint32_t m_stacksize = 0;
/// 协程状态
State m_state = READY;
/// 协程上下文
ucontext_t m_ctx;
/// 协程栈地址
void *m_stack = nullptr;
/// 协程入口函数
std::function<void()> m_cb;
定义了两个全局静态变量,用于生成协程id和统计当前的协程数
/// 全局静态变量,用于生成协程id
static std::atomic<uint64_t> s_fiber_id{0};
/// 全局静态变量,用于统计当前的协程数
static std::atomic<uint64_t> s_fiber_count{0};
以下两个线程局部变量用于保存协程上下文信息
/// 线程局部变量,当前线程正在运行的协程
static thread_local Fiber *t_fiber = nullptr;
/// 线程局部变量,当前线程的主协程,切换到这个协程,就相当于切换到了主线程中运行,智能指针形式
static thread_local Fiber::ptr t_thread_fiber = nullptr;
构造函数
带参数的构造函数用于构造子协程,初始化子协程的ucontext_t上下文和栈空间,要求必须传入协程的入口函数,以及可选的协程栈大小。不带参的构造函数用于初始化当前线程的协程功能,构造线程主协程对象
/**
* @brief 构造函数
* @attention 无参构造函数只用于创建线程的第一个协程,也就是线程主函数对应的协程,
* 这个协程只能由GetThis()方法调用,所以定义成私有方法
*/
Fiber::Fiber(){
SetThis(this);
m_state = RUNNING;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
++s_fiber_count;
m_id = s_fiber_id++; // 协程id从0开始,用完加1
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() main id = " << m_id;
}
/**
* @brief 构造函数,用于创建用户协程
* @param[] cb 协程入口函数
* @param[] stacksize 栈大小,默认为128k
*/
Fiber::Fiber(std::function<void()> cb, size_t stacksize)
: m_id(s_fiber_id++)
, m_cb(cb) {
++s_fiber_count;
m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
m_stack = StackAllocator::Alloc(m_stacksize);
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() id = " << m_id;
}
/**
* @brief 返回当前线程正在执行的协程
* @details 如果当前线程还未创建协程,则创建线程的第一个协程,
* 且该协程为当前线程的主协程,其他协程都通过这个协程来调度,也就是说,其他协程
* 结束时,都要切回到主协程,由主协程重新选择新的协程进行resume
* @attention 线程如果要创建协程,那么应该首先执行一下Fiber::GetThis()操作,以初始化主函数协程
*/
Fiber::ptr GetThis(){
if (t_fiber) {
return t_fiber->shared_from_this();
}
Fiber::ptr main_fiber(new Fiber);
SYLAR_ASSERT(t_fiber == main_fiber.get());
t_thread_fiber = main_fiber;
return t_fiber->shared_from_this();
}
协程原语的实现,挂起和恢复
/**
* @brief 将当前协程切换到运行状态(与调度协程)
* @pre getState() != EXEC
* @post getState() = EXEC
*/
void Fiber::swapIn() {
SetThis(this);
SYLAR_ASSERT(m_state != EXEC);
m_state = EXEC;
if(swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
/**
* @brief 将当前协程切换到后台(与调度协程)
*/
void Fiber::swapOut() {
SetThis(Scheduler::GetMainFiber());
if(swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
/**
* @brief 将当前线程切换到执行状态(与主协程)
* @pre 执行的为当前线程的主协程
*/
void Fiber::call() {
SetThis(this);
m_state = EXEC;
SYLAR_LOG_ERROR(g_logger) << getId();
if(swapcontext(&t_threadFiber->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
/**
* @brief 将当前线程切换到后台(与主协程)
* @pre 执行的为该协程
* @post 返回到线程的主协程
*/
void Fiber::back() {
SetThis(t_threadFiber.get());
if(swapcontext(&m_ctx, &t_threadFiber->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
协程入口函数
void Fiber::MainFunc() {
Fiber::ptr cur = GetThis(); // GetThis()的shared_from_this()方法让引用计数加1
SYLAR_ASSERT(cur);
cur->m_cb(); // 这里真正执行协程的入口函数
cur->m_cb = nullptr;
cur->m_state = TERM;
auto raw_ptr = cur.get(); // 手动让t_fiber的引用计数减1
cur.reset();
raw_ptr->yield(); // 协程结束时自动yield,以回到主协程
}
协程的重置
重复利用已结束的协程,复用其栈空间,创建新协程
void Fiber::reset(std::function<void()> cb) {
SYLAR_ASSERT(m_stack);
SYLAR_ASSERT(m_state == TERM);
m_cb = cb;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = READY;
}