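Continuing with the on_packet method, we arrive at the decisive moment of the TCP three-way handshake. After sending the SYN-ACK, the server uses this method while it waits for the client's ACK. The method does the heavy lifting of completing the handshake: it inspects the ACK sent by the client and, if the ACK is acceptable, moves the connection from the SYN-RECEIVED state to the ESTABLISHED state. At that point the TCP connection is set up, the communication channel is open, and data transfer can begin.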
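Before continuing with the implementation, we add a write method that both on_accept and on_packet use. It gathers the work the two methods have in common into one place, which hides the complexity of constructing and sending a TCP segment.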
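Inside write, the first step is to set the sequence number and the acknowledgment number of the outgoing packet. The acknowledgment number is taken from self.recv.nxt, the next byte the receiving side expects. It is essential for ordering data correctly, because it tells the peer that everything before it has arrived and which byte the connection is ready to receive next. The count starts at the initial receive sequence number (IRS) and grows with the number of bytes received so far. The sequence number supplied by the caller is stored in the self.tcp.sequence_number field, so the TCP segment carries the correct sequence information.
write also determines the maximum amount of data it may send in a single segment; as mentioned earlier, the function calls this limit. The data itself comes from the self.unacked queue, which holds bytes that have been sent but not yet acknowledged. We have not discussed the unacked queue yet, so I will skip the details for now and return to it when we cover data transfer.
Next, the method writes the IP header into buf, after updating the payload length field to include the actual size of the TCP payload. It deliberately leaves room for the TCP header, which is filled in after the payload has been written, because the TCP checksum is computed over the complete segment, header and payload included. Once the checksum is known, the TCP header is written into the space reserved for it in buf.
Control flags such as SYN and FIN also affect sequence space. If either flag is set, write advances the sequence number by one, because each of these flags consumes a sequence number during the handshake or the teardown. The result is next_seq, the next sequence number the sender expects to use after accounting for the transmitted data and the control flags. write then records the current time under that sequence number in self.timers.send_times (which we visit later), so it can be used to estimate the round-trip time (RTT) and to manage retransmissions.
// tcp.rust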
// Define the 'write' method for the 'Connection' struct.
fn write(&mut self, nic: &mut tun_tap::Iface, seq: u32, mut limit: usize) -> io::Result<usize> {
    // Allocate a local buffer for creating the TCP segment.
    // The size is set to 1500 bytes, which is the standard maximum transmission unit (MTU) for Ethernet.
    let mut buf = [0u8; 1500];

    // The sequence number for the outgoing packet is determined by the caller.
    // This is the number that keeps track of the position of the first byte in the current message
    // within the entire sequence of sent bytes.
    self.tcp.sequence_number = seq;

    // The acknowledgment number is what our side expects to receive next from the other side.
    // It is calculated as the sum of the initial received sequence number (irs) and the number of bytes received so far.
    self.tcp.acknowledgment_number = self.recv.nxt;

    // Ensure we don't send more data than we are allowed, which includes respecting the send window limit,
    // which is the amount of data we can send before needing an acknowledgment, and accounting for any SYN/FIN flags
    // which also consume sequence space despite not being part of the window.

    // Print out information to help with debugging the connection status.
    // The difference between 'self.recv.nxt' and 'self.recv.irs' gives the number of bytes we have acknowledged
    // (computed with 'wrapping_sub' so a wrapped sequence number cannot overflow).
    // 'seq' shows the starting sequence number for this segment.
    // 'limit' is the maximum amount of data we can send in this packet.
    println!(
        "write(ack: {}, seq: {}, limit: {}) syn {:?} fin {:?}",
        self.recv.nxt.wrapping_sub(self.recv.irs),
        seq,
        limit,
        self.tcp.syn,
        self.tcp.fin,
    );

    // Determine the starting point for data to write in the current TCP stream.
    // 'wrapping_sub' is used to safely calculate differences of sequence numbers considering they might wrap around.
    // Sequence numbers in TCP are 32-bit unsigned numbers and wrap back to 0 after 2^32 - 1.
    let mut offset = seq.wrapping_sub(self.send.una) as usize;

    // If the connection is closing, ensure no more data is sent after FIN.
    if let Some(closed_at) = self.closed_at {
        if seq == closed_at.wrapping_add(1) {
            // After sending a FIN, reset the offset and limit to prevent further data transmission.
            offset = 0;
            limit = 0;
        }
    }

    // Print the calculated offset for debugging.
    println!("using offset {} base {} in {:?}", offset, self.send.una, self.unacked.as_slices());

    // Split the queued data into two slices - 'h' as the head and 't' as the tail.
    // The split is based on the current 'offset' from where we need to start sending data.
    let (mut h, mut t) = self.unacked.as_slices();
    if h.len() > offset {
        // If 'h' is longer than 'offset', we truncate 'h' to start from 'offset'.
        h = &h[offset..];
    } else {
        // If 'h' is shorter, we skip 'h' completely and adjust 't' accordingly.
        let skipped = h.len();
        h = &[]; // 'h' is now empty, as it's been fully sent.
        t = &t[(offset - skipped)..]; // 't' is adjusted to remove the already sent portion.
    }

    // Calculate how much data can be sent by choosing the smaller of 'limit' or available data.
    let max_data = std::cmp::min(limit, h.len() + t.len());
    let size = std::cmp::min(
        // Limit the size to prevent buffer overflow.
        buf.len(),
        // Include only as much data as the MTU allows after accounting for IP and TCP header sizes.
        self.tcp.header_len() as usize + self.ip.header_len() as usize + max_data,
    );

    // Set the IP packet's payload length to the TCP segment size.
    self.ip.set_payload_len(size - self.ip.header_len() as usize);

    // Start writing the packet with the IP header first.
    // 'Write' is a trait in Rust that allows writing bytes to a buffer. Here we use it to write the IP header.
    use std::io::Write;
    let buf_len = buf.len();
    let mut unwritten = &mut buf[..]; // Borrow the entire buffer as mutable to start writing the IP header.
    self.ip.write(&mut unwritten); // Writing the IP header into the buffer.

    // Calculate the point in the buffer where the IP header ends based on how much was written.
    let ip_header_ends_at = buf_len - unwritten.len();

    // Leave space for the TCP header by skipping over the region where it will be written.
    // This is done to come back later and fill it in once we know the payload and can compute the checksum.
    unwritten = &mut unwritten[self.tcp.header_len() as usize..];
    let tcp_header_ends_at = buf_len - unwritten.len(); // Where the TCP header ends and the payload begins.

    // Now write the TCP payload to the buffer by first writing the head slice 'h' and then the tail 't'.
    let payload_bytes = {
        let mut written = 0; // This will track how many bytes we've successfully written.
        let mut limit = max_data; // Remaining payload budget for this segment.

        // Write the first part of the payload ('h') until we hit the limit or run out of 'h'.
        let p1l = std::cmp::min(limit, h.len()); // 'p1l' is how much we can write from 'h'.
        written += unwritten.write(&h[..p1l])?; // Write and update 'written' with bytes written.
        limit -= written; // Reduce the limit by what's been written so far.

        // Continue with 't', writing as much as we can after 'h' has been accounted for.
        let p2l = std::cmp::min(limit, t.len()); // 'p2l' is how much we can write from 't'.
        written += unwritten.write(&t[..p2l])?; // Write 't' to the buffer and update the count.
        written // Return total written payload size.
    };

    // Calculate the end of the payload to know where to stop writing to the buffer.
    let payload_ends_at = buf_len - unwritten.len();

    // Compute the TCP checksum which ensures data integrity over the network.
    // The checksum needs the IP header and the payload to be calculated correctly.
    self.tcp.checksum = self
        .tcp
        .calc_checksum_ipv4(&self.ip, &buf[tcp_header_ends_at..payload_ends_at])
        .expect("failed to compute checksum");

    // Write the TCP header after the checksum calculation as it's now complete with all required information.
    let mut unwritten = &mut buf[ip_header_ends_at..]; // Start again at the end of the IP header.
    self.tcp.write(&mut unwritten); // Write the TCP header into the designated space in the buffer.

    // After filling the buffer with the IP header, TCP header, and payload, we send it via the 'nic' interface.
    // The 'send' method on 'nic' returns the number of bytes written to the network interface,
    // which should match the amount of data we wanted to send (from IP header start to the end of the payload).
    let _write_len = nic.send(&buf[..payload_ends_at])?;

    // Return the size of the payload actually written as the result of this method.
    Ok(payload_bytes)
}
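The head/tail handling in write follows from how VecDeque stores its contents: as_slices returns up to two contiguous slices, and the bytes still to be sent may start in either one. The sketch below isolates that step. The helper names unsent_slices and collect_payload are made up for illustration and are not part of the chapter's Connection type.

```rust
use std::collections::VecDeque;

/// Returns the two slices of `queue` that remain after skipping the first
/// `offset` bytes. Assumes `offset <= queue.len()`; a larger offset panics.
fn unsent_slices(queue: &VecDeque<u8>, offset: usize) -> (&[u8], &[u8]) {
    let (head, tail) = queue.as_slices();
    if head.len() >= offset {
        // The starting point lies inside (or at the end of) the head slice.
        (&head[offset..], tail)
    } else {
        // The head slice is skipped entirely; the rest of the offset eats into the tail.
        (&head[head.len()..], &tail[offset - head.len()..])
    }
}

/// Copies at most `limit` bytes, head first and then tail, into one buffer.
fn collect_payload(head: &[u8], tail: &[u8], limit: usize) -> Vec<u8> {
    let from_head = limit.min(head.len());
    let from_tail = (limit - from_head).min(tail.len());
    let mut out = Vec::with_capacity(from_head + from_tail);
    out.extend_from_slice(&head[..from_head]);
    out.extend_from_slice(&tail[..from_tail]);
    out
}
```

Calling unsent_slices(&queue, offset) and passing the two results to collect_payload with a limit yields the same bytes as skipping offset items and taking limit items from the queue, whatever the deque's internal layout happens to be.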
With the write method in place, the logic that wrote into the buffer can be replaced by a single call to write, as follows:
c.write(nic, c.send.nxt, 0)?;
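A call with a limit of 0 carries no payload, yet it still advances sequence space when SYN or FIN is set. The sketch below shows that accounting on its own. It assumes a free function named next_seq; the chapter uses that name for a value computed inside write, and the standalone signature here is only an illustration.

```rust
/// Sequence number that follows a segment which starts at `seq`, carries
/// `payload_len` bytes, and optionally has the SYN and/or FIN flag set.
fn next_seq(seq: u32, payload_len: usize, syn: bool, fin: bool) -> u32 {
    // Payload bytes occupy one sequence number each.
    let mut next = seq.wrapping_add(payload_len as u32);
    // SYN and FIN are "virtual" bytes: each consumes one sequence number.
    if syn {
        next = next.wrapping_add(1);
    }
    if fin {
        next = next.wrapping_add(1);
    }
    next
}
```

For a SYN-ACK sent at sequence number 100 with no payload, next_seq(100, 0, true, false) is 101, which is the acknowledgment number the client is expected to send back.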
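The first thing on_packet does is validate the sequence number. It treats the sequence number as a position on a circle defined by the TCP sequence space. To avoid the complications of integer overflow, we use Rust's wrapping_add function, which lets a sequence number roll over to 0 gracefully once it reaches the upper limit of a 32-bit integer. The mechanism resembles an odometer that rolls over after its largest displayable value: the mileage keeps counting without a fault.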
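Wrapping arithmetic is essential to the reliability of TCP. On our circle of numbers, "wrapping around" is the step from the upper bound back to the starting point, which turns the numbering into an endless loop. For TCP's 32-bit sequence numbers, the circle holds more than 4 billion (2^32) positions. Rust's wrapping_add brings this circular logic to sequence numbers, so TCP can keep a coherent account of the bytes sent and received regardless of the actual numeric values.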
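To see why this matters for TCP, recall that a sequence number expresses both position and history. Packets cross an inherently unreliable network, so they may arrive reordered or late. Wrapping arithmetic preserves their order by treating the sequence space like a clock face, where "1" follows "12" as naturally as "12" follows "11". This dependable comparison of sequence numbers underpins TCP's promise of ordered, lossless delivery.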
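The odometer behavior is easy to try out with plain u32 values. The following is a small sketch; the helper names advance and seq_distance are hypothetical and exist only for this example.

```rust
/// Moves `seq` forward by `n` bytes on the 32-bit sequence circle.
fn advance(seq: u32, n: u32) -> u32 {
    // wrapping_add rolls over to 0 after u32::MAX instead of panicking.
    seq.wrapping_add(n)
}

/// Number of bytes from `from` forward to `to`, going around the circle if needed.
fn seq_distance(from: u32, to: u32) -> u32 {
    // wrapping_sub gives the forward distance even when `to` is numerically smaller.
    to.wrapping_sub(from)
}
```

Starting three positions before the top of the range, advance(u32::MAX - 2, 10) lands on 7, and seq_distance(u32::MAX - 2, 7) recovers the 10 bytes that were sent. Ordinary `+` on the same operands would panic in a debug build.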
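From the RFC on Sequence Numbers
A fundamental notion in the design is that every octet of data sent over a TCP connection has a sequence number. Since every octet is sequenced, each of them can be acknowledged. The acknowledgment mechanism employed is cumulative so that an acknowledgment of sequence number X indicates that all octets up to but not including X have been received. This mechanism allows for straight-forward duplicate detection in the presence of retransmission. Numbering of octets within a segment is that the first data octet immediately following the header is the lowest numbered, and the following octets are numbered consecutively.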
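The "up to but not including X" rule can be written as a half-open range check. This is a minimal sketch with hypothetical helper names; it assumes the ACK has already been judged acceptable, that is, it does not lie behind the oldest unacknowledged octet una.

```rust
/// Number of octets newly acknowledged when a cumulative ACK moves the
/// left edge of the send window from `una` to `ack`.
fn newly_acked_len(una: u32, ack: u32) -> u32 {
    ack.wrapping_sub(una)
}

/// True if `octet` lies in the half-open range [una, ack), i.e. this ACK covers it.
fn covers(una: u32, ack: u32, octet: u32) -> bool {
    // Compare forward distances from `una` so the test survives wraparound.
    octet.wrapping_sub(una) < ack.wrapping_sub(una)
}
```

With una = 100 and an ACK of 105, octets 100 through 104 are covered and octet 105 is not: 105 is the next octet the peer expects.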
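The segment validity check is another pillar of TCP's reliable-delivery guarantee. It makes sure that a received segment belongs to the current session the receiver expects. A segment is valid if its sequence number falls inside the window in which the receiver expects new data. For zero-length segments the rule differs slightly: the segment is acceptable if it falls inside the receive window or, when the window is full (its size is zero), if its sequence number is exactly the next expected one.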
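The four combinations of segment length and window size can be spelled out as one pure function. The sketch below follows the acceptability test of RFC 793, Section 3.3, and compares forward offsets from RCV.NXT instead of using the chapter's is_between_wrapped helper; the names in_window and segment_acceptable are assumptions made for this example.

```rust
/// True if `x` lies in [rcv_nxt, rcv_nxt + rcv_wnd) on the sequence circle.
fn in_window(rcv_nxt: u32, rcv_wnd: u32, x: u32) -> bool {
    x.wrapping_sub(rcv_nxt) < rcv_wnd
}

/// Acceptability test for an incoming segment.
/// `seg_len` counts payload bytes plus one for each of SYN and FIN.
fn segment_acceptable(rcv_nxt: u32, rcv_wnd: u32, seg_seq: u32, seg_len: u32) -> bool {
    match (seg_len, rcv_wnd) {
        // Empty segment, closed window: only the exact next sequence number passes.
        (0, 0) => seg_seq == rcv_nxt,
        // Empty segment, open window: the sequence number must be inside the window.
        (0, _) => in_window(rcv_nxt, rcv_wnd, seg_seq),
        // Data while the window is closed is never acceptable.
        (_, 0) => false,
        // Data with an open window: the first or the last byte must be inside the window.
        (_, _) => {
            in_window(rcv_nxt, rcv_wnd, seg_seq)
                || in_window(rcv_nxt, rcv_wnd, seg_seq.wrapping_add(seg_len - 1))
        }
    }
}
```

A retransmission that starts 10 bytes before RCV.NXT but extends into the window, such as segment_acceptable(1000, 100, 990, 20), is accepted because its last byte falls inside the window.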
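In the on_packet method, the sequence number of the received packet is checked against the expected range with the is_between_wrapped function, which uses wrapping arithmetic to decide whether a sequence number falls inside the expected window. This matters most on networks with high latency or fast bulk transfers, where sequence numbers can wrap around frequently.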
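To see the comparison at work across the wrap point, the sketch below reproduces the two helper functions that appear later in this section. Both bounds are exclusive, which is why on_packet passes recv.nxt.wrapping_sub(1) as the lower bound.

```rust
/// True if `lhs` comes strictly before `rhs` on the 32-bit sequence circle.
fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
    // If lhs is "behind" rhs, the wrapped difference lands in the upper half of the range.
    lhs.wrapping_sub(rhs) > (1 << 31)
}

/// True if `x` lies strictly between `start` and `end` on the sequence circle.
fn is_between_wrapped(start: u32, x: u32, end: u32) -> bool {
    wrapping_lt(start, x) && wrapping_lt(x, end)
}
```

A window that starts just below u32::MAX and ends at 10 contains the sequence number 2, so is_between_wrapped(u32::MAX - 5, 2, 10) is true. Because the lower bound is exclusive, is_between_wrapped(10, 10, 20) is false while is_between_wrapped(9, 10, 20) is true.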
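Once the segment has been validated, and if it carries an acknowledgment (ACK), the method adjusts the connection state accordingly. In the SynRcvd state, an acceptable ACK moves the connection to Estab (established). If the connection is already established and the segment acknowledges something new, send.una (the oldest unacknowledged sequence number) is updated. In this implementation, a connection in the Estab state then terminates itself by sending a FIN segment and moving to FinWait1.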
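In addition, when a FIN segment arrives during FinWait2, indicating that the other side has finished sending data, the connection state changes to TimeWait, which ends the termination phase. This graceful shutdown ensures that both ends have a chance to acknowledge all transmitted data before the connection is fully closed.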
Supplementary note
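A FIN segment is a TCP packet that signals the sender has finished sending data. TCP is a bidirectional protocol, which means each direction must be closed independently. When one end of the connection has no more data to send, it sends a FIN segment, and the other end acknowledges it. The connection counts as fully closed only after both ends have exchanged and acknowledged FIN segments. This procedure gives both sides the chance to finish their data transfer completely before the connection is terminated.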
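The transitions described so far fit into a small table-like function. The sketch below reuses the state names from the chapter's State enum; the Event enum and the next_state function are assumptions introduced only to make the flow visible, and the real on_packet derives these events from sequence and acknowledgment numbers.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    SynRcvd,
    Estab,
    FinWait1,
    FinWait2,
    TimeWait,
}

/// Hypothetical events, named after what on_packet observes or does.
#[derive(Debug, Clone, Copy)]
enum Event {
    AckOfSyn,    // the peer acknowledged our SYN
    FinSent,     // we sent our FIN
    AckOfFin,    // the peer acknowledged our FIN
    FinReceived, // the peer sent its own FIN
}

/// Returns the state that follows `state` when `event` occurs.
/// Combinations that this sketch does not model leave the state unchanged.
fn next_state(state: State, event: Event) -> State {
    match (state, event) {
        (State::SynRcvd, Event::AckOfSyn) => State::Estab,
        (State::Estab, Event::FinSent) => State::FinWait1,
        (State::FinWait1, Event::AckOfFin) => State::FinWait2,
        (State::FinWait2, Event::FinReceived) => State::TimeWait,
        (s, _) => s,
    }
}
```

Feeding the four events in order, starting from SynRcvd, walks the connection through Estab, FinWait1 and FinWait2 to TimeWait.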
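**Step-by-step walkthrough of the on_packet method checks:**
The on_packet method in our implementation is where the rubber meets the road for packet processing. When it is called, its first job is to run a series of checks: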
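Sequence number validity: the first check makes sure the segment falls inside the window we expect to receive next. The segment's sequence number is compared with the receiver's expected sequence number, taking wrapping into account.
Segment length and window calculation: the method computes the length of the segment and uses wrapping arithmetic to decide whether the segment's data fits inside the current window (the part of the connection's buffer space currently available for new data).
Acceptable ACK check: the method checks the acknowledgment number of the received segment against what was sent earlier, to decide whether the ACK should be acted on or ignored.
Zero-length segment and window checks: for segments that carry no data, specific rules decide acceptability from the sequence number and the current window size.
Connection state transitions: depending on the ACK received and the current state, the connection may move between states, for example from SYN-RECEIVED to ESTABLISHED when a valid ACK arrives, or toward teardown by entering a FIN-WAIT state once a FIN has been sent and acknowledged.
Data reception: if the segment is valid and the state is established, the method advances the receive sequence number to the next expected byte, readying the connection for more data.
Reset handling with send_rst: if an unacceptable segment arrives, or the connection has to be aborted for another reason, the send_rst method sends a reset (RST) segment. The correct sequence number and flags depend on whether the connection is in a synchronized state and on the details of the incoming segment.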
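The rule for choosing the fields of a reset, quoted in the comments of send_rst for the non-synchronized case, can be sketched as a pure function. The RstFields struct and the rst_fields function are hypothetical; the chapter's send_rst still carries a TODO for this logic.

```rust
/// Sequence and acknowledgment fields of an outgoing RST segment.
#[derive(Debug, PartialEq, Eq)]
struct RstFields {
    seq: u32,
    /// `Some(n)` means the ACK flag is set with acknowledgment number `n`.
    ack: Option<u32>,
}

/// Chooses the RST fields for a connection that is not in a synchronized state.
/// `seg_ack` is the incoming acknowledgment number, if the ACK flag was set.
fn rst_fields(seg_ack: Option<u32>, seg_seq: u32, seg_len: u32) -> RstFields {
    match seg_ack {
        // The incoming segment has an ACK: the reset takes its sequence number from it.
        Some(ack) => RstFields { seq: ack, ack: None },
        // No ACK: sequence number zero, and acknowledge everything the segment occupied.
        None => RstFields {
            seq: 0,
            ack: Some(seg_seq.wrapping_add(seg_len)),
        },
    }
}
```

For an incoming SYN without an ACK at sequence number 500 (segment length 1, because SYN occupies one sequence number), rst_fields(None, 500, 1) yields sequence number 0 and acknowledgment number 501.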
/// Processes a TCP packet based on the current connection state and sequence numbers.
/// It's part of a larger TCP state machine implementation.
pub fn on_packet<'a>(
    &mut self,
    nic: &mut tun_tap::Iface,             // A mutable reference to the network interface.
    iph: etherparse::Ipv4HeaderSlice<'a>, // The IPv4 header of the received packet.
    tcph: etherparse::TcpHeaderSlice<'a>, // The TCP header of the received packet.
    data: &'a [u8],                       // The payload data of the packet.
) -> io::Result<()> {
    // Sequence number validation as per RFC 793, Section 3.3.
    let seqn = tcph.sequence_number(); // The sequence number of the TCP segment.
    let mut slen = data.len() as u32; // Length of the data payload in bytes.

    // Adjust sequence length for FIN and SYN flags, which consume a sequence number each.
    if tcph.fin() {
        slen += 1; // FIN flag indicates end of data, consumes one sequence number.
    };
    if tcph.syn() {
        slen += 1; // SYN flag indicates start of sync, consumes one sequence number.
    };

    // Calculate the window end using wrapping addition to handle potential overflow.
    let wend = self.recv.nxt.wrapping_add(self.recv.wnd as u32); // The end of the receiver's window.

    // Check for the validity of the segment length and sequence number.
    let okay = if slen == 0 {
        // Rules for acceptance of zero-length segments.
        if self.recv.wnd == 0 {
            seqn == self.recv.nxt // Accept only if sequence number equals the next expected number.
        } else {
            // Accept if within window and sequence space.
            is_between_wrapped(self.recv.nxt.wrapping_sub(1), seqn, wend)
        }
    } else {
        // Non-zero length segment rules.
        if self.recv.wnd == 0 {
            false // Do not accept any data if window is closed.
        } else {
            // Check if the first or the last byte of the segment falls within the receive window.
            is_between_wrapped(self.recv.nxt.wrapping_sub(1), seqn, wend)
                || is_between_wrapped(
                    self.recv.nxt.wrapping_sub(1),
                    seqn.wrapping_add(slen - 1),
                    wend,
                )
        }
    };

    // Respond with an ACK with the correct sequence number if the segment is not acceptable.
    if !okay {
        // Send a response with an ACK for the expected sequence number.
        self.write(nic, self.send.nxt, 0)?;
        return Ok(());
    }

    // Update the next expected sequence number after accepting valid data.
    self.recv.nxt = seqn.wrapping_add(slen);

    // If the TCP header does not have an ACK, no further processing is necessary.
    if !tcph.ack() {
        return Ok(());
    }

    // Handling ACKs in different states
    let ackn = tcph.acknowledgment_number(); // The acknowledgment number from the TCP header.

    // If the connection is in 'SYN-RECEIVED' state and the ACK is valid, move to 'ESTABLISHED'.
    if let State::SynRcvd = self.state {
        if is_between_wrapped(self.send.una.wrapping_sub(1), ackn, self.send.nxt.wrapping_add(1)) {
            // Transition to the established state.
            self.state = State::Estab;
        } else {
            // If the ACK is not within the expected range, a reset might be sent here.
            // [RFC 793 requires a RST for an unacceptable ACK while in SYN-RECEIVED]
            // TODO: Implement RST sending as per RFC specification.
        }
    }

    // If the connection is already established or in a state waiting for a finalization,
    // it processes the acknowledgment accordingly.
    if let State::Estab | State::FinWait1 | State::FinWait2 = self.state {
        if !is_between_wrapped(self.send.una, ackn, self.send.nxt.wrapping_add(1)) {
            // If ACK is not valid, no action is taken.
            return Ok(());
        }

        // Update the least unacknowledged byte after a valid ACK is received.
        self.send.una = ackn;

        // TODO: Implement logic to handle new ACKs and sending out any data that has been queued but not yet sent.
        assert!(data.is_empty()); // Assert no data is present for these states as per this implementation.

        // Transition to initiate connection termination by sending a FIN.
        if let State::Estab = self.state {
            // Terminate the connection with a FIN.
            self.tcp.fin = true; // Set FIN flag to indicate no more data will be sent.
            self.write(nic, self.send.nxt, 0)?; // Send the FIN packet.
            self.state = State::FinWait1; // Move to the state waiting for the FIN acknowledgment.
        }
    }

    // In 'FIN-WAIT-1' state, check if the FIN has been acknowledged.
    if let State::FinWait1 = self.state {
        // ISS + 1 covers our SYN and ISS + 2 covers our FIN, since this implementation sends no data.
        // 'wrapping_add' avoids an overflow panic when ISS is close to u32::MAX.
        if self.send.una == self.send.iss.wrapping_add(2) {
            // Confirm the peer has acknowledged our FIN.
            self.state = State::FinWait2; // Transition to 'FIN-WAIT-2' state.
        }
    }

    // If a FIN is received, handle it based on the current state.
    if tcph.fin() {
        match self.state {
            // If in 'FIN-WAIT-2', the receipt of a FIN indicates the other side has finished sending.
            State::FinWait2 => {
                // Finalize the connection closure process.
                self.write(nic, self.send.nxt, 0)?; // Acknowledge the received FIN.
                self.state = State::TimeWait; // Enter 'TIME-WAIT' state.
                // TODO: Implement the TIME-WAIT state duration and closing procedures.
            }
            _ => { /* States not expecting a FIN do not process it here. */ }
        }
    }

    // All cases either lead to a state transition or a write operation (or both),
    // demonstrating the intended flow of TCP connection state management.
    Ok(()) // Indicate successful processing of the incoming TCP packet.
}

fn wrapping_lt(lhs: u32, rhs: u32) -> bool {
    // From RFC1323:
    //     TCP determines if a data segment is "old" or "new" by testing
    //     whether its sequence number is within 2**31 bytes of the left edge
    //     of the window, and if it is not, discarding the data as "old". To
    //     insure that new data is never mistakenly considered old and vice-
    //     versa, the left edge of the sender's window has to be at most
    //     2**31 away from the right edge of the receiver's window.
    lhs.wrapping_sub(rhs) > (1 << 31)
}

fn is_between_wrapped(start: u32, x: u32, end: u32) -> bool {
    wrapping_lt(start, x) && wrapping_lt(x, end)
}

fn send_rst(&mut self, nic: &mut tun_tap::Iface) -> io::Result<()> {
    self.tcp.rst = true;
    // TODO: fix sequence numbers here
    // If the incoming segment has an ACK field, the reset takes its
    // sequence number from the ACK field of the segment, otherwise the
    // reset has sequence number zero and the ACK field is set to the sum
    // of the sequence number and segment length of the incoming segment.
    // The connection remains in the same state.
    //
    // TODO: handle synchronized RST
    // 3. If the connection is in a synchronized state (ESTABLISHED,
    // FIN-WAIT-1, FIN-WAIT-2, CLOSE-WAIT, CLOSING, LAST-ACK, TIME-WAIT),
    // any unacceptable segment (out of window sequence number or
    // unacceptable acknowledgment number) must elicit only an empty
    // acknowledgment segment containing the current send-sequence number
    // and an acknowledgment indicating the next sequence number expected
    // to be received, and the connection remains in the same state.
    self.tcp.sequence_number = 0;
    self.tcp.acknowledgment_number = 0;
    self.write(nic, self.send.nxt, 0)?;
    Ok(())
}
TODO - finish this chapter and show the results of running this code