• TCP协议之《乱序队列Out-Of-Order》


    乱序的数据包被保存在TCP套接口的out_of_order_queue队列中,以红黑树组织,套接口成员ooo_last_skb缓存了此红黑树的最后一个成员,方便内核后续添加新的成员。

    struct tcp_sock {
        /* OOO segments go in this rbtree. Socket lock must be held. */
        struct rb_root  out_of_order_queue;
        struct sk_buff  *ooo_last_skb; /* cache rb_last(out_of_order_queue) */
    }

    一、乱序报文入队
    内核函数tcp_data_queue负责套接口数据的接收,对于开始序号sequence位于套接口下一个要接收的序列后之后的数据包,并且其开始序号在接收窗口之内,调用函数tcp_data_queue_ofo将其添加到TCP乱序队列中。

    static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
    {
        if (!before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt + tcp_receive_window(tp)))
            goto out_of_window;
     
        tcp_data_queue_ofo(sk, skb);
    }
    接下来看一下函数tcp_data_queue_ofo,如果是第一个要添加的乱序报文,out_of_order_queue队列为空,将其插入队列中,并且更新最后一个skb指针ooo_last_skb。另外,如果此套接口连接支持SACK,使用乱序数据包的起始和结束序列号初始化一个SACK块。

    static void tcp_data_queue_ofo(struct sock *sk, struct sk_buff *skb)
    {
        if (RB_EMPTY_ROOT(&tp->out_of_order_queue)) {
            /* Initial out of order segment, build 1 SACK. */
            if (tcp_is_sack(tp)) {
                tp->rx_opt.num_sacks = 1;
                tp->selective_acks[0].start_seq = seq;
                tp->selective_acks[0].end_seq = end_seq;
            }
            rb_link_node(&skb->rbnode, NULL, p);
            rb_insert_color(&skb->rbnode, &tp->out_of_order_queue);
            tp->ooo_last_skb = skb;
            goto end;
        }
    }

    如果out_of_order_queue队列不为空,内核首先尝试将此数据包与队列中的最后一个数据包进行合并,因为典型的情况下,接收到的数据包应当位于之前数据包的后面,这样做又可避免整个队列的遍历。如果接收skb数据包的开始序号等于队列中最后一个数据包的结束序号,将skb合并到队列最后的数据包中。如果接收skb数据包的开始序位于队列最后一个数据包的结束序号之后,将skb插入到最后数据包所在红黑树节点的右侧。

    static void tcp_data_queue_ofo(struct sock *sk, struct sk_buff *skb)
    {
        if (tcp_try_coalesce(sk, tp->ooo_last_skb, skb, &fragstolen)) {
    coalesce_done:
            tcp_grow_window(sk, skb);
            kfree_skb_partial(skb, fragstolen);
            skb = NULL;
            goto add_sack;
        }
     
        if (!before(seq, TCP_SKB_CB(tp->ooo_last_skb)->end_seq)) {
            parent = &tp->ooo_last_skb->rbnode;
            p = &parent->rb_right;
            goto insert;
        }
    }

    以上条件都不成立的话,内核将遍历红黑树,查找数据包的插入位置。从队列out_of_order_queue头部开始遍历,如果数据包开始序号位于节点数据包开始序号的之前,向树的左侧遍历,否则,向树右侧遍历。在此过程中,如果数据包的开始序号位于节点开始序号之后,并且数据包的结束序号位于节点数据包的结束序号之前,说明节点数据包包含有此接收数据包,不执行插入,设置套接口的重复SACK块,跳出遍历。

    如果数据包的开始序号位于节点数据包的开始序号之后,并且结束序号位于节点数据包结束序号之后,表明两个数据包有部分数据重叠,内核使用接收数据包的开始序号和节点数据包的结束序号设置套接口的重复SACK块,继续遍历知道队列结束或者其它情况发生,所以在out_of_order_queue树中的数据包由可能存在部分重叠的情况。

    再一种情况是,接收数据包的开始序号与节点数据包的开始序号相同,并且其结束序号位于节点数据包的结束序号之后,即接收数据包完全包含节点数据包,使用新接收数据包替换掉树中的节点数据包,调用函数tcp_dsack_extend扩展套接口SACK设置,跳出遍历。

        while (*p) {
            parent = *p;
            skb1 = rb_to_skb(parent);
            if (before(seq, TCP_SKB_CB(skb1)->seq)) {
                p = &parent->rb_left;
                continue;
            }   
            if (before(seq, TCP_SKB_CB(skb1)->end_seq)) {
                if (!after(end_seq, TCP_SKB_CB(skb1)->end_seq)) {
                    __kfree_skb(skb);
                    skb = NULL;
                    tcp_dsack_set(sk, seq, end_seq);
                    goto add_sack;
                }   
                if (after(seq, TCP_SKB_CB(skb1)->seq)) {
                    tcp_dsack_set(sk, seq, TCP_SKB_CB(skb1)->end_seq);
                } else {
                    /* skb's seq == skb1's seq and skb covers skb1.
                     * Replace skb1 with skb.
                     */
                    rb_replace_node(&skb1->rbnode, &skb->rbnode, &tp->out_of_order_queue);
                    tcp_dsack_extend(sk, TCP_SKB_CB(skb1)->seq, TCP_SKB_CB(skb1)->end_seq);
                    __kfree_skb(skb1);
                    goto merge_right;
                }
            } else if (tcp_try_coalesce(sk, skb1, skb, &fragstolen)) {
                goto coalesce_done;
            }
            p = &parent->rb_right;
        }

    最后一种情况,接收数据包的开始序号位于节点开始序号和结束序号之后,内核调用函数tcp_try_coalesce尝试将接收数据包合并到节点数据包中。

    在以上找到了合适的插入位置之后,parent为父节点,p表示左侧或者右侧节点,将接收数据包插入到红黑树中。

    insert:
        /* Insert segment into RB tree. */
        rb_link_node(&skb->rbnode, parent, p);
        rb_insert_color(&skb->rbnode, &tp->out_of_order_queue);
    插入完成之后,需要遍历skb节点之后的其它节点,以检查是否有其它节点位置的数据包被本数据包所覆盖。如果有覆盖,内核将擦除被覆盖的节点,并且释放节点数据包,调用函数tcp_dsack_extend扩展套接口SACK设置。如果只是部分重叠,仅做SACK扩展。如果数据包skb为最后一个树节点,更新ooo_last_skb指针。

    merge_right:
        /* Remove other segments covered by skb. */
        while ((skb1 = skb_rb_next(skb)) != NULL) {
            if (!after(end_seq, TCP_SKB_CB(skb1)->seq))
                break;
            if (before(end_seq, TCP_SKB_CB(skb1)->end_seq)) {
                tcp_dsack_extend(sk, TCP_SKB_CB(skb1)->seq, end_seq);
                break;
            }
            rb_erase(&skb1->rbnode, &tp->out_of_order_queue);
            tcp_dsack_extend(sk, TCP_SKB_CB(skb1)->seq, TCP_SKB_CB(skb1)->end_seq);
            tcp_drop(sk, skb1);
        }
        if (!skb1)
            tp->ooo_last_skb = skb;

    二、乱序报文出队列
    在tcp_data_queue接收到保序的数据包之后,调用函数tcp_ofo_queue检查out_of_order_queue队列中是否有数据包可用。即新接收的数据包添上了套接口下一个接收序号与out_of_order_queue队列中数据包序号之间的空缺,使out_of_order_queue中的部分数据包变成序号连续的可用数据包。如果以上成立的话,内核将马上发送ACK确认报文,以通知对端新的请求数据包序号。

    static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
    {
        if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {
            if (!RB_EMPTY_ROOT(&tp->out_of_order_queue)) {
                tcp_ofo_queue(sk);
                
                if (RB_EMPTY_ROOT(&tp->out_of_order_queue))
                    inet_csk(sk)->icsk_ack.pingpong = 0;
            }   
    }
    如下为tcp_ofo_queue遍历函数,首先如果第一个树节点的数据开始序号在套接口的下一个要接收的序号之后,表明两者之间还有缺口,跳出循环。如果节点数据包的结束序号位于套接口的下一个要接收序号之前,表明此数据包的数据也被正确接收到,释放此节点数据包。

    static void tcp_ofo_queue(struct sock *sk)
    {
        p = rb_first(&tp->out_of_order_queue);
        while (p) {
            skb = rb_to_skb(p);
            if (after(TCP_SKB_CB(skb)->seq, tp->rcv_nxt))
                break;
            
            if (before(TCP_SKB_CB(skb)->seq, dsack_high)) {
                __u32 dsack = dsack_high;
                if (before(TCP_SKB_CB(skb)->end_seq, dsack_high))
                    dsack_high = TCP_SKB_CB(skb)->end_seq; 
                tcp_dsack_extend(sk, TCP_SKB_CB(skb)->seq, dsack);
            } 
            p = rb_next(p);
            rb_erase(&skb->rbnode, &tp->out_of_order_queue);
            
            if (unlikely(!after(TCP_SKB_CB(skb)->end_seq, tp->rcv_nxt))) {
                SOCK_DEBUG(sk, "ofo packet was already received\n");
                tcp_drop(sk, skb);
                continue;
            }

    接下来,tcp_ofo_queue函数尝试将sk_receive_queue接收队列的最后一个数据包tail与节点数据包skb进行合并tcp_try_coalesce。如果合并成功,调用kfree_skb_partial释放skb,fragstolen表示合并了数据包的线性缓存部分的数据,但是共享缓存中的数据继续在使用,所以不能够释放。如果合并失败,将节点的skb添加到sk_receive_queue队列的末尾。

            tail = skb_peek_tail(&sk->sk_receive_queue);
            eaten = tail && tcp_try_coalesce(sk, tail, skb, &fragstolen);
            tcp_rcv_nxt_update(tp, TCP_SKB_CB(skb)->end_seq);
            fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
            if (!eaten)
                __skb_queue_tail(&sk->sk_receive_queue, skb);
            else
                kfree_skb_partial(skb, fragstolen);
            
            if (unlikely(fin)) {
                tcp_fin(sk);
                /* tcp_fin() purges tp->out_of_order_queue,
                 * so we must end this loop right now.
                 */
                break;
            }
        }   
    }  

    如果节点数据包带有TCP的FIN标志,调用tcp_fin函数处理,在其中将清空out_of_order_queue队列。

    void tcp_fin(struct sock *sk)
    {
        skb_rbtree_purge(&tp->out_of_order_queue);
    }


    ofo队列整理与丢弃

    在TCP套接口接收数据过程中,如果套接口接收缓存已经大于限定的套接口缓存限值,或者TCP系统占用的缓存已超过限定的总阈值,内核将使用tcp_prune_queue函数尝试回收接收队列占用的缓存。首先使用tcp_collapse_ofo_queue函数尝试合并out_of_order_queue队列中的重复数据,之后使用tcp_collapse函数尝试将sk_receive_queue队列中的数据折叠到少量的skb结构中;最后如果接收缓存还是占用过高,调用函数tcp_prune_ofo_queue删除out_of_order_queue队列中的数据包。

    static int tcp_try_rmem_schedule(struct sock *sk, struct sk_buff *skb, unsigned int size)
    {
        if (atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf || !sk_rmem_schedule(sk, skb, size)) {
            if (tcp_prune_queue(sk) < 0)
                return -1;
            while (!sk_rmem_schedule(sk, skb, size)) {
                if (!tcp_prune_ofo_queue(sk))
                    return -1;
            }
        }
    }
    static int tcp_prune_queue(struct sock *sk)
    {
        tcp_collapse_ofo_queue(sk);
        if (!skb_queue_empty(&sk->sk_receive_queue))
            tcp_collapse(sk, &sk->sk_receive_queue, NULL, skb_peek(&sk->sk_receive_queue), NULL, tp->copied_seq, tp->rcv_nxt);
        sk_mem_reclaim(sk);
     
        if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf)
            return 0;
        tcp_prune_ofo_queue(sk);
    }

    函数tcp_collapse_ofo_queue首先寻找到一段序号连续的节点数据包(由start至end),一旦遇到缺口或者队列结尾,标志着此段连续序号数据包的结束,就将这段节点数据进行合并。之后开始寻找新的下一段序号连续的节点数据,更新start和end。

    如下所示,在skb为空队列结束情况下,或者下一个节点的开始序号在end段结尾之后(红黑树右侧情况),又或者下一个节点的结束序号在start段开始之前(树左侧),标志着序号已经不连续,内核将合并head到skb之间的节点数据,这些数据位于序列号start到end之间。并且,以skb开始另一段新的连续序号查找。

    在寻找序号连续节点的过程中,如果下一个节点与上一个阶段的序号有重叠,下一个节点的开始序号位于start段开始序号之前,更新start的值;下一个节点的结束序号在end段结束序号之后,更新end的值。

    static void tcp_collapse_ofo_queue(struct sock *sk)
    {
        skb = skb_rb_first(&tp->out_of_order_queue);
    new_range:
        if (!skb) {
            tp->ooo_last_skb = skb_rb_last(&tp->out_of_order_queue);
            return;
        }
        start = TCP_SKB_CB(skb)->seq;
        end = TCP_SKB_CB(skb)->end_seq;
        for (head = skb;;) {
            skb = skb_rb_next(skb);
            if (!skb || after(TCP_SKB_CB(skb)->seq, end) || before(TCP_SKB_CB(skb)->end_seq, start)) {
                tcp_collapse(sk, NULL, &tp->out_of_order_queue, head, skb, start, end);
                goto new_range;
            }
            if (unlikely(before(TCP_SKB_CB(skb)->seq, start)))
                start = TCP_SKB_CB(skb)->seq;
            if (after(TCP_SKB_CB(skb)->end_seq, end))
                end = TCP_SKB_CB(skb)->end_seq;
        }
    }

    核心的函数tcp_collapse函数如下,分成两个部分介绍。第一部分主要是检查由head到tail的队列是否可以进行折叠,并且找到最佳的折叠开始处。如果skb数据的TCP头部未设置SYN或者FIN标志,并且由数据包的truesize长度计算的窗口值大于数据包的数据长度,或者数据包的开始序号在start之前,即找到进行折叠的开始数据包。另外,如果skb的下一个树节点不是末尾节点,并且skb的结束序号不等于下一个树节点的开始序号,两者有重叠,将此skb做为折叠开始节点。如果以上条件都不成立,开始检查下一个节点数据包,更新折叠start开始序号为当前数据包skb的结束序号。

    最后,如果要求折叠的start开始序号在当前遍历数据包skb的接收序号之后,说明此数据包已被包括在内,不在需要了,使用tcp_collapse_one清除此数据包。

    static void
    tcp_collapse(struct sock *sk, struct sk_buff_head *list, struct rb_root *root, struct sk_buff *head, struct sk_buff *tail, u32 start, u32 end)
    {
    restart:
        for (end_of_skbs = true; skb != NULL && skb != tail; skb = n) {
            n = tcp_skb_next(skb, list);
     
            /* No new bits? It is possible on ofo queue. */
            if (!before(start, TCP_SKB_CB(skb)->end_seq)) {
                skb = tcp_collapse_one(sk, skb, list, root);
                if (!skb)
                    break;
                goto restart;
            }
            if (!(TCP_SKB_CB(skb)->tcp_flags & (TCPHDR_SYN | TCPHDR_FIN)) && (tcp_win_from_space(sk, skb->truesize) > skb->len || before(TCP_SKB_CB(skb)->seq, start))) {
                end_of_skbs = false;
                break;
            }
            if (n && n != tail && TCP_SKB_CB(skb)->end_seq != TCP_SKB_CB(n)->seq) {
                end_of_skbs = false;
                break;
            }
            start = TCP_SKB_CB(skb)->end_seq;
        }
        if (end_of_skbs || (TCP_SKB_CB(skb)->tcp_flags & (TCPHDR_SYN | TCPHDR_FIN)))
            return;
    }

    在找到折叠开始节点和更新了开始序号之后,函数tcp_collapse的下半部分进行折叠操作。对序列号在start到end之间的数据包,按照以copy字节为数据长度的skb进行重组织。其中copy为end与start序号的差值,及SKB_MAX_ORDER(0, 0)的结果之间的较小值。宏SKB_MAX_ORDER(0,0)的结果为页面大小减去skb_shared_info结构体的长度所得到的值。

    对于out_of_order_queue队列,需要先将新的skb结构链接在临时的tmp列表中,在完成折叠以后,将tmp列表中的数据包插入到out_of_order_queue树结构中。

    #define SKB_WITH_OVERHEAD(X)     ((X) - SKB_DATA_ALIGN(sizeof(struct skb_shared_info)))
    #define SKB_MAX_ORDER(X, ORDER)  SKB_WITH_OVERHEAD((PAGE_SIZE << (ORDER)) - (X))
     
        while (before(start, end)) {
            int copy = min_t(int, SKB_MAX_ORDER(0, 0), end - start);
            
            nskb = alloc_skb(copy, GFP_ATOMIC);
            if (!nskb)
                break;
            memcpy(nskb->cb, skb->cb, sizeof(skb->cb));
            TCP_SKB_CB(nskb)->seq = TCP_SKB_CB(nskb)->end_seq = start;
            if (list)
                __skb_queue_before(list, skb, nskb);
            else
                __skb_queue_tail(&tmp, nskb); /* defer rbtree insertion */
            skb_set_owner_r(nskb, sk); 
            
            /* Copy data, releasing collapsed skbs. */
            while (copy > 0) {
                int offset = start - TCP_SKB_CB(skb)->seq;
                int size = TCP_SKB_CB(skb)->end_seq - start;
                
                BUG_ON(offset < 0);
                if (size > 0) { 
                    size = min(copy, size);
                    if (skb_copy_bits(skb, offset, skb_put(nskb, size), size))
                        BUG();
                    TCP_SKB_CB(nskb)->end_seq += size;
                    copy -= size;
                    start += size;
                }   
                if (!before(start, TCP_SKB_CB(skb)->end_seq)) {
                    skb = tcp_collapse_one(sk, skb, list, root);
                    if (!skb || skb == tail || (TCP_SKB_CB(skb)->tcp_flags & (TCPHDR_SYN | TCPHDR_FIN)))
                        goto end;
                }       
            }   
        }   
    end:
        skb_queue_walk_safe(&tmp, skb, n)
            tcp_rbtree_insert(root, skb);
    }

    函数tcp_prune_ofo_queue由out_of_order_queue树的最后一个节点开始遍历树结构,释放树节点,直到套接口的接收缓存sk_rmem_alloc小于等于设定的接收缓存限值sk_rcvbuf,并且TCP协议的缓存状态不再处于承压状态为止。此遍历将先释放尾部大序号的数据包,此举可为小序号数据包的接收腾出空间,以便其组成可供应用层读取的正常数据包。

    static bool tcp_prune_ofo_queue(struct sock *sk)
    {
        struct tcp_sock *tp = tcp_sk(sk);
     
        node = &tp->ooo_last_skb->rbnode;
        do {
            prev = rb_prev(node);
            rb_erase(node, &tp->out_of_order_queue);
            tcp_drop(sk, rb_to_skb(node));
            sk_mem_reclaim(sk);
            if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf && !tcp_under_memory_pressure(sk))
                break;
            node = prev;
        } while (node);
        tp->ooo_last_skb = rb_to_skb(prev);
    }

  • 相关阅读:
    【云原生kubernetes从入门到实践系列教程 ]一.docker安装
    《概念解析》内容汇总 + 导航
    java-net-php-python-sceatch在线学习系统2019演示录像计算机毕业设计程序
    逆势涨薪3k!新媒体运营毅然转行测试,我的入行秘籍是什么?
    23 种设计模式的通俗解释,虽然有点污,但是很正点
    基于Simulink的用于电力系统动态分析
    老卫带你学---leetcode刷题(4. 寻找两个正序数组的中位数)
    【Java基础(高级篇)】集合源码剖析
    使用python批量重命名文件夹中的文件
    进阶的风控策略篇:如果筛选最佳策略帮我们锁定优质客群
  • 原文地址:https://blog.csdn.net/wuyongmao/article/details/126265842