• quinn source code walkthrough: how QUIC packets are sent


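    Introduction

    quinn is a Rust crate that implements the QUIC protocol (originally short for Quick UDP Internet Connections). It provides a high-level API for building QUIC-based network applications. Its design goals are a concise, safe, and high-performance QUIC implementation. Internally it uses Rust's asynchronous programming model (async/await), which makes asynchronous network code more convenient and efficient to write.
    This article walks through how quinn sends data.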

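    Concepts in the QUIC protocol

    Endpoint

    In QUIC, an endpoint is one end of a QUIC connection, either a client or a server. Each endpoint has its own network address and communicates with other endpoints to establish and manage QUIC connections. In quinn, an endpoint corresponds to one operating-system socket. For example, creating a client Endpoint binds a local address: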

        pub fn client(addr: SocketAddr) -> io::Result<Self> {
            let socket = std::net::UdpSocket::bind(addr)?;
            let runtime = default_runtime()
                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no async runtime found"))?;
            Self::new_with_runtime(
                EndpointConfig::default(),
                None,
                runtime.wrap_udp_socket(socket)?,
                runtime,
            )
        }
    

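    Connection

    Two endpoints can establish a connection, and one endpoint can connect to multiple other endpoints.

    Note that, unlike TCP, a single QUIC socket can hold connections to many other sockets at the same time, whereas in TCP every connection has its own pair of sockets, one on the client and one on the server.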
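The "one socket, many peers" property comes from UDP itself. The following is a minimal sketch using only plain `std::net::UdpSocket` on the loopback interface (no QUIC involved); the function name `one_socket_many_peers` is made up for illustration.

```rust
use std::io;
use std::net::UdpSocket;
use std::time::Duration;

/// Sends one datagram from a single local socket to each of `peer_count`
/// peers and returns the source port each peer observed.
pub fn one_socket_many_peers(peer_count: usize) -> io::Result<Vec<u16>> {
    // Port 0 asks the OS for any free port, like "[::]:0" in a client setup.
    let local = UdpSocket::bind("127.0.0.1:0")?;

    let mut peers = Vec::new();
    for _ in 0..peer_count {
        let peer = UdpSocket::bind("127.0.0.1:0")?;
        // Avoid blocking forever if a datagram were to get lost.
        peer.set_read_timeout(Some(Duration::from_secs(1)))?;
        peers.push(peer);
    }

    let mut seen_ports = Vec::new();
    for peer in &peers {
        // The same local socket is reused for every destination.
        local.send_to(b"hello", peer.local_addr()?)?;
        let mut buf = [0u8; 16];
        let (_, from) = peer.recv_from(&mut buf)?;
        seen_ports.push(from.port());
    }
    Ok(seen_ports)
}
```

Every peer sees the same source port, because all datagrams leave through the one bound socket. A QUIC endpoint builds its connections on top of exactly this kind of shared socket.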


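    Stream

    A connection can carry multiple streams at the same time. Data on each stream is independent, so one blocked stream does not affect the others. (TCP effectively has a single stream, which is why it suffers from head-of-line blocking.)

    Client-initiated streams have even stream IDs and server-initiated streams have odd stream IDs (RFC 9000, Section 2.1).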
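Per RFC 9000, Section 2.1, the two least significant bits of a stream ID encode who opened the stream and whether it is bidirectional: bit 0x1 is 0 for client-initiated and 1 for server-initiated streams, and bit 0x2 is 0 for bidirectional and 1 for unidirectional streams. A small sketch of that decoding follows; the type and function names are illustrative and are not quinn's API.

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Initiator {
    Client,
    Server,
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Direction {
    Bidirectional,
    Unidirectional,
}

/// Decodes the two low bits of a QUIC stream ID (RFC 9000, Section 2.1).
pub fn classify_stream_id(id: u64) -> (Initiator, Direction) {
    // Bit 0x1: 0 = client-initiated (even IDs), 1 = server-initiated (odd IDs).
    let initiator = if id & 0x1 == 0 {
        Initiator::Client
    } else {
        Initiator::Server
    };
    // Bit 0x2: 0 = bidirectional, 1 = unidirectional.
    let direction = if id & 0x2 == 0 {
        Direction::Bidirectional
    } else {
        Direction::Unidirectional
    };
    (initiator, direction)
}
```

So the first bidirectional stream a client opens has ID 0, the next one has ID 4, and so on.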


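    Frame

    A stream is an abstraction; what actually travels on the wire is frames. Each STREAM frame carries a stream ID identifying the stream it belongs to, and the receiver uses that ID to place the frame's data into the corresponding stream's buffer.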
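The receive-side demultiplexing described above can be sketched as follows. This is a simplified illustration, not quinn's implementation: `StreamFrame` and `Demux` are hypothetical types, and the sketch assumes frames never overlap.

```rust
use std::collections::{BTreeMap, HashMap};

/// Simplified stand-in for a STREAM frame: stream ID, byte offset, payload.
pub struct StreamFrame {
    pub stream_id: u64,
    pub offset: u64,
    pub data: Vec<u8>,
}

/// Per-stream receive buffers keyed by stream ID; chunks are kept sorted by offset.
#[derive(Default)]
pub struct Demux {
    streams: HashMap<u64, BTreeMap<u64, Vec<u8>>>,
}

impl Demux {
    /// Files the frame's payload under the stream it belongs to.
    pub fn on_frame(&mut self, frame: StreamFrame) {
        self.streams
            .entry(frame.stream_id)
            .or_default()
            .insert(frame.offset, frame.data);
    }

    /// Returns the bytes of one stream that are contiguous from offset 0.
    pub fn contiguous(&self, stream_id: u64) -> Vec<u8> {
        let mut out = Vec::new();
        if let Some(chunks) = self.streams.get(&stream_id) {
            for (offset, data) in chunks {
                if *offset != out.len() as u64 {
                    break; // gap: an earlier frame has not arrived yet
                }
                out.extend_from_slice(data);
            }
        }
        out
    }
}
```

Frames of different streams can arrive interleaved and out of order; a gap in one stream only delays that stream, which is the independence property mentioned in the Stream section.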

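    Walking through the send path

    Take the official client example. Its key steps are shown in the pseudocode below: create the endpoint, establish the connection, open a stream, and finally write the data.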

        // Create the endpoint
        let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
        ...
        // Establish the connection
        let conn = endpoint
            .connect(remote, host)?
            .await
            .map_err(|e| anyhow!("failed to connect: {}", e))?;
        // Open a bidirectional stream
        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .map_err(|e| anyhow!("failed to open stream: {}", e))?;

        // Write the data
        send.write_all(request.as_bytes())
            .await
            .map_err(|e| anyhow!("failed to send request: {}", e))?;
    

    SendStream::write_all

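    We start from the point where data is written to a stream.
    Calling write_all actually just creates a WriteAll future; for the moment the data is only referenced from the WriteAll struct. Only when the runtime (Tokio by default) polls the future is the data written into the stream's buffer.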

    impl<'a> Future for WriteAll<'a> {
        type Output = Result<(), WriteError>;
        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
            let this = self.get_mut();
            loop {
                if this.buf.is_empty() {
                    return Poll::Ready(Ok(()));
                }
                let buf = this.buf;
                // Write the data into the stream's buffer
                let n = ready!(this.stream.execute_poll(cx, |s| s.write(buf)))?;
                this.buf = &this.buf[n..];
            }
        }
    }
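To see the "nothing happens until the future is polled" behaviour without a runtime, here is a std-only sketch of the same loop. `BoundedSink`, `WriteAllSketch`, and `noop_waker` are hypothetical stand-ins rather than quinn types, the output type is simplified to `()`, and a real implementation would register the waker before returning `Poll::Pending`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a stream's send buffer with a fixed capacity.
pub struct BoundedSink {
    pub data: Vec<u8>,
    pub capacity: usize,
}

impl BoundedSink {
    /// Accepts as many bytes as fit; reports Pending when no space is left.
    fn poll_write(&mut self, buf: &[u8]) -> Poll<usize> {
        let free = self.capacity.saturating_sub(self.data.len());
        if free == 0 {
            // A real implementation would store the task's waker here.
            return Poll::Pending;
        }
        let n = free.min(buf.len());
        self.data.extend_from_slice(&buf[..n]);
        Poll::Ready(n)
    }
}

/// Future that keeps writing until the whole slice has been accepted.
pub struct WriteAllSketch<'a> {
    pub sink: &'a mut BoundedSink,
    pub buf: &'a [u8],
}

impl<'a> Future for WriteAllSketch<'a> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        loop {
            if this.buf.is_empty() {
                return Poll::Ready(());
            }
            match this.sink.poll_write(this.buf) {
                // Partial write: advance past the accepted bytes and retry.
                Poll::Ready(n) => this.buf = &this.buf[n..],
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Waker that does nothing, good enough for polling by hand.
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Creating a `WriteAllSketch` writes nothing. The first `poll` copies as much as fits and returns `Pending` if the sink fills up; a later `poll`, after space has been freed, finishes the job.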
    

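    Note that writes into a stream's buffer go through flow control: when the writable space is 0, the write blocks (the future stays pending). The writable space is usually determined by send_window - unacked_data; the code below additionally caps it at max_data - data_sent. Both send_window and unacked_data are connection-level, so every stream on the connection is subject to this limit. send_window is configured up front and determines the peak size of the whole connection's send buffer. When an application holds many connections, set this value carefully to avoid running out of memory (OOM).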

        /// Returns the maximum amount of data this is allowed to be written on the connection
        pub(crate) fn write_limit(&self) -> u64 {
            (self.max_data - self.data_sent).min(self.send_window - self.unacked_data)
        }
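The arithmetic can be exercised in isolation. The sketch below reuses the four field names from `write_limit`, but the struct `ConnFlow` and its `write` and `ack` methods are hypothetical. It assumes that a write increases both `data_sent` and `unacked_data` and that an acknowledgement only decreases `unacked_data`. It also uses `saturating_sub` instead of plain subtraction to stay panic-free.

```rust
/// Hypothetical connection-level counters, named after the fields in `write_limit`.
pub struct ConnFlow {
    pub max_data: u64,
    pub data_sent: u64,
    pub send_window: u64,
    pub unacked_data: u64,
}

impl ConnFlow {
    /// Bytes that may be written right now: the smaller of the two budgets.
    pub fn write_limit(&self) -> u64 {
        let flow_control = self.max_data.saturating_sub(self.data_sent);
        let buffer_space = self.send_window.saturating_sub(self.unacked_data);
        flow_control.min(buffer_space)
    }

    /// Accepts at most `write_limit()` bytes and returns how many were taken.
    pub fn write(&mut self, len: u64) -> u64 {
        let n = len.min(self.write_limit());
        self.data_sent += n;
        self.unacked_data += n;
        n
    }

    /// Acknowledged bytes free up send-buffer space again.
    pub fn ack(&mut self, len: u64) {
        self.unacked_data = self.unacked_data.saturating_sub(len);
    }
}
```

With `send_window = 10`, a 25-byte write is cut to 10 bytes and further writes get 0 until some data is acknowledged. This is why `send_window` bounds the memory one connection can pin.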
    

    The written data ends up, again temporarily, in the unacked_segments of the SendBuffer.

    impl SendBuffer {
        /// Append application data to the end of the stream
        pub(super) fn write(&mut self, data: Bytes) {
            self.unacked_len += data.len();
            self.offset += data.len() as u64;
            self.unacked_segments.push_back(data);
        }
    }
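A simplified model shows why the buffer keeps segments until they are acknowledged. `SendBufferSketch` is a hypothetical type: it uses `Vec<u8>` instead of `Bytes`, and `ack_front` only handles in-order acknowledgements from the front of the buffer, which is a deliberate simplification.

```rust
use std::collections::VecDeque;

/// Simplified send buffer: data stays queued until it has been acknowledged.
#[derive(Default)]
pub struct SendBufferSketch {
    unacked_segments: VecDeque<Vec<u8>>,
    unacked_len: usize,
    /// Total number of bytes ever written to the stream.
    offset: u64,
}

impl SendBufferSketch {
    /// Appends application data to the end of the stream.
    pub fn write(&mut self, data: Vec<u8>) {
        self.unacked_len += data.len();
        self.offset += data.len() as u64;
        self.unacked_segments.push_back(data);
    }

    /// Releases `len` acknowledged bytes from the front (in-order acks only).
    pub fn ack_front(&mut self, mut len: usize) {
        while len > 0 {
            let front_len = match self.unacked_segments.front() {
                Some(front) => front.len(),
                None => break,
            };
            if front_len <= len {
                // The whole segment is acknowledged and can be dropped.
                self.unacked_segments.pop_front();
                self.unacked_len -= front_len;
                len -= front_len;
            } else {
                // Only part of the segment is acknowledged; trim its head.
                self.unacked_segments[0].drain(..len);
                self.unacked_len -= len;
                len = 0;
            }
        }
    }

    pub fn unacked_len(&self) -> usize {
        self.unacked_len
    }

    pub fn offset(&self) -> u64 {
        self.offset
    }

    pub fn segments(&self) -> usize {
        self.unacked_segments.len()
    }
}
```

`offset` only ever grows, while `unacked_len` shrinks as acknowledgements arrive, so memory is held only for data that might still need retransmission.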
    

    At this point the write_all operation is done. So how does the data placed in the buffer actually get sent?

    ConnectionDriver

    Let's go back to endpoint.connect(remote, host)?.await. When the connection is set up, a ConnectionDriver future is created and immediately spawned onto the runtime, where it keeps being polled.

            runtime.spawn(Box::pin(
                ConnectionDriver(conn.clone()).instrument(Span::current()),
            ));
    

    When polled, this ConnectionDriver eventually calls Connection::poll_transmit -> Connection::populate_packet to collect the frames to send.

        fn populate_packet(
            &mut self,
            now: Instant,
            space_id: SpaceId,
            buf: &mut BytesMut,
            max_size: usize,
            pn: u64,
        ) -> SentFrames {
            let mut sent = SentFrames::default();
    
    		...
    		...
    
            // STREAM
            if space_id == SpaceId::Data {
                sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
                self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
            }
    
            sent
        }
    

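    StreamsState::write_stream_frames takes the stream at the front of the highest-priority queue and writes its data into buf. A stream that still has pending data is pushed back to the end of its queue; once all of its data has been sent it is not enqueued again.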

    pub(crate) fn write_stream_frames(
            &mut self,
            buf: &mut BytesMut,
            max_buf_size: usize,
        ) -> StreamMetaVec {
            let mut stream_frames = StreamMetaVec::new();
            while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
                if max_buf_size
                    .checked_sub(buf.len() + frame::Stream::SIZE_BOUND)
                    .is_none()
                {
                    break;
                }
                // Number of distinct priority levels
                let num_levels = self.pending.len();
                // Peek at the highest-priority level (queue)
                let mut level = match self.pending.peek_mut() {
                    Some(x) => x,
                    None => break,
                };
                // Poppping data from the front of the queue, storing as much data
                // as possible in a single frame, and enqueing sending further
                // remaining data at the end of the queue helps with fairness.
                // Other streams will have a chance to write data before we touch
                // this stream again.
                // Take the first stream ID from that queue
                let id = match level.queue.get_mut().pop_front() {
                    Some(x) => x,
                    None => {
                        debug_assert!(
                            num_levels == 1,
                            "An empty queue is only allowed for a single level"
                        );
                        break;
                    }
                };
                // Look up the stream itself
                let stream = match self.send.get_mut(&id) {
                    Some(s) => s,
                    // Stream was reset with pending data and the reset was acknowledged
                    None => continue,
                };
    
                // Reset streams aren't removed from the pending list and still exist while the peer
                // hasn't acknowledged the reset, but should not generate STREAM frames, so we need to
                // check for them explicitly.
                if stream.is_reset() {
                    continue;
                }
    
                // Now that we know the `StreamId`, we can better account for how many bytes
                // are required to encode it.
                let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());
                // Ask the stream for the byte range (offsets) to send this time
                let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);
                // `fin` is set when this frame reaches the end of the written data
                // and the stream has been finished
                let fin = offsets.end == stream.pending.offset()
                    && matches!(stream.state, SendState::DataSent { .. });
                if fin {
                    stream.fin_pending = false;
                }
    
                if stream.is_pending() {
                    if level.priority == stream.priority {
                        // Enqueue for the same level
                        level.queue.get_mut().push_back(id);
                    } else {
                        // Enqueue for a different level. If the current level is empty, drop it
                        if level.queue.borrow().is_empty() && num_levels != 1 {
                            // We keep the last level around even in empty form so that
                            // the next insert doesn't have to reallocate the queue
                            PeekMut::pop(level);
                        } else {
                            drop(level);
                        }
                        push_pending(&mut self.pending, id, stream.priority);
                    }
                } else if level.queue.borrow().is_empty() && num_levels != 1 {
                    // We keep the last level around even in empty form so that
                    // the next insert doesn't have to reallocate the queue
                    PeekMut::pop(level);
                }
    
                let meta = frame::StreamMeta { id, offsets, fin };
                trace!(id = %meta.id, off = meta.offsets.start, len = meta.offsets.end - meta.offsets.start, fin = meta.fin, "STREAM");
                // Write the frame header
                meta.encode(encode_length, buf);
    
                // The range might not be retrievable in a single `get` if it is
                // stored in noncontiguous fashion. Therefore this loop iterates
                // until the range is fully copied into the frame.
                let mut offsets = meta.offsets.clone();
                while offsets.start != offsets.end {
                    let data = stream.pending.get(offsets.clone());
                    offsets.start += data.len() as u64;
                    // Copy the payload bytes
                    buf.put_slice(data);
                }
                stream_frames.push(meta);
            }
    
            stream_frames
        }
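The scheduling idea in `write_stream_frames` (serve the highest priority level first, and within a level pop from the front and re-enqueue at the back) can be modelled with far less machinery. The sketch below is a deliberately simplified model, not quinn's code: `SchedulerSketch` is hypothetical, it swaps the binary heap for a `BTreeMap` keyed by priority, assumes a larger number means higher priority, and tracks byte counts instead of real data.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

#[derive(Default)]
pub struct SchedulerSketch {
    /// Priority level -> queue of stream IDs that still have data to send.
    pending: BTreeMap<i32, VecDeque<u64>>,
    /// Stream ID -> bytes still to send.
    remaining: HashMap<u64, usize>,
}

impl SchedulerSketch {
    pub fn add_stream(&mut self, id: u64, priority: i32, bytes: usize) {
        self.remaining.insert(id, bytes);
        self.pending.entry(priority).or_default().push_back(id);
    }

    /// Emits (stream ID, length) frames until `budget` bytes are used up
    /// or nothing is pending. Each frame carries at most `max_frame` bytes.
    pub fn write_frames(&mut self, mut budget: usize, max_frame: usize) -> Vec<(u64, usize)> {
        let mut frames = Vec::new();
        while budget > 0 && max_frame > 0 {
            // Highest priority level = largest key in the map.
            let Some(mut level) = self.pending.last_entry() else { break };
            let Some(id) = level.get_mut().pop_front() else {
                level.remove();
                continue;
            };
            let remaining = self.remaining.get_mut(&id).expect("queued stream exists");
            let len = (*remaining).min(max_frame).min(budget);
            *remaining -= len;
            budget -= len;
            frames.push((id, len));
            if *remaining > 0 {
                // Back of the queue: other streams get a turn first.
                level.get_mut().push_back(id);
            } else if level.get().is_empty() {
                level.remove();
            }
        }
        frames
    }
}
```

With two streams of 5 bytes at priority 0, one stream of 2 bytes at priority 1, and 3-byte frames, the higher-priority stream is drained first and the other two then alternate. This is the fairness effect the comment in the original code describes.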
    

    At this point the data to be sent is still only sitting in a buffer. It is then sent as an EndpointEvent::Transmit event over a channel to the endpoint's task.

    fn drive_transmit(&mut self) -> bool {
            let now = Instant::now();
            let mut transmits = 0;
    
            let max_datagrams = self.socket.max_transmit_segments();
            let capacity = self.inner.current_mtu();
            let mut buffer = BytesMut::with_capacity(capacity as usize);
    
            while let Some(t) = self.inner.poll_transmit(now, max_datagrams, &mut buffer) {
                transmits += match t.segment_size {
                    None => 1,
                    Some(s) => (t.size + s - 1) / s, // round up
                };
                // If the endpoint driver is gone, noop.
                let size = t.size;
                // Hand the data to be sent over to the endpoint task
                let _ = self.endpoint_events.send((
                    self.handle,
                    EndpointEvent::Transmit(t, buffer.split_to(size).freeze()),
                ));
    
                if transmits >= MAX_TRANSMIT_DATAGRAMS {
                    // TODO: What isn't ideal here yet is that if we don't poll all
                    // datagrams that could be sent we don't go into the `app_limited`
                    // state and CWND continues to grow until we get here the next time.
                    // See https://github.com/quinn-rs/quinn/issues/1126
                    return true;
                }
            }
    
            false
        }
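The `// round up` line in `drive_transmit` is a ceiling division: when a transmit carries several datagrams of `segment_size` bytes each (the last one possibly shorter), the number of datagrams is `size / segment_size` rounded up. A tiny sketch follows; the function name `datagram_count` is made up for illustration.

```rust
/// Number of datagrams in one transmit: 1 when it is not segmented,
/// otherwise `size / segment_size` rounded up.
pub fn datagram_count(size: usize, segment_size: Option<usize>) -> usize {
    match segment_size {
        None => 1,
        // Adding `s - 1` before dividing rounds the quotient up.
        Some(s) => (size + s - 1) / s,
    }
}
```

For example, 3000 bytes with 1200-byte segments counts as three datagrams (1200 + 1200 + 600). This count is what gets compared against `MAX_TRANSMIT_DATAGRAMS`.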
    

    That completes the ConnectionDriver's job. In short, ConnectionDriver takes data out of the streams and hands it to the endpoint over a channel.

    EndpointDriver

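    The endpoint works much like the connection: when an endpoint is created, an EndpointDriver is spawned and polled in the background. Its poll method handles the events sent by ConnectionDriver and pushes the transmits onto the outgoing buffer.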

        fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
            use EndpointEvent::*;
            for _ in 0..IO_LOOP_BOUND {
                match self.events.poll_recv(cx) {
                    Poll::Ready(Some((ch, event))) => match event {
                        ...
                        ...
                        // Receive a Transmit from ConnectionDriver and push it onto the outgoing buffer
                        Transmit(t, buf) => {
                            let contents_len = buf.len();
                            self.outgoing.push_back(udp_transmit(t, buf));
                            self.transmit_queue_contents_len = self
                                .transmit_queue_contents_len
                                .saturating_add(contents_len);
                        }
                    },
                    Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),
                    Poll::Pending => {
                        return false;
                    }
                }
            }
    
            true
        }
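The hand-off between the two drivers is a multi-producer, single-consumer channel: every connection holds a sender and the endpoint owns the receiver. The sketch below models this with `std::sync::mpsc` as a stand-in for the async channel; `Event`, `EndpointSketch`, and the `bound` parameter (playing the role of `IO_LOOP_BOUND`) are hypothetical simplifications.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

pub type ConnectionHandle = usize;

/// Simplified event: only the Transmit case is modelled.
pub enum Event {
    Transmit(Vec<u8>),
}

pub struct EndpointSketch {
    events: Receiver<(ConnectionHandle, Event)>,
    pub outgoing: VecDeque<(ConnectionHandle, Vec<u8>)>,
    pub queued_bytes: usize,
}

impl EndpointSketch {
    /// Returns the endpoint plus a sender that connections can clone.
    pub fn new() -> (Self, Sender<(ConnectionHandle, Event)>) {
        let (tx, rx) = channel();
        let endpoint = EndpointSketch {
            events: rx,
            outgoing: VecDeque::new(),
            queued_bytes: 0,
        };
        (endpoint, tx)
    }

    /// Moves at most `bound` events into `outgoing`. Returns true when the
    /// bound was reached, meaning more events may still be waiting.
    pub fn handle_events(&mut self, bound: usize) -> bool {
        for _ in 0..bound {
            match self.events.try_recv() {
                Ok((handle, Event::Transmit(buf))) => {
                    self.queued_bytes = self.queued_bytes.saturating_add(buf.len());
                    self.outgoing.push_back((handle, buf));
                }
                // Nothing queued right now (or all senders are gone).
                Err(_) => return false,
            }
        }
        true
    }
}
```

Bounding the loop keeps one busy connection from starving the rest of the endpoint's work: after `bound` events the driver returns and gets polled again later.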
    

    drive_send takes data out of the outgoing buffer and writes it to the socket.

     fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
            self.send_limiter.start_cycle();
    
            let result = loop {
                if self.outgoing.is_empty() {
                    break Ok(false);
                }
    
                if !self.send_limiter.allow_work() {
                    break Ok(true);
                }
                // The actual write to the socket
                match self.socket.poll_send(cx, self.outgoing.as_slices().0) {
                    Poll::Ready(Ok(n)) => {
                        let contents_len: usize =
                            self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
                        self.transmit_queue_contents_len = self
                            .transmit_queue_contents_len
                            .saturating_sub(contents_len);
                        // We count transmits instead of `poll_send` calls since the cost
                        // of a `sendmmsg` still linearly increases with number of packets.
                        self.send_limiter.record_work(n);
                    }
                    Poll::Pending => {
                        break Ok(false);
                    }
                    Poll::Ready(Err(e)) => {
                        break Err(e);
                    }
                }
            };
    
            self.send_limiter.finish_cycle();
            result
        }
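The shape of this loop (send the first contiguous slice of the queue, drain however many datagrams the socket accepted, stop when a work budget is used up) can be sketched without any I/O. `FakeSocket` is a hypothetical socket that accepts a limited batch per call, and the simple counter `work_limit` replaces quinn's `send_limiter`, whose exact policy is not modelled here.

```rust
use std::collections::VecDeque;

/// Hypothetical socket that accepts at most `batch` datagrams per call.
pub struct FakeSocket {
    pub batch: usize,
    pub sent: Vec<Vec<u8>>,
}

impl FakeSocket {
    /// Returns how many datagrams from the front of the slice were sent.
    fn send(&mut self, datagrams: &[Vec<u8>]) -> usize {
        let n = datagrams.len().min(self.batch);
        self.sent.extend_from_slice(&datagrams[..n]);
        n
    }
}

/// Drains `outgoing` into the socket. Returns true if work remains,
/// false once the queue is empty.
pub fn drive_send(
    outgoing: &mut VecDeque<Vec<u8>>,
    socket: &mut FakeSocket,
    work_limit: usize,
) -> bool {
    let mut work = 0;
    loop {
        if outgoing.is_empty() {
            return false;
        }
        if work >= work_limit {
            return true;
        }
        // `as_slices().0` is the first contiguous part of the ring buffer.
        let n = socket.send(outgoing.as_slices().0);
        if n == 0 {
            // Socket not ready; the async version returns Poll::Pending here.
            return true;
        }
        // Remove exactly the datagrams the socket accepted.
        outgoing.drain(..n);
        work += n;
    }
}
```

Draining only `n` elements matters: the socket may accept fewer datagrams than it was offered, and the remainder must stay queued for the next round.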
    

    That is the whole sending path. What happens to the data once it has been written to the socket is up to the operating system.

  • Original article: https://blog.csdn.net/luchengtao11/article/details/134469240