• TCP协议之《套接口sk_backlog接收队列》


    在接收到数据包之后,如果判断此套接口当前正被用户进程所使用,数据包将被保存到套接口结构的sk_backlog成员的head所定义的skb缓存列表中,tail指向链表的末尾,len变量记录了当前链表中所有skb的总长度。

    struct sock {
        struct {
            atomic_t    rmem_alloc;
            int     len;
            struct sk_buff  *head;
            struct sk_buff  *tail;
        } sk_backlog;
    #define sk_rmem_alloc sk_backlog.rmem_alloc
    }
    TCP在创建子套接口时,将sk_backlog结构的成员head和tail都置为空,链表元素数量为零。

    struct sock *sk_clone_lock(const struct sock *sk, const gfp_t priority)
    {
        struct sock *newsk;
     
        newsk = sk_prot_alloc(sk->sk_prot, priority, sk->sk_family);
        if (newsk != NULL) {
            newsk->sk_backlog.head  = newsk->sk_backlog.tail = NULL;
            newsk->sk_backlog.len = 0;
        }
    }

    一、backlog链表添加

    如下函数tcp_v4_rcv所示,函数sock_owned_by_user检查套接口是否被应用进程在使用,如果没有,执行正常的TCP接收操作;否则,将调用函数tcp_add_backlog将数据包添加到backlog链表中,在用户进程释放套接口后,内核还是会调用tcp_v4_do_rcv函数执行接收操作。

    int tcp_v4_rcv(struct sk_buff *skb)
    {
        if (!sock_owned_by_user(sk)) {
            ret = tcp_v4_do_rcv(sk, skb);
        } else if (tcp_add_backlog(sk, skb)) {
            goto discard_and_relse;
        }
    }
    backlog链表的添加操作不能执行接收队列(包括sk_receive_queue和out_of_order_queue)的减小内存占用类的操作(collapse/prune),只有套接口的所有者可执行此类操作。在tcp_add_backlog函数执行链表添加操作之前,内核在最大接收缓存和发送缓存之和的基础之上,再增加64K的额外量以保证成功添加,由于系统中在sk_backlog链表中同时存储有数据的套接口属于少数情况,增加额外量不会有问题。

    之后,还是要使用函数skb_condense尝试一下对skb进行空间压缩,算法很简单:如果skb的线性空间有足够的剩余,就可将其共享空间中的页面片段拷贝到线性空间中,以释放页面片段。如果线性空间中的剩余量小于页面片段的长度,或者此skb被克隆过(页面片段共享给了其它skb),不执行任何压缩。

    bool tcp_add_backlog(struct sock *sk, struct sk_buff *skb)
    {   
        u32 limit = sk->sk_rcvbuf + sk->sk_sndbuf;
        
        limit += 64*1024;
        skb_condense(skb);
        
        if (unlikely(sk_add_backlog(sk, skb, limit))) {
            bh_unlock_sock(sk);
            return true;
        }
        return false;
    }
    函数sk_add_backlog如下,如果即便内核增加了64K的缓存限额,backlog链表占用的空间与套接口接收缓存之和仍然大于限额,返回无缓存的错误。此处的qsize未考虑当前skb数据包的空间占用量,即不管其大小,只要还有空间就将其接收。但是,如果skb的空间是由系统的PF_MEMALLOC保留区分配而来,并且套接口未设置SOCK_MEMALLOC标志,即内存已经处于紧张状态,此套接口还不能够帮助释放内存,返回无内存错误。最终在函数__sk_add_backlog将数据包添加到backlog链表后,为链表的长度增加skb的truesize值。

    static inline bool sk_rcvqueues_full(const struct sock *sk, unsigned int limit)
    {   
        unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc);
        return qsize > limit;

    static inline __must_check int sk_add_backlog(struct sock *sk, struct sk_buff *skb, unsigned int limit)
    {
        if (sk_rcvqueues_full(sk, limit))
            return -ENOBUFS;
        
        /*
         * If the skb was allocated from pfmemalloc reserves, only
         * allow SOCK_MEMALLOC sockets to use it as this socket is
         * helping free memory
         */
        if (skb_pfmemalloc(skb) && !sock_flag(sk, SOCK_MEMALLOC))
            return -ENOMEM;
        
        __sk_add_backlog(sk, skb);
        sk->sk_backlog.len += skb->truesize;
        return 0;
    }

    函数__sk_add_backlog执行简单的单链表添加操作,backlog链表的尾部skb的next指针指向新添加的skb,并且,将新的skb地址赋值给backlog的尾部tail指针。

    static inline void __sk_add_backlog(struct sock *sk, struct sk_buff *skb)
    {   
        if (!sk->sk_backlog.tail) 
            sk->sk_backlog.head = skb;
        else
            sk->sk_backlog.tail->next = skb;
        
        sk->sk_backlog.tail = skb;
        skb->next = NULL;
    }

    二、backlog链表处理
    以上提到backlog链表中的数据最终还是由tcp_v4_do_rcv函数进行处理。由于在TCP套接口初始化时,内核已经将TCP协议结构的成员backlog_rcv回调函数指针赋予了函数tcp_v4_do_rcv的值,后续又将backlog_rcv指针赋给了套接口的sk_backlog_rcv函数指针。

    struct proto tcp_prot = {
        .name           = "TCP",
        .backlog_rcv        = tcp_v4_do_rcv,
        .release_cb     = tcp_release_cb,
    }
    static int inet_create(struct net *net, struct socket *sock, int protocol, int kern)
    {
        sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot, kern);
        if (!sk)
            goto out;
     
        sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv;
    }
    backlog链表接收处理函数为sk_backlog_rcv,与链表添加时的操作对应,对于设置了SOCK_MEMALLOC标志的套接口,并且数据包skb的内存是由系统的PF_MEMALLOC保留内存区分配而来的情况,使用__sk_backlog_rcv函数处理数据包。其它情况下直接调用回调函数sk_backlog_rcv处理。

    函数memalloc_noreclaim_save将为当前进程增设PF_MEMALLOC标志,memalloc_noreclaim_restore函数还原前值。

    static inline int sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
    {
        if (sk_memalloc_socks() && skb_pfmemalloc(skb))
            return __sk_backlog_rcv(sk, skb);
     
        return sk->sk_backlog_rcv(sk, skb);
    }
    int __sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
    {
        BUG_ON(!sock_flag(sk, SOCK_MEMALLOC));
     
        noreclaim_flag = memalloc_noreclaim_save();
        ret = sk->sk_backlog_rcv(sk, skb);
        memalloc_noreclaim_restore(noreclaim_flag);
    }

    三、backlog链表处理时机

    以应用层进程的数据发送为例,内核的tcp_sendmsg函数在处理请求之后,调用release_sock释放套接口锁。如果在此期间接收到网络数据包,内核已将其放入了backlog链表中,在释放套接口时,就需要检查以下backlog链表,已有机会进行尽快处理。如果其尾部tail不为空,说明链表非空,调用__release_sock处理。

    int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
    {
        lock_sock(sk);
        ret = tcp_sendmsg_locked(sk, msg, size);
        release_sock(sk);
        return ret;
    }
    void release_sock(struct sock *sk)
    {
        spin_lock_bh(&sk->sk_lock.slock);
        if (sk->sk_backlog.tail)
            __release_sock(sk);
        if (sk->sk_prot->release_cb)
            sk->sk_prot->release_cb(sk);
        sock_release_ownership(sk);
        spin_unlock_bh(&sk->sk_lock.slock);
    }

    需要特别注意的是函数__release_sock存在两个嵌套的循环,内层的循环是遍历backlog链表,处理其中的每个数据包skb元素,在进入内存循环之前将backlog链表的头尾两个指针清空,并且在遍历过程中内核有可能发送重调度,如果在调度其将有新的网络数据包到来,就需要外层的循环在次对backlog链表进行判断处理。

    static void __release_sock(struct sock *sk)
        __releases(&sk->sk_lock.slock)
        __acquires(&sk->sk_lock.slock)
    {
        while ((skb = sk->sk_backlog.head) != NULL) {
            sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
     
            spin_unlock_bh(&sk->sk_lock.slock);
            do {
                next = skb->next;
                prefetch(next);
                WARN_ON_ONCE(skb_dst_is_noref(skb));
                skb->next = NULL;
                sk_backlog_rcv(sk, skb);
     
                cond_resched();
                skb = next;
            } while (skb != NULL);
            spin_lock_bh(&sk->sk_lock.slock);
        }
        sk->sk_backlog.len = 0;
    }

    另外,内核定义了backlog链表flush函数,如下sk_flush_backlog,其本质上是对__release_sock函数的封装。其调用点位于TCP发送函数tcp_sendmsg_locked中,使得内核在发送大量数据时,不必等到函数退出才执行backlog链表的处理。

    static inline bool sk_flush_backlog(struct sock *sk)
    {
        if (unlikely(READ_ONCE(sk->sk_backlog.tail))) {
            __sk_flush_backlog(sk);
            return true;
        }
        return false;
    }
    void __sk_flush_backlog(struct sock *sk)
    {
        spin_lock_bh(&sk->sk_lock.slock);
        __release_sock(sk);
        spin_unlock_bh(&sk->sk_lock.slock);
    }
    与TCP发送函数类似,在TCP的tcp_recvmsg接收函数中,如果用户进程设置了非阻塞模式接收,在拷贝完套接口接收队列sk_receive_queue中的数据之后,假设用户层提供的接收缓存长度还有余量,并且backlog链表中有数据,调用release_sock处理backlog链表,处理之后的数据可能又填充到了sk_receive_queue接收队列中,再次处理此队列时,可尽量返回给用户层要求长度的数据。

    int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len)
    {
        do {
            last = skb_peek_tail(&sk->sk_receive_queue);
            skb_queue_walk(&sk->sk_receive_queue, skb) {
            }
     
            /* Well, if we have backlog, try to process it now yet. */
            if (copied >= target && !sk->sk_backlog.tail)
                break;
     
            if (copied >= target) {
                /* Do not sleep, just process backlog. */
                release_sock(sk);
                lock_sock(sk);
            } else {
                sk_wait_data(sk, &timeo, last);
            }
            continue;
        } while (len > 0);
    }

    总结
    对于非TCP协议套接口,例如L2TP、PPTP和PPPOE等类型套接口,其控制报文需要上送应用层处理,内核使用sk_receive_skb函数实现。最终的逻辑在函数__sk_receive_skb中。

    static int l2tp_ip_recv(struct sk_buff *skb)
    {
        /* RFC3931: L2TP/IP packets have the first 4 bytes containing
         * the session_id. If it is 0, the packet is a L2TP control
         * frame and the session_id value can be discarded.
         */
        if (session_id == 0) {
            __skb_pull(skb, 4);
            goto pass_up;
        }
    pass_up:
        return sk_receive_skb(sk, skb, 1);
    }
    如下,如果套接口被用户进程使用,也是要添加到backlog链表中,否则,直接使用sk_backlog_rcv进行处理。对于以上三个类型的套接口,其backlog_rcv回调函数与TCP套接口的不同,分别为pppol2tp_backlog_recv、pptp_rcv_core和pppoe_rcv_core。

    int __sk_receive_skb(struct sock *sk, struct sk_buff *skb, const int nested, unsigned int trim_cap, bool refcounted)
    {
        if (sk_rcvqueues_full(sk, sk->sk_rcvbuf)) {
            atomic_inc(&sk->sk_drops);
            goto discard_and_relse;
        }
        if (!sock_owned_by_user(sk)) {
            mutex_acquire(&sk->sk_lock.dep_map, 0, 1, _RET_IP_);
            rc = sk_backlog_rcv(sk, skb);
            mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
        } else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) {
            bh_unlock_sock(sk);
            atomic_inc(&sk->sk_drops);
            goto discard_and_relse;
        }
    }

  • 相关阅读:
    nodejs中对es6语法规范讲解
    汇编语言快速回顾(以x86_64为例)
    我做了一个世界杯的可视化网站...
    外包干了2个月,技术退步明显.......
    47、Dynamic View Synthesis from Dynamic Monocular Video
    基于JAVA高校共享单车管理系统计算机毕业设计源码+数据库+lw文档+系统+部署
    算法模型总结:哈希
    《Java基础知识》Java ArrayList源码分析 5
    计算有向图点的入度与出度
    基于云服务MRS构建DolphinScheduler2调度系统
  • 原文地址:https://blog.csdn.net/wuyongmao/article/details/126247263