• [原创] KCP 源码分析(上)


    KCP 协议是一种可靠的传输协议,对比 TCP 取消了累计确认(延迟 ACK)、减小 RTO增长速度、选择性重传而非全部重传。通过用流量换取低时延。 KCP 中最重要的两个数据结构IKCPCB和IKCPSEG,一个IKCPCB对应一个 KCP 连接,通过这个结构体维护发送缓存、接收缓存、超时重传时间、窗口大小等。IKCPSEG 对应一个 KCP 数据包,包含该数据包的命令、数据、时间戳、数据长度等信息。源码地址:https://github.com/skywind3000/kcp

    KCP 数据包结构体:

    struct IKCPSEG
    {
    	struct IQUEUEHEAD node;
    	IUINT32 conv; 	// 会话 ID
    	IUINT32 cmd;	// KCP 命令:
        				// IKCP_CMD_ACK:这是个 ACK 
        				// IKCP_CMD_WASK:发送方探测接收方的窗口
        				// IKCP_CMD_WINS:接收方回应自己的窗口大小
        				
    	IUINT32 frg;	// fragment分段号,如果是流模式:默认为 0
    	IUINT32 wnd;	// 窗口大小
    	IUINT32 ts;		// 发送方:数据包的发送时间戳。 
      					// 接收方(ACK):所接受数据包的发送时间,而不是发送 ACK 的时间,方便发送方收到 ACK 后计算 rtt。
    	IUINT32 sn;		// 发送方:发送数据包的序列号
      					// 接收方(ACK):ACK 号
    	IUINT32 una;	// 未确认序列号:期待下次收到的数据包
    	IUINT32 len;	// 数据包除去头部的字节数
      
    /*-----------------以下成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
    	IUINT32 resendts;	// = current + rto, 超时重传的阈值, 当前时间超过resendts, 就要重发这个数据包
    	IUINT32 rto;	// Retransmission Timeout, 下次超时重传的间隔时间, 会随着超时次数增加, 增加速率取决于是不是快速模式
    	IUINT32 fastack;// 数据包被跳过次数, 快速重传功能需要
    	IUINT32 xmit;	// 该数据包发送次数, transmit 的缩写, ,次数太多判断网络断开
    /*-----------------以上成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
      
    	char data[1];	// 数据包携带的数据,大小根据ikcp_segment_new的参数决定
    };
    

    KCP 连接结构体:

    struct IKCPCB
    {
    	IUINT32 conv; 	// 会话ID
    	IUINT32 mtu; 	// 下层协议的最大传输单元, 一次发送若干个kcp包, 这些包的总长度不超过mtu
    	IUINT32 mss; 	// 一个KCP数据包的最大数据载荷, mss+head一定不超过mtu
    	IUINT32 state; 	// 连接状态
    
    	IUINT32 snd_una; 	// snd_una之前的包对方(接收方)都已经收到了
    	IUINT32 snd_nxt;  	// 下一个要从 send_que 发到 send_buf 的包序列号
    	IUINT32 rcv_nxt;	// 下一个要从 rcv_buf 发到 rcv_que 的包序列号
    
    	IUINT32 ts_recent; 	// 没用到
    	IUINT32 ts_lastack;	// 没用到
    
    	IUINT32 ssthresh;	// 拥塞窗口从慢启动转换到拥塞避免的窗口阈值
    	IINT32 rx_rttval;	// 近4次rtt和srtt的平均差值,反应了rtt偏离srtt的程度
    	IINT32 rx_srtt;		// 平滑的rtt,近8次rtt平均值
    	IINT32 rx_rto;		// 系统的重传超时时间
    	IINT32 rx_minrto; 	// 最小重传超时时间
    	
    	IUINT32 snd_wnd; 	// 发送窗口大小
    	IUINT32 rcv_wnd; 	// 接收窗口大小
    	IUINT32 rmt_wnd; 	// 对方接收窗口大小
    	IUINT32 cwnd; 		// 拥塞窗口大小
    	IUINT32 probe;		// 探测窗口大小
    
    	IUINT32 current;	// 当前时间戳
    	IUINT32 interval; 	// 内部flush刷新间隔
    	IUINT32 ts_flush; 	// 下一次刷新输出的时间戳
    	IUINT32 xmit;		// 该KCP连接超时重传次数
    
    	IUINT32 nrcv_buf; 	// rcv_buf的长度
    	IUINT32 nsnd_buf;	// snd_buf的长度
    	IUINT32 nrcv_que; 	// rcv_que的长度
    	IUINT32 nsnd_que; 	// snd_que的长度
    
    	IUINT32 nodelay;	// 是否启用nodelay模式, ==2为快速模式
    	IUINT32 updated;	// 是否调用过update函数
    	IUINT32 ts_probe; 	// 下次探测窗口大小的时间戳
    	IUINT32 probe_wait; // 探测窗口大小的间隔时间,每次探测对面窗口为0(失败), 探测时间*1.5
    	IUINT32 dead_link;	// 断开连接的重传次数阈值
    	IUINT32 incr; 		// k*mss , 拥塞窗口等于floor(k)
    	struct IQUEUEHEAD snd_queue;// 发送队列
    	struct IQUEUEHEAD rcv_queue;// 接收队列
    	struct IQUEUEHEAD snd_buf; // 发送缓存, 还没收到 ACK 的包都在这里边
    	struct IQUEUEHEAD rcv_buf; // 接收缓存, 将收到的数据暂存, 然后将其中连续的数据放到rcv_queue供上层读取
    	IUINT32 *acklist; 	// 一个整数数组,存放要回复的ack,
      						// 结构为 [sn0(接收数据包的序号), ts0(接收数据包的发送时间), sn1, ts1, ...]
    	IUINT32 ackcount; 	// 本次需要回复的ack个数
    	IUINT32 ackblock; 	// acklist的大小,会动态扩容,类似于 vector
    	void *user;			// 用户标识
    	char *buffer; 		// 数据缓冲区
    	int fastresend; 	// 快速重传的失序阈值, 发送方收到 fastresend 个冗余ACK就触发快速重传
    	int fastlimit;  	// 快速重传的次数限制
    	int nocwnd; 		// 0: 有拥塞控制, 1: 没有拥塞控制
    	int stream;			// 流模式
    	int logmask;
    	int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); // 回调函数,数据发送到下层协议
    	void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
    };
    

    ikcp_send

    先来看发送方的用户接口:

    int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
    {
    	IKCPSEG *seg;
    	int count, // 需要装多少包
    		 i;
    
    	assert(kcp->mss > 0);
    	if (len < 0) return -1;
    
    	// 字节流模式,如果之前的包没装满,则先把之前的包装满。(粘包现象)
    	if (kcp->stream != 0) { 
    		if (!iqueue_is_empty(&kcp->snd_queue)) {
          // old:没有被装满的包
    			IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
    			if (old->len < kcp->mss) { // 前一个包没塞满, 粘包
    				int capacity = kcp->mss - old->len;
    				int extend = (len < capacity)? len : capacity;
    				seg = ikcp_segment_new(kcp, old->len + extend);
    				assert(seg);
    				if (seg == NULL) {
    					return -2;
    				}
    				iqueue_add_tail(&seg->node, &kcp->snd_queue);
    				memcpy(seg->data, old->data, old->len); // 把old数据转移到seg,然后把old删了
    				if (buffer) {
    					memcpy(seg->data + old->len, buffer, extend);
    					buffer += extend;
    				}
    				seg->len = old->len + extend;
    				seg->frg = 0;
    				len -= extend;
    				iqueue_del_init(&old->node); 
    				ikcp_segment_delete(kcp, old); // 删除 old 节点
    			}
    		}
    		if (len <= 0) {
    			return 0;
    		}
    	}
    
    	// 需要几个包来装len字节的数据, 一个包最多装mss字节
    	if (len <= (int)kcp->mss) count = 1;
    	else count = (len + kcp->mss - 1) / kcp->mss;
    
    	if (count >= (int)IKCP_WND_RCV) return -2;
    
    	if (count == 0) count = 1;
    
    	// 将buffer数据分段装入snd_queue
    	for (i = 0; i < count; i++) {
    		int size = len > (int)kcp->mss ? (int)kcp->mss : len;
    		seg = ikcp_segment_new(kcp, size);
    		assert(seg);
    		if (seg == NULL) {
    			return -2;
    		}
    		if (buffer && len > 0) {
    			memcpy(seg->data, buffer, size);
    		}
    		seg->len = size;
        	// 上层数据包被分段后的段号,如果开启流模式,默认段号都为 0
    		seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; 
    		iqueue_init(&seg->node);
    		iqueue_add_tail(&seg->node, &kcp->snd_queue); // 将数据包 push 进发送队列
    		kcp->nsnd_que++;
    		if (buffer) {
    			buffer += size;
    		}
    		len -= size;
    	}
    
    	return 0;
    }
    

    ikcp_send 的主要逻辑就是将用户数据分段组装成 IKCPSEG 然后将其添加到发送队列。如果是流模式,则没有段号,每个包都是满的,数据有粘包需要用户自己处理。

    ikcp_update

    上层定时调用,主要功能是设置当前时间戳、计算下一次update 事件以及调用 ikcp_flush,ikcp_flush 才是将数据从发送队列发送到发送缓存的函数。

    ikcp_flush

    Step1、回应ACK

    void ikcp_flush(ikcpcb *kcp)
    {
        char *buffer = kcp->buffer; // 数据缓冲区
    	char *ptr = buffer;
     	IKCPSEG seg;
        seg.conv = kcp->conv;
    	seg.cmd = IKCP_CMD_ACK; // 命令为 ACK
    	seg.frg = 0;
    	seg.wnd = ikcp_wnd_unused(kcp); // 设置窗口大小
    	seg.una = kcp->rcv_nxt;
    	seg.len = 0;
    	seg.sn = 0;
    	seg.ts = 0;
        ...
    	count = kcp->ackcount; // 需要回复的 ack 个数
    	for (i = 0; i < count; i++) { 
    		size = (int)(ptr - buffer);
            // 如果buffer 放不下 seg 的 head ,那就先把 buffer 中的数据先发到网络中
    		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
    			ikcp_output(kcp, buffer, size); // 将buffer 中的数据先发到网络中
    			ptr = buffer;
    		}
            // 从 acklist 中取出要发送 ack 的 sn 和 ts
    		ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
            // 只将 seg 的 head 拷贝到数据缓冲区里,注意回应 ack 的报文没有 data。
    		ptr = ikcp_encode_seg(ptr, &seg); 
    	}
    
    	kcp->ackcount = 0;
      	...
    }
    

    这段代码主要功能就是回复 ACK,在接收数据的时候 kcp 会把需要回复的 ACK 放入 acklist,在这里检查kcp->ackcount,发现需要回复 ACK就从 acklist 中取出要发送 ack 的 sn 和 ts存入 seg 然后发送。

    Step2、探测窗口

    void ikcp_flush(ikcpcb *kcp)
    {
        char *buffer = kcp->buffer; // 数据缓冲区
    	char *ptr = buffer;
     	IKCPSEG seg;
        ...
    	// 对面没有接收缓存,等待probe_wait
    	if (kcp->rmt_wnd == 0) {
    		if (kcp->probe_wait == 0) { // 初始化探测窗口
    			kcp->probe_wait = IKCP_PROBE_INIT;
    			kcp->ts_probe = kcp->current + kcp->probe_wait;
    		}	
    		else {
    			if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {  // 已经到了探测时间
    				if (kcp->probe_wait < IKCP_PROBE_INIT) 
    					kcp->probe_wait = IKCP_PROBE_INIT;
    				kcp->probe_wait += kcp->probe_wait / 2; // 每次探测间隔增长 0.5 倍
    				if (kcp->probe_wait > IKCP_PROBE_LIMIT)
    					kcp->probe_wait = IKCP_PROBE_LIMIT;
    				kcp->ts_probe = kcp->current + kcp->probe_wait;
    				kcp->probe |= IKCP_ASK_SEND; // 标记需要探测窗口
    			}
    		}
    	}	else { // 一旦对方有,则重置探测时间和探测间隔
    		kcp->ts_probe = 0;
    		kcp->probe_wait = 0;
    	}
    	
        // 标记需要发送探测
    	if (kcp->probe & IKCP_ASK_SEND) {
    		seg.cmd = IKCP_CMD_WASK; // 命令设为 IKCP_CMD_WASK,其他头信息不需要
    		size = (int)(ptr - buffer);
    		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
    			ikcp_output(kcp, buffer, size);
    			ptr = buffer;
    		}
    		ptr = ikcp_encode_seg(ptr, &seg);
    	}
      	...
    }
    

    当发送方发现对方窗口大小为 0,需要发送探测命令询问对方窗口大小,每次探测间隔都会增长0.5 倍,一旦对方有接收窗口,则重置探测时间和探测间隔。

    Step3、回应探测窗口

    void ikcp_flush(ikcpcb *kcp)
    {
        char *buffer = kcp->buffer; // 数据缓冲区
    	char *ptr = buffer;
     	IKCPSEG seg;
        ...
        // 需要回应窗口大小
    	if (kcp->probe & IKCP_ASK_TELL) {
    		seg.cmd = IKCP_CMD_WINS;  // 命令设为 IKCP_CMD_WINS,其他头信息不需要
    		size = (int)(ptr - buffer);
    		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
    			ikcp_output(kcp, buffer, size);
    			ptr = buffer;
    		}
    		ptr = ikcp_encode_seg(ptr, &seg);
    	}
        ...
    }
    

    Step4、发送数据

    void ikcp_flush(ikcpcb *kcp)
    {
        char *buffer = kcp->buffer; // 数据缓冲区
    	char *ptr = buffer;
     	IKCPSEG seg;
        ...
        // 计算可以发多少数据
    	cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
        // kcp->nocwnd == 1 则关闭流控(拥塞控制)
    	if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
        // 如果 snd_nxt(下一个要从 send_que 发到 send_buf 的包序列号) 在发送窗口内
        // 就一直从 snd_que 中取出数据包放到 snd_buf 中, 直到snd_buf满或者snd_queue为空
    	while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
    		IKCPSEG *newseg;
    		if (iqueue_is_empty(&kcp->snd_queue)) break;
    
    		newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
    
    		iqueue_del(&newseg->node);
    		iqueue_add_tail(&newseg->node, &kcp->snd_buf);
    		kcp->nsnd_que--;
    		kcp->nsnd_buf++;
    
    		newseg->conv = kcp->conv;
    		newseg->cmd = IKCP_CMD_PUSH;
    		newseg->wnd = seg.wnd;
    		newseg->ts = current;
    		newseg->sn = kcp->snd_nxt++;
    		newseg->una = kcp->rcv_nxt;
    		newseg->resendts = current;
    		newseg->rto = kcp->rx_rto;
    		newseg->fastack = 0;
    		newseg->xmit = 0;
    	}
    
    	// resent:收到 resent 个失序 ACK 就会触发快速重传,TCP 里是冗余 ACK。
        // 如果没开启快速重传,则 resent 为 inf。
    	resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
        // 超时重传的最小超时时间
    	rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
    
    	// 遍历snd_buf里的数据包是否需要发送
    	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
    		IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
    		int needsend = 0; 
    		/* 
    			该数据包是否需要发送,三种情况:
    			1. 该数据包没发送过
    			2. 超时没有收到ack,触发超时重传
    			3. 收到resent次冗余ack,触发快速重传
    		*/
    
    		// 1. 该数据包第一次发送
    		if (segment->xmit == 0) {
    			needsend = 1;
    			segment->xmit++;
    			segment->rto = kcp->rx_rto; // 超时重传时间
    			segment->resendts = current + segment->rto + rtomin;
    		}
    
    		// 2. 该数据包超时没有收到ACK, 触发超时重传
    		else if (_itimediff(current, segment->resendts) >= 0) { 
    			needsend = 1;
    			segment->xmit++;
    			kcp->xmit++;
    			if (kcp->nodelay == 0) { // 普通模式,超时时间*2
    				segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
    			}	else { // 快速模式,超时时间*1.5
    				IINT32 step = (kcp->nodelay < 2)? 
    					((IINT32)(segment->rto)) : kcp->rx_rto;
    				segment->rto += step / 2;
    			}
    			segment->resendts = current + segment->rto;
    			lost = 1;
    		}
    
    		// 3. 该数据包被跳过的次数超过了fastresend, 触发快速重传
    		else if (segment->fastack >= resent) {  
    			if ((int)segment->xmit <= kcp->fastlimit || // 快速重传的限制,不能一直快速重传
    				kcp->fastlimit <= 0) {
    				needsend = 1;
    				segment->xmit++;
    				segment->fastack = 0;
    				segment->resendts = current + segment->rto;
    				change++;
    			}
    		}
    
    		if (needsend) {
    			int need;
    			segment->ts = current;
    			segment->wnd = seg.wnd;
    			segment->una = kcp->rcv_nxt;
    
    			size = (int)(ptr - buffer);
    			need = IKCP_OVERHEAD + segment->len; // 该数据包长度, 最大为head+mss
    
    			if (size + need > (int)kcp->mtu) {
    				ikcp_output(kcp, buffer, size);
    				ptr = buffer;
    			}
    
    			ptr = ikcp_encode_seg(ptr, segment);
    
    			if (segment->len > 0) {
    				memcpy(ptr, segment->data, segment->len);
    				ptr += segment->len;
    			}
    			// 某个数据包的传输次数超过了dead_link,则判断当前连接断开。
    			if (segment->xmit >= kcp->dead_link) {  // 断开连接
    				kcp->state = (IUINT32)-1;
    			}
    		}
    	}
    
    	// 把没有数据缓冲区的数据发送出去
    	size = (int)(ptr - buffer);
    	if (size > 0) {
    		ikcp_output(kcp, buffer, size);
    	}
    
    	// 如果触发了快速重传,减小拥塞窗口(快速恢复)
    	if (change) {
    		IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
    		kcp->ssthresh = inflight / 2;
    		if (kcp->ssthresh < IKCP_THRESH_MIN)
    			kcp->ssthresh = IKCP_THRESH_MIN;
    		kcp->cwnd = kcp->ssthresh + resent;
    		kcp->incr = kcp->cwnd * kcp->mss;
    	}
    
    	// 超时重传,丢包了,重置拥塞窗口和ssthresh。
    	if (lost) {
    		kcp->ssthresh = cwnd / 2;
    		if (kcp->ssthresh < IKCP_THRESH_MIN)
    			kcp->ssthresh = IKCP_THRESH_MIN;
    		kcp->cwnd = 1;
    		kcp->incr = kcp->mss;
    	}
    
    	if (kcp->cwnd < 1) {
    		kcp->cwnd = 1;
    		kcp->incr = kcp->mss;
    	}
        ...
    }
    

    首先确定发送窗口 cwnd = min(kcp->snd_wnd, kcp->rmt_wnd); 接着检查是否开启流控(拥塞控制) if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);通过取消拥塞控制可以进一步降低延迟。

    然后从snd_queue中取出数据包放到snd_buf中, 直到snd_buf满或者snd_queue为空。然后依次检查snd_buf 里的数据包需不需要发送,需要发送有三种情况:

    1. 该数据包首次发送。
    2. 触发超时重传:时间超过超时重传的阈值。超时重传普通模式下:每次超时,超时重传的时间就翻倍。在快速模式下:每次超时的重传时间翻 0.5 倍。降低传输时延同时重置拥塞窗口和ssthresh。
    3. 触发快速重传:收到了该数据包的resent次的失序(冗余)ACK。该数据包被跳过的次数超过了fastresend, 触发快速重传。同时减小拥塞窗口为原来的一半,ssthresh也设置为这个值。

    __EOF__

  • 本文作者: hellozhangjz
  • 本文链接: https://www.cnblogs.com/hellozhangjz/p/18075534
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    2022年的就业前景
    从精装到智装,下一波浪潮浮现,听听智能家居的大咖们怎么说?
    async/await 贴脸输出,这次你总该明白了
    Qt5开发及实例V2.0-第十五章-Qt单元测试框架
    看了这篇MySQL,开发功力又升级
    Java后端大写字段传到前端,或者使用postman调用后,返回变为小写
    软件加密系统Themida应用程序保护指南(二):有哪些保护选项
    PHP redis sorted set 有序集合
    BK698CPA15B0 创建了通用电气数字工业发展指数
    【附源码】计算机毕业设计SSM网上购物平台
  • 原文地址:https://www.cnblogs.com/hellozhangjz/p/18075534