• Linux网络设计之协程原理


    一、协程的意义

    协程可以看作一个轻量级的线程,能自己实现调度。有一些轻量的场景,如网络刷新、网络加载、UI刷新、IO读写操作等,可以不需要开启一个线程去执行;线程或进程的调度较重,只需要一个轻量级的线程来维护业务代码,使业务代码更加的轻便灵活;这就是协程的意义。协程,简单的说,就是一个具有异步的性能,却使用同步编程方式的组件。使用者调用协程可以很好的管理业务代码,整个执行过程清晰明了。
    在这里插入图片描述

    二、异步的执行流程

    多线程异步操作,就是将不同的操作放到不同的线程中进行。异步带来的好处是子模块好规划、程序性能高;缺点是模块间的数据管理异常麻烦。

    服务端
    客户端
    返回结果
    发起请求
    服务程序
    线程1
    线程2

    多线程异步简单示例代码:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <sys/epoll.h>
    #include <netdb.h>
    #include <arpa/inet.h>
    #include <pthread.h>
    
    #define ASYNC_EVENT_LENGTH 1024
    
    struct context {
    	int epfd;
    	pthread_t thid;
    };
    
    void *asyn_callback(void * arg)
    {
    	struct context *ctx=(struct context*)arg;
    	while(1)
    	{
    		struct epoll_event events[ASYNC_EVENT_LENGTH] = { 0 };
    		int nready=epoll_wait(ctx->epfd,events,ASYNC_EVENT_LENGTH,-1);
    		if (nready < 0)
    		{
    			if (errno == EINTR || errno == EAGAIN)
    				continue;
    			else
    				break;
    		}
    		else if (nready == 0)
    			continue;
    
    		int i = 0;
    		for (i = 0; i < nready;i++)
    		{
    			int clientfd = events[i].data.fd;
    			if (events[i].events &EPOLLIN)
    			{
    				char buffer[1024] = { 0 };
    				struct sockaddr_in addr;
    				size_t addr_len = sizeof(struct sockaddr_in);
    
    				// 从读缓冲区中读取数据
    				int n = recvfrom(clientfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
    
    				printf("recvfrom n : %d\n", n);
    
    				// 解析数据
    				parse_response(buffer);
    
    				// 删除事件监听
    				epoll_ctl(ctx->epfd, EPOLL_CTL_DEL, clientfd, NULL);
    
    				// 关闭fd
    				close(clientfd);
    			}
    		}
    	}
    	return NULL;
    }
    
    int asyn_commit(struct context *ctx)
    {
    	// 创建 socket
    	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    	if (sockfd < 0) {
    		perror("create socket failed\n");
    		exit(-1);
    	}
    	// 配置socket相关信息
    	struct sockaddr_in dest;
    	bzero(&dest, sizeof(dest));
    	dest.sin_family = AF_INET;
    	dest.sin_port = htons(53);
    	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
    
    	// connect目标,探路
    	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
    	printf("connect :%d\n", ret);
    
    	// 准备协议
    	//......
    	
    	// 发送数据
    	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));
    
    	// 加入epoll中,监测结果返回
    	struct epoll_event ev;
    	ev.events = EPOLLIN;
    	ev.data.fd = sockfd;
    
    	return epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev);
    
    }
    
    int asyn_init(struct context *ctx)
    {
    	ctx->epfd=epoll_create(1);
    	pthread_create(&ctx->thid,NULL,asyn_callback,ctx);
    }
    
    
    int main(int argc;char *argv[])
    {
    	struct context ctx;
    	asyn_init(&ctx);
    	for(int i=0;i<100;i++)
    	{
    			asyn_commit();
    	}
    	
    	getchar();
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    协程就是要在一个线程中实现异步操作。

    服务端
    客户端
    协程调度器
    调度
    返回结果
    发起请求
    发起请求
    返回结果
    服务程序
    协程1
    协程2

    三、协程的基本操作

    协程主要有几个操作:创建(create)、让出(yield)和恢复(resume)。这后两个操作使用一个关键的动作,switch,即切换。
    (1)将socketfd添加到epoll中管理。
    (2)然后切换上下文,由协程上下文切换到调度器上下文,这个过程称为让出(yield)。
    (3)调度器获取下一个协程上下文,恢复(resume)新的协程。
    如此达到异步的操作。
    调度器与协程的上下文切换如下图:

    IO异步操作nty_send
    IO异步操作nty_recv
    调度器
    yield
    epoll_ctl add
    send
    epoll_ctl del
    yield
    epoll_ctl add
    recv
    epoll_ctl del
    resume
    epoll_wait

    epoll_ctl() add和del动作能够保证 sockfd 只在一个上下文中能够操作 IO ;不会出现在多个上下文同时对一个 IO 进行操作。
    IO异步操作上下文切换时序图:

    coroutine1 coroutine2 coroutine3 scheduler resume yield resume yield resume yield coroutine1 coroutine2 coroutine3 scheduler

    3.1、“切换”的方式–switch

    执行切换有三种方式:
    (1)longjmp / setjmp
    (2)ucontext
    (3)汇编
    ”切换“不能使用goto。goto只能在栈内跳转,只能在函数内,不能跨函数。

    汇编实现switch可以参考Linux kernel的任务调度方式,在“切换”前保存当前的上下文信息再加载要执行的上下文信息。x86_64 的寄存器有 16 个 64 位寄存器:rax,rbx,rdi,rsi,rdx,rcx,r8,r9,r10,r11,r12,r13,r14,r15,rbp,rsp。
    其中:
    rax:存储函数的返回值;
    rdi,rsi,rdx,rcx,r8,r9:函数的六个参数,如果函数的参数超过六个,那么六个以后的参数会入栈。
    rbp:栈指针寄存器,指向栈底;
    rsp:栈指针寄存器,指向栈顶。
    其余的用作数据存储。
    eip:指令指针寄存器,指向CPU要执行的下一个指令。
    例如,对于X86-64的汇编切换代码:

    __asm__ (
    "    .text                                  \n"
    "       .p2align 4,,15                                   \n"
    ".globl _switch                                          \n"
    ".globl __switch                                         \n"
    "_switch:                                                \n"
    "__switch:                                               \n"
    "       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
    "       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
    "       movq (%rsp), %rax       # save insn_pointer      \n"
    "       movq %rax, 16(%rsi)                              \n"
    "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
    "       movq %r12, 32(%rsi)                              \n"
    "       movq %r13, 40(%rsi)                              \n"
    "       movq %r14, 48(%rsi)                              \n"
    "       movq %r15, 56(%rsi)                              \n"
    "       movq 56(%rdi), %r15                              \n"
    "       movq 48(%rdi), %r14                              \n"
    "       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
    "       movq 32(%rdi), %r12                              \n"
    "       movq 24(%rdi), %rbx                              \n"
    "       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
    "       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
    "       movq 16(%rdi), %rax     # restore insn_pointer   \n"
    "       movq %rax, (%rsp)                                \n"
    "       ret                                              \n"
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.2、create:创建协程

    (1)如果调度器不存在,则创建调度器。调度器作为全局实例。
    (2)分配协程内存空间,并设置协程的数据项。如协程的栈空间、栈大小、子过程回调函数、子过程回调参数等等。
    (3)将新创建的协程添加到就绪队列中。

    3.3、yield:让出CPU

    切换到最近执行 resume 的上下文。

    3.4、resume:恢复协程运行权

    切换到运行协程实例的 yield 的位置。
    resume 与 yield 是两个可逆过程的原子操作。

    四、协程的定义

    协程一般包含几个内容:
    (1)协程ID
    (2)协程上下文
    (3)协程入口函数
    (4)协程的状态
    (5)协程的栈空间
    (6)返回值
    (7)状态集合

    struct coroutine{
    	uint64_t birth;//创建时间
    	uint64_t id;//协程ID
    	
    	struct context ctx;//上下文
    	
    	void *(*func) (void*);//子过程回调函数
    	void *arg;//回调函数参数
    	
    	struct nty_coroutine_status status;// 协程的状态
    	
    	void *stack;// 栈
    	size_t stack_length;//栈大小
    	
    	nty_schedule *sched;//调度器
    	
    	//状态集合
    	struct rbtree_node wait;
    	struct queue_node ready;
    	struct rbtree_node sleep;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    五、调度器的定义

    typedef struct _nty_coroutine_queue nty_coroutine_queue;
    typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
    typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
    
    typedef struct _nty_schedule {
    	uint64_t birth;//创建时间
    	nty_cpu_ctx ctx;//上下文
    	
    	struct _nty_coroutine *curr_thread;//当前运行的协程
    	
    	int page_size;
    	
    	// epoll 管理
    	int poller_fd;
    	int eventfd;
    	struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    	int nevents;
    	int num_new_events;
    	
    	//状态集合
    	nty_coroutine_queue ready;
    	nty_coroutine_rbtree_sleep sleeping;
    	nty_coroutine_rbtree_wait waiting;
    } nty_schedule;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    总结

    协程可以让业务代码易于管理,整个流程清晰;自己实现调度器,在单线程中实现异步调度。协程的底层还是使用epoll等IO多路复用器,性能上只能趋近reactor。
    协程的单核运行过程:

    2014-01-07 2014-01-09 2014-01-11 2014-01-13 2014-01-15 2014-01-17 2014-01-19 2014-01-21 2014-01-23 2014-01-25 2014-01-27 2014-01-29 2014-01-31 2014-02-01 2014-02-03 2014-02-05 2014-02-07 2014-02-09 协程1执行 协程2执行 协程5执行 协程3执行 协程4执行 调度器 协程的单核运行过程

    在这里插入图片描述

  • 相关阅读:
    Bellman-Ford算法与SPFA算法详解
    家政服务接单小程序开发源码 家政保洁上门服务小程序源码 开源完整版
    Nginx + tomcat 的搭建
    自己本地写完代码后,不要直接git pull拉远程代码,会导致代码丢失
    快速搭建Jenkins自动化集成cicd工具
    Springboot+mybatis 完整实现CRUD练习项目(带service分层)
    手把手带你学python—牛客网python 机器学习 使用梯度下降对逻辑回归进行训练
    如何解决系统中可能存在的性能问题?
    面试的经历
    5分钟就能实现的API监控,有什么理由不做呢?
  • 原文地址:https://blog.csdn.net/Long_xu/article/details/126568828