• TCP协议之《Pacing功能》


    TCP Pacing功能控制TCP的发包速率。

    一、Pacing的初始化

    TCP协议初始函数tcp_sk_init中,赋值了两个Pacing相关的参数,分别为sysctl_tcp_pacing_ss_ratio和sysctl_tcp_pacing_ca_ratio,都是控制pacing速率的倍数值。前者应用中慢启动阶段,默认值为200,即将速率提升200%;后者应用在拥塞避免阶段,默认值为120,即将速率提升120%。

    static int __net_init tcp_sk_init(struct net *net)
    {
        net->ipv4.sysctl_tcp_pacing_ss_ratio = 200;
        net->ipv4.sysctl_tcp_pacing_ca_ratio = 120;
    }
     
    $ cat /proc/sys/net/ipv4/tcp_pacing_ss_ratio
    200
    $ cat /proc/sys/net/ipv4/tcp_pacing_ca_ratio
    120
    另外,在TCP定时器初始化函数tcp_init_xmit_timers中,内核初始化一个高精度的pacing定时器,超时处理函数设定为tcp_pace_kick。

    void tcp_init_xmit_timers(struct sock *sk)
    {
        inet_csk_init_xmit_timers(sk, &tcp_write_timer, &tcp_delack_timer, &tcp_keepalive_timer);
        hrtimer_init(&tcp_sk(sk)->pacing_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS_PINNED);
        tcp_sk(sk)->pacing_timer.function = tcp_pace_kick;
    }
    最后在套接口初始化函数sock_init_data中,初始化三个pacing相关的参数。其中最大的速率sk_max_pacing_rate和当前速率sk_pacing_rate设置为最大的无符号整型值,而sk_pacing_shift设置为10。

    void sock_init_data(struct socket *sock, struct sock *sk)
    {
        sk->sk_max_pacing_rate = ~0U;
        sk->sk_pacing_rate = ~0U;
        sk->sk_pacing_shift = 10;
    }

    二、Pacing功能开启
    TCP拥塞控制算法BBR需要pacing功能的支持,在其初始化函数bbr_init中,将pacing状态sk_pacing_status设置为SK_PACING_NEEDED使能pacing功能。另外,用户可通过setsockopt系统调用的选项SO_MAX_PACING_RATE设置pacing的最大速率,其隐含的打开套接口的pacing功能。

    static void bbr_init(struct sock *sk)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        struct bbr *bbr = inet_csk_ca(sk);
     
        bbr->has_seen_rtt = 0;
        bbr_init_pacing_rate_from_rtt(sk);
        
        cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
    }
    int sock_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
    {
        switch (optname) {
        case SO_MAX_PACING_RATE:
            if (val != ~0U)
                cmpxchg(&sk->sk_pacing_status, SK_PACING_NONE, SK_PACING_NEEDED);
            sk->sk_max_pacing_rate = val;
            sk->sk_pacing_rate = min(sk->sk_pacing_rate, sk->sk_max_pacing_rate);
            break;
        }
    }

    三、Pacing功能启用

    内核网络中的流控算法Fair queue可以很好的完成数据报文的pacing功能,但是当前系统为其网络接口选用了sch_fq的算法。所以,不考虑流控系统,函数tcp_needs_internal_pacing检查是否要在TCP子系统中执行pacing功能。

    static bool tcp_needs_internal_pacing(const struct sock *sk)
    {
        return smp_load_acquire(&sk->sk_pacing_status) == SK_PACING_NEEDED;
    }
    函数tcp_internal_pacing负责开启TCP自身的pacing功能,前提是由pacing的需求,并且当前速率不为零而且不等于最大无符号整数值。按照当前的速率计算发送skb数据包所需要的时长,以纳秒为单位。启动pacing_timer,超时时间设置为当前数据包以设定的pacing速率发送完成所需的时长。但是,实际情况中,数据可能并不需要如此长的时间。

    static void tcp_internal_pacing(struct sock *sk, const struct sk_buff *skb)
    {
        u64 len_ns;
        u32 rate;
     
        if (!tcp_needs_internal_pacing(sk))
            return;
        rate = sk->sk_pacing_rate;
        if (!rate || rate == ~0U)
            return;
     
        /* Should account for header sizes as sch_fq does, but lets make things simple. */
        len_ns = (u64)skb->len * NSEC_PER_SEC;
        do_div(len_ns, rate);
        hrtimer_start(&tcp_sk(sk)->pacing_timer, ktime_add_ns(ktime_get(), len_ns), HRTIMER_MODE_ABS_PINNED);
    }

    以上pacing使能函数tcp_internal_pacing在TCP传输函数tcp_transmit_skb中调用,条件是当前发送的数据包有数据,并非是SYN或者ACK类的控制报文。鉴于此时skb还没有添加网络层以及链路层的头部信息,tcp_internal_pacing在计算数据包发送时长算法中,也仅是TCP头部和数据的长度。但是流控系统的sch_fq算法不同,其可获取到最终的完整数据包长度。

    static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask)
    {
        if (skb->len != tcp_header_size) {
            tcp_event_data_sent(tp, sk);
            tp->data_segs_out += tcp_skb_pcount(skb);
            tcp_internal_pacing(sk, skb);
        }
    }

    四、Pacing检查

    Pacing功能的检查一个是之前的tcp_needs_internal_pacing函数,检查TCP pacing是否开启;另一个条件是pacing定时器是否启动,由函数hrtimer_active实现。两个条件同时成立,表明pacing正在工作,暂停数据包的发送。

    static bool tcp_pacing_check(const struct sock *sk)
    {
        return tcp_needs_internal_pacing(sk) && hrtimer_active(&tcp_sk(sk)->pacing_timer);
    }
    参见以下的TCP发送队列处理函数tcp_write_xmit和重传队列处理函数tcp_xmit_retransmit_queue,在tcp_pacing_check返回真表明pacing已在工作时,退出发送处理流程。

    static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle, int push_one, gfp_t gfp)
    {
        max_segs = tcp_tso_segs(sk, mss_now);
        while ((skb = tcp_send_head(sk))) {
     
            if (tcp_pacing_check(sk))
                break;
    }
    void tcp_xmit_retransmit_queue(struct sock *sk)
    {
        rtx_head = tcp_rtx_queue_head(sk);
        skb = tp->retransmit_skb_hint ?: rtx_head;
        max_segs = tcp_tso_segs(sk, tcp_current_mss(sk));
        skb_rbtree_walk_from(skb) {
     
            if (tcp_pacing_check(sk))
                break;
    }

    五、Pacing处理

    由于tcp_pacing_check函数和tcp_internal_pacing函数配合,已经实现了TCP数据包的pacing功能。在pacing定时器超时之后,pacing超时处理函数tcp_pace_kick其实并不需要做什么处理了。

    但是在实现中,tcp_pace_kick函数将处理被TSQ(TCP Small Queue)功能设置为阻塞状态的套接口。关于TSQ参见 https://blog.csdn.net/sinat_20184565/article/details/89341370。

    static bool tcp_small_queue_check(struct sock *sk, const struct sk_buff *skb, unsigned int factor)
    {
        limit = max(2 * skb->truesize, sk->sk_pacing_rate >> sk->sk_pacing_shift);
        limit = min_t(u32, limit, sock_net(sk)->ipv4.sysctl_tcp_limit_output_bytes);
        limit <<= factor;
    }
    由于在TSQ检查时,可能由于当前的pacing速率sk_pacing_rate过高,TSQ限制了数据报文的发送,将套接口设置为阻塞状态。所以,在tcp_pace_kick函数中处理TSQ队列,必要时调用TSQ的tasklet处理。

    enum hrtimer_restart tcp_pace_kick(struct hrtimer *timer)
    {
        struct tcp_sock *tp = container_of(timer, struct tcp_sock, pacing_timer);
        struct sock *sk = (struct sock *)tp;
     
        for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
            struct tsq_tasklet *tsq;
            bool empty;
     
            if (oval & TSQF_QUEUED)
                break;
     
            nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
            nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
            if (nval != oval)
                continue;
     
            if (!refcount_inc_not_zero(&sk->sk_wmem_alloc))
                break;
            /* queue this socket to tasklet queue */
            tsq = this_cpu_ptr(&tsq_tasklet);
            empty = list_empty(&tsq->head);
            list_add(&tp->tsq_node, &tsq->head);
            if (empty)
                tasklet_schedule(&tsq->tasklet);
            break;
        }
        return HRTIMER_NORESTART;
    }

    六、Pacing速率
    速率更新的基础函数为tcp_update_pacing_rate。如下,当前pacing速率的计算由三个变量组成。当前发送MSS缓存值mss_cache乘以拥塞窗口cwnd的结果,除以平滑往返时间srtt,及最大可发送的数据长度除以srtt得到当前pacing速率。拥塞窗口函数中是取自发送拥塞窗口值snd_cwnd与已发出数据包数量packets_out两者之中的最大值。对于处在慢启动阶段的套接口,将得到的速率值默认增加200%倍(sysctl_tcp_pacing_ss_ratio);反之对于处在拥塞避免阶段的套接口,将速率默认增加120%倍(sysctl_tcp_pacing_ca_ratio)。最终的pacing速率不能大于限定的最大值sk_max_pacing_rate。

    static void tcp_update_pacing_rate(struct sock *sk)
    {
        /* set sk_pacing_rate to 200 % of current rate (mss * cwnd / srtt) */
        rate = (u64)tp->mss_cache * ((USEC_PER_SEC / 100) << 3);
     
        /* current rate is (cwnd * mss) / srtt
         * In Slow Start [1], set sk_pacing_rate to 200 % the current rate.
         * In Congestion Avoidance phase, set it to 120 % the current rate.
         *
         * [1] : Normal Slow Start condition is (tp->snd_cwnd < tp->snd_ssthresh)
         *   If snd_cwnd >= (tp->snd_ssthresh / 2), we are approaching end of slow start and should slow down.
         */
        if (tp->snd_cwnd < tp->snd_ssthresh / 2)
            rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ss_ratio;
        else
            rate *= sock_net(sk)->ipv4.sysctl_tcp_pacing_ca_ratio;
     
        rate *= max(tp->snd_cwnd, tp->packets_out);
     
        if (likely(tp->srtt_us))
            do_div(rate, tp->srtt_us);
     
        /* WRITE_ONCE() is needed because sch_fq fetches sk_pacing_rate
         * without any lock. We want to make sure compiler wont store intermediate values in this location.
         */
        WRITE_ONCE(sk->sk_pacing_rate, min_t(u64, rate, sk->sk_max_pacing_rate));
    }

    Pacing速率更新的入口有两个。一个位于TCP服务端接收到客户端的三次握手的ACK报文之后,初始化pacing速率。前提是TCP当前采用的拥塞避免算法没有实现cong_control回调函数,目前仅有BBR算法实现了此回调。

    int tcp_rcv_state_process(struct sock *sk, struct sk_buff *skb)
    {
        switch (sk->sk_state) {
        case TCP_SYN_RECV:
            if (!inet_csk(sk)->icsk_ca_ops->cong_control)
                tcp_update_pacing_rate(sk);
    }
    static struct tcp_congestion_ops tcp_bbr_cong_ops __read_mostly = {
        .flags      = TCP_CONG_NON_RESTRICTED,
        .name       = "bbr",
        .cong_control   = bbr_main,
    };
    如果采用BBR算法,将不在tcp_rcv_state_process函数中初始化pacing速率。BBR拥塞算法在cong_control回调(bbr_main)中设置pacing速率,如下的tcp_cong_control函数,如果cong_control有值,执行完之后就结束函数。只有在采用除BBR算法之外的其它拥塞算法时(cong_control为空指针),才会往后执行,调用pacing速率更新函数。tcp_cong_control函数在处理ACK确认报文的最后被调用。

    static void tcp_cong_control(struct sock *sk, u32 ack, u32 acked_sacked, int flag, const struct rate_sample *rs)
    {
        const struct inet_connection_sock *icsk = inet_csk(sk);
     
        if (icsk->icsk_ca_ops->cong_control) {
            icsk->icsk_ca_ops->cong_control(sk, rs);
            return;
        }
        if (tcp_in_cwnd_reduction(sk)) {  
            tcp_cwnd_reduction(sk, acked_sacked, flag);   /* Reduce cwnd if state mandates */
        } else if (tcp_may_raise_cwnd(sk, flag)) {
            tcp_cong_avoid(sk, ack, acked_sacked);        /* Advance cwnd if state allows */
        }
        tcp_update_pacing_rate(sk);
    }
    static int tcp_ack(struct sock *sk, const struct sk_buff *skb, int flag)
    {
        tcp_cong_control(sk, ack, delivered, flag, sack_state.rate);
        tcp_xmit_recovery(sk, rexmit);
        return 1;
    }

    七、BBR调整pacing速率
    在拥塞控制算法BBR中,将cong_control回调初始化为指向bbr_main函数的指针,其调用bbr_set_pacing_rate函数更新pacing速率。

    static void bbr_main(struct sock *sk, const struct rate_sample *rs)
    {
        struct bbr *bbr = inet_csk_ca(sk);
     
        bbr_update_model(sk, rs);
        bw = bbr_bw(sk);
        bbr_set_pacing_rate(sk, bw, bbr->pacing_gain);
    }
    BBR中核心的pacing速率计算函数如下的bbr_rate_bytes_per_sec。其中rate参数为BBR算法估算出的当前带宽值,其乘以MTU值,再乘以一个增益值gain,类似于之前接收的函数tcp_update_pacing_rate中计算pacing速率的过程,差别在与这里使用的报文长度为MTU(不包括链路层长度),并且将之前的拥塞窗口换成了带宽值,而且将之前的递增倍率(sysctl_tcp_pacing_ss_ratio/sysctl_tcp_pacing_ca_ratio)换成了BBR计算的增益值gain。

    函数bbr_bw_to_pacing_rate确保pacing值不超过最大的限定值sk_max_pacing_rate。

    static u64 bbr_rate_bytes_per_sec(struct sock *sk, u64 rate, int gain)
    {
        rate *= tcp_mss_to_mtu(sk, tcp_sk(sk)->mss_cache);
        rate *= gain;
        rate >>= BBR_SCALE;
        rate *= USEC_PER_SEC;
        return rate >> BW_SCALE;
    }
    /* Convert a BBR bw and gain factor to a pacing rate in bytes per second. */
    static u32 bbr_bw_to_pacing_rate(struct sock *sk, u32 bw, int gain)
    {
        u64 rate = bw;
     
        rate = bbr_rate_bytes_per_sec(sk, rate, gain);
        rate = min_t(u64, rate, sk->sk_max_pacing_rate);
        return rate;
    }  

    在主回调函数bbr_main中,bbr_set_pacing_rate调用以上bbr_bw_to_pacing_rate获取到pacing的速率值。通常情况下has_seen_rtt变量已经置位,在初始化函数bbr_init中已调用过函数bbr_init_pacing_rate_from_rtt。如果带宽已占满,或者计算的pacing速率大于当前使用的速率sk_pacing_rate,更新当前速率。

    为了在保证较少队列的同时维持网络的高利用率和低延时,计算所得的pacing速率略低于估算带宽的百分之一左右,为达到此目的,在计算pacing速率时,使用了链路的MTU值,未将链路层头部数据长度包含在内。

    static void bbr_set_pacing_rate(struct sock *sk, u32 bw, int gain)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        struct bbr *bbr = inet_csk_ca(sk);
        u32 rate = bbr_bw_to_pacing_rate(sk, bw, gain);
     
        if (unlikely(!bbr->has_seen_rtt && tp->srtt_us))
            bbr_init_pacing_rate_from_rtt(sk);
        if (bbr_full_bw_reached(sk) || rate > sk->sk_pacing_rate)
            sk->sk_pacing_rate = rate;
    }
    函数bbr_init_pacing_rate_from_rtt已在br_init初始化时调用,带宽的值由发送拥塞窗口乘以BW_UNIT,在除以rtt_us值得到,通过以上的bbr_bw_to_pacing_rate函数计算pacing速率,增益值使用bbr_high_gain。如果平滑往返时间为零,rtt_us使用缺省的USEC_PER_MSEC(1000),反之,使用srtt_us值除以8(BBR_SCALE)的值。
     

    #define BW_SCALE 24
    #define BW_UNIT (1 << BW_SCALE)
    #define BBR_SCALE 8                /* scaling factor for fractions in BBR (e.g. gains) */
    #define BBR_UNIT (1 << BBR_SCALE)
    static const int bbr_high_gain  = BBR_UNIT * 2885 / 1000 + 1;
     
    /* Initialize pacing rate to: high_gain * init_cwnd / RTT. */
    static void bbr_init_pacing_rate_from_rtt(struct sock *sk)
    {
        struct tcp_sock *tp = tcp_sk(sk);
        struct bbr *bbr = inet_csk_ca(sk);
        u64 bw;
        u32 rtt_us;
     
        if (tp->srtt_us) {      /* any RTT sample yet? */
            rtt_us = max(tp->srtt_us >> 3, 1U);
            bbr->has_seen_rtt = 1;
        } else {             /* no RTT sample yet */
            rtt_us = USEC_PER_MSEC;  /* use nominal default RTT */
        }
        bw = (u64)tp->snd_cwnd * BW_UNIT;
        do_div(bw, rtt_us);
        sk->sk_pacing_rate = bbr_bw_to_pacing_rate(sk, bw, bbr_high_gain);
    }

  • 相关阅读:
    springboot整合mybatis(配置模式+注解模式)
    网页在移动端的适配中单位的选择
    SandStorm 出品|建设者高光时刻 9 月作品集已上线!
    粽子产线的速度提升
    Win10+MX350+CUDA10.2+Python3.9配置Detectron2
    快速解决 adb server version doesn‘t match this client
    .NET LINQ 通常分 Syntax Query 和Syntax Method
    reactNative导入excel文件
    分解质因数——AcWing 197. 阶乘分解
    【redis入门系列】redis搭建主从服务器
  • 原文地址:https://blog.csdn.net/wuyongmao/article/details/126246591