从IO同步和异步的优缺点分析如下:
IO同步优点就是sockfd管理方便,操作逻辑清晰;缺点是程序依赖epoll_wait的循环响应速度,程序性能差。
IO异步优点就是子模块好规划,程序性能高;缺点就是逻辑理解有点难度,还会出现多个线程共用一个sockfd,此时需要避免在IO操作时,出现sockfd出现关闭或其它异常。
有没有一种方式,同步的方式实现了异步的性能呢?那就是下文所说的协程。
协程切换核心就是yield(让出)与resume(恢复)来实现协程上下文切换,实现有以下3种方法。
(1)longjmp和setjmp
(2)ucontext
(3)汇编实现跳转
本文使用第三种汇编实现,yied = switch(a,b),resume = switch(b,a),根据不同的处理器的汇编指令实现switch的操作,比如x64_86如下。
- _asm__(
- " .text \n"
- " .p2align 4,,15 \n"
- ".globl _switch \n"
- ".globl __switch \n"
- "_switch: \n"
- "__switch: \n"
- " movq %rsp, 0(%rsi) # 从rsp存到rsi寄存器 \n"
- " movq %rbp, 8(%rsi) # 移动8个字节,一个指针是8个字节 \n"
- " movq (%rsp), %rax # save insn_pointer \n"
- " movq %rax, 16(%rsi) \n"
- " movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
- " movq %r12, 32(%rsi) \n"
- " movq %r13, 40(%rsi) \n"
- " movq %r14, 48(%rsi) \n"
- " movq %r15, 56(%rsi) \n"
- " movq 56(%rdi), %r15 \n"
- " movq 48(%rdi), %r14 \n"
- " movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
- " movq 32(%rdi), %r12 \n"
- " movq 24(%rdi), %rbx \n"
- " movq 8(%rdi), %rbp # restore frame_pointer \n"
- " movq 0(%rdi), %rsp # restore stack_pointer \n"
- " movq 16(%rdi), %rax # restore insn_pointer \n"
- " movq %rax, (%rsp) \n"
- " ret \n"
- );
- //64位系统,一个指针是8个字节
x86 _64 的寄存器有 16 个 64 位寄存器,分别是 :%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9,%r10, %r11, %r12, %r13, %r14, %r15 ,%rax。
(1)%rax作为函数返回值使用的
(2)%rsp 栈指针寄存器 指向栈顶
(3)%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函数参数 依次对应第 1 参数 第 2 参数。。。
(4)%rbx, %rbp, %r12, %r13, %r14, %r15用作数据存储
协程上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器分别mov到cpu对应的寄存器上。
协程的上下文结构体
- typedef struct _nty_cpu_ctx {
- void *esp; //栈指针指向-->stack
- void *ebp;
- void *eip;//指向回调函数入口
- void *edi;//参数
- void *esi;
- void *ebx;
- void *r1;
- void *r2;
- void *r3;
- void *r4;
- void *r5;
- } nty_cpu_ctx;
cpu中有一个非常重要的寄存器eip,用来存储cpu运行的下一条指令地址,可以把回调函数的地址存储到eip。
一个协程核心结构体如下
- typedef struct _nty_coroutine {
- nty_cpu_ctx ctx;
- proc_coroutine func;
- void *arg;
- size_t stack_size;
- nty_coroutine_status status;
- nty_schedule *sched;
- uint64_t birth;
- uint64_t id;
- void *stack;
- RB_ENTRY(_nty_coroutine) sleep_node;
- RB_ENTRY(_nty_coroutine) wait_node;
- TAILQ_ENTRY(_nty_coroutine) ready_next;
- TAILQ_ENTRY(_nty_coroutine) defer_next;
- } nty_coroutine;
(1)context,上下文,切换用的
(2)stack,每个协程的栈,协程内部用来做函数压栈
(3)size,协程栈的大小
(4)func,协程入口函数
(5)arg,入口函数的参数
(6)wait(等待集),等待IO就绪,等待集合采用红黑树存储
(7)sleep(睡眠树),采用红黑树存储
(8)ready(就绪集合),采用队列ready_queue存储
(9)status 状态
协程有3种状态:就绪、睡眠、等待;新创建的协程,创建完成后,加入就绪集合,等待调度器的调度;协程在运行完成后,进行IO操作,此时IO并未准备好,进入等待状态集合;IO准备就绪,协程开始运行,后续进行sleep操作,此时进入到睡眠状态集合。
调度器主要实现协程的切换,当IO准备就绪时,切换到该IO对应的协程,调度器的结构体如下。
- typedef struct _nty_coroutine_queue nty_coroutine_queue;
- typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
- typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
- typedef struct _nty_schedule {
- uint64_t birth; nty_cpu_ctx ctx;
- struct _nty_coroutine *curr_thread;
- int page_size;
- int poller_fd;
- int eventfd;
- struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
- int nevents;
- int num_new_events;
- nty_coroutine_queue ready;
- nty_coroutine_rbtree_sleep sleeping;
- nty_coroutine_rbtree_wait waiting;
- } nty_schedule;
调度器从3部分来得到就绪IO的协程:就绪集合、睡眠集合、等待集合,代码如下。
- void nty_schedule_run(void) {
-
- nty_schedule *sched = nty_coroutine_get_sched();
- if (sched == NULL) return ;
-
- while (!nty_schedule_isdone(sched)) {
-
- // 1. expired --> sleep rbtree 睡眠等待时间
- nty_coroutine *expired = NULL;
- while ((expired = nty_schedule_expired(sched)) != NULL) {
- nty_coroutine_resume(expired);//那些时间到期了,恢复协程的运行
- }
- // 2. ready queue 就绪队列
- nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);
- while (!TAILQ_EMPTY(&sched->ready)) {
- //从就绪队列拿出第一个节点
- nty_coroutine *co = TAILQ_FIRST(&sched->ready);
- TAILQ_REMOVE(&co->sched->ready, co, ready_next);
-
- if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {
- nty_coroutine_free(co);
- break;
- }
-
- nty_coroutine_resume(co);//恢复协程的运行
- if (co == last_co_ready) break;
- }
-
- // 3. wait rbtree IO等待 其他协程让出后,回到调度器这里
- //调度器处理IO等待
- nty_schedule_epoll(sched);//调用epoll_wait,监听就绪IO,sched->num_new_events就是IO事件数量
- while (sched->num_new_events) {
- int idx = --sched->num_new_events;
- struct epoll_event *ev = sched->eventlist+idx;
-
- int fd = ev->data.fd;
- int is_eof = ev->events & EPOLLHUP;
- if (is_eof) errno = ECONNRESET;
-
- nty_coroutine *co = nty_schedule_search_wait(fd);//通过fd,从红黑树中获取对应的coroutine
- if (co != NULL) {
- if (is_eof) {
- co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);
- }
- nty_coroutine_resume(co);//恢复,返回到协程
- 的运行
- }
-
- is_eof = 0;
- }
- }
-
- nty_schedule_free(sched);
-
- return ;
- }
fd如何知道就绪?
创建协程时,把fd添加到epoll进行管理,然后yied让出给调度器,由调度器resume到IO就绪的协程。其实调度器通过epoll_wait()监听IO是否就绪,得到就绪的fd,通过fd从红黑树中获取对应的协程,再通过resume()回到该就绪fd对应的协程,该协程继续执行accept/recv/send等阻塞API。
协程接口分为两部分
(1)协程本身的API
创建协程:int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg); 运行调度器:void nty_schedule_run(void);
(2)posix的API
对于需要等待IO就绪的的网络IO的操作函数需重新封装使用,而不是直接使用系统提供的,对于不需等待就绪的,可以不进行封装,需要的有:accept,connect,send,write,sendto,recv,read,recvfrom;不需要:socket,close,fcntl,setsocketopt,getsocketopt,listen。
(1)多进程
每一个进程亲和一个cpu,通过sched_setaffinity函数设置亲和力
(2)多线程
需要对调度器进行加锁
sleep_rbtree中取出超时的节点,进行加锁mutex
ready_queue中取出就绪的节点,进行springlock
wait_rbtree中获取就绪IO可以使用mutex
(3)x86指令,未实现。
这里为啥要介绍hook呢?因为使用协程时要把posix的API重新进行封装,所以可以使用hook劫持posix的API封装成自己的函数,hook钩子函数可以劫持两类函数:
(1)系统的函数,使用dlsym();
(2)第三方的库函数,使用dlopen();
初始化hook后,就把系统的函数劫获,运行时执行的是自己定义的函数,而不是系统的函数(类似重定向),特别注意的是,其他的应用程序调用系统函数,不会执行当前应用程序对应定义hook函数,因为当前的应用程序调用系统函数时,只执行对应定义的函数,这个只限于当前应用程序。
例子1:协程+mysql,不去修改mysql-dev,使用hook来重新定义connect、read、recv、send、write等函数;
例子2:hook来劫持malloc和free检查内存泄露;
例子3:nginx运行在dpdk也是使用hook的方法;