本文为《深入理解Linux网络》学习笔记,使用的Linux源码版本是3.10,网卡驱动默认采用的都是Intel的igb网卡驱动
Linux源码在线阅读:https://elixir.bootlin.com/linux/v3.10/source
从开发者的角度来看,调用socket函数可以创建一个socket
等这个socket函数调用执行完之后,用户层面看到返回的是一个整数型的句柄,但其实内核在内部创建了一系列socket相关的内核对象。它们互相之间的关系如下图所示:
// net/socket.c
SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
...
retval = sock_create(family, type, protocol, &sock);
...
}
sock_create是创建socket的主要位置,其中sock_create又调用了__sock_create
// net/socket.c
int __sock_create(struct net *net, int family, int type, int protocol,
struct socket **res, int kern)
{
int err;
struct socket *sock;
const struct net_proto_family *pf;
...
// 分配socket对象
sock = sock_alloc();
...
// 获得每个协议族的操作表
pf = rcu_dereference(net_families[family]);
...
// 调用指定协议族的创建函数,对于AF_INET对应的是inet_create
err = pf->create(net, sock, protocol, kern);
...
}
在__sock_create
里,首先调用sock_alloc来分配一个struct socket内核对象,接着获取协议族的操作函数表,并调用其create方法。对于AF_INET协议族来说,执行到的是inet_create方法
// net/ipv4/af_inet.c
static struct inet_protosw inetsw_array[] =
{
{
.type = SOCK_STREAM,
.protocol = IPPROTO_TCP,
.prot = &tcp_prot,
.ops = &inet_stream_ops,
.no_check = 0,
.flags = INET_PROTOSW_PERMANENT |
INET_PROTOSW_ICSK,
},
...
};
static int inet_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
struct sock *sk;
struct inet_protosw *answer;
struct inet_sock *inet;
struct proto *answer_prot;
...
list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
err = 0;
if (protocol == answer->protocol) {
if (protocol != IPPROTO_IP)
break;
} else {
if (IPPROTO_IP == protocol) {
protocol = answer->protocol;
break;
}
if (IPPROTO_IP == answer->protocol)
break;
}
err = -EPROTONOSUPPORT;
}
...
// 将inet_stream_ops赋到socket->ops上
sock->ops = answer->ops;
// 获取tcp_prot
answer_prot = answer->prot;
...
// 分配sock对象,并把tcp_prot赋到sock->sk_prot上
sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);
...
// 对sock对象进行初始化
sock_init_data(sock, sk);
...
}
在inet_create中,根据类型SOCK_STREAM查找到对于TCP定义的操作方法实现集合inet_stream_ops和tcp_prot,并把它们分别设置到socket->ops和sock->sk_prot上,如下图所示:
再往下看到了sock_init_data。在这个方法中将socket中的sk_data_ready函数指针进行了初始化,设置为默认sock_def_readable,如下图所示:
// net/core/sock.c
void sock_init_data(struct socket *sock, struct sock *sk)
{
...
sk->sk_data_ready = sock_def_readable;
sk->sk_write_space = sock_def_write_space;
sk->sk_error_report = sock_def_error_report;
...
}
当软中断上收到数据包时通过调用sk_data_ready函数指针(实际被设置成了sock_def_readable())来唤醒在socket上等待的进程。后面讲到“软中断模块”时可以看到这一过程
至此,一个tcp对象,确切的说是AF_INET协议族下SOCKET_STREAM对象就算创建完成了。这里花费了一次socket系统调用的开销
阻塞IO模型:
当用户线程发出IO请求之后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才解除block状态
在同步阻塞IO模型中,先是用户进程发起创建socket的指令,然后切换到内核态完成了内核对象的初始化。接下来,Linux在数据包的接收上,是硬中断和ksoftirqd线程在进行处理。当ksoftirqd线程处理完以后,再通知相关的用户进程
从用户进程创建socket,到一个网络包抵达网卡被用户进程接收,同步阻塞IO总体上的流程如下图所示:
clib库recv函数会执行recvform系统调用。进入系统调用后,用户进程就进入了内核态,执行一系列的内核协议层函数,然后到socket对象的接收队列中查看是否有数据,没有的话就把是自己添加到socket对应的等待队列里。最后让出CPU,操作系统会选择下一个就绪状态的进程来执行。整个流程如下图所示:
接下来根据源码来看更具体的细节。其中要关注的重点是recvfrom最后是怎么把自己的进程阻塞掉的
// net/socket.c
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
struct socket *sock;
...
// 根据用户传入的fd找到socket对象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
...
err = sock_recvmsg(sock, &msg, size, flags);
...
}
接下来的调用顺序为:sock_recvmsg => __sock_recvmsg
=> __sock_recvmsg_nosec
// net/socket.c
static inline int __sock_recvmsg_nosec(struct kiocb *iocb, struct socket *sock,
struct msghdr *msg, size_t size, int flags)
{
...
return sock->ops->recvmsg(iocb, sock, msg, size, flags);
}
调用socket对象ops里的recvmsg,recvmsg指向的是inet_recvmsg方法
// net/ipv4/af_inet.c
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
size_t size, int flags)
{
...
err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, &addr_len);
...
}
这里又遇到一个函数指针,这次调用的是socket对象里sk_prot下的recvmsg方法,recvmsg方法对应的是tcp_recvmsg方法
// net/ipv4/tcp.c
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int nonblock, int flags, int *addr_len)
{
...
int copied = 0;
...
do {
...
// 遍历接收队列接收数据
skb_queue_walk(&sk->sk_receive_queue, skb) {
...
}
...
if (copied >= target) {
release_sock(sk);
lock_sock(sk);
} else // 没有收到足够数据,启用sk_wait_data阻塞当前进程
sk_wait_data(sk, &timeo);
...
} while (len > 0);
...
}
终于看到了我们想要看的内容,skb_queue_walk在访问sock对象下的接收队列,如下图所示:
如果没有收到数据,或者收到的不够多,则调用sk_wait_data把当前进程阻塞掉
// net/core/sock.c
int sk_wait_data(struct sock *sk, long *timeo)
{
int rc;
// 当前进程(current)关联到所定义的等待队列项上
DEFINE_WAIT(wait);
// 调用sk_sleep获取sock对象下的wait
// 并准备挂起,将当前进程设置为可打断(INTERRUPTIBLE)
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
// 通过调用schedule_timeout让出CPU,然后进行睡眠
rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
...
}
下面再来详细看看sk_wait_data是怎样把当前进程给阻塞掉的,如下图所示:
首先在DEFINE_WAIT宏下,定义了一个等待队列项wait。在这个新的等待队列项上,注册了回调函数autoremove_wake_function,并把当前进程描述符current关联到其.private
成员上
// include/linux/wait.h
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
紧接着在sk_wait_data中调用sk_sleep获取socket对象下的等待队列列表头wait_queue_head_t。sk_sleep源码如下:
// include/net/sock.h
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk->sk_wq)->wait;
}
接着调用prepare_to_wait来把新定义的等待队列项wait插入sock对象的等待队列
// kernel/wait.c
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
这样后面当内核收完数据产生就绪事件的时候,就可以查找socket等待队列上的等待项,进而可以找到回调函数和在等待该socket就绪事件的进程了
最后调用sk_wait_event让出CPU,进程将进入睡眠状态,这会产生一次进程上下文切换的开销,这个开销是昂贵的,大约需要消耗几个微妙的CPU时间
前文讲到了网络包到网卡后是怎么被网卡接收,最后再交由软中断处理的,这里直接从TCP协议的接收函数tcp_v4_rcv看起,总体接收流程如下图所示:
软中断(也就是Linux里的ksoftirqd线程)里收到数据以后,发现是TCP包就会执行tcp_v4_rcv函数。接着往下,如果是ESTABLISH状态下的数据包,则最后会把数据拆出来放到对应socket的接收队列中,然后调用sk_data_ready来唤醒用户进程
// net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
...
// 获取tcp header
th = tcp_hdr(skb);
// 获取ip header
iph = ip_hdr(skb);
...
// 根据数据包header中的IP、端口信息查找到对应的socket
sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
...
// socket未被用户锁定
if (!sock_owned_by_user(sk)) {
...
{
if (!tcp_prequeue(sk, skb))
ret = tcp_v4_do_rcv(sk, skb);
}
}
...
}
在tcp_v4_rcv中,首先根据收到的网络包的header里的source和dest信息在本机上查询对应的socket。找到以后,调用tcp_v4_do_rcv函数
// net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
...
if (sk->sk_state == TCP_ESTABLISHED) {
...
// 执行连接状态下的数据处理
if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
rsk = sk;
goto reset;
}
return 0;
}
// 其他非ESTABLISH状态的数据包处理
...
}
假设处理的是ESTABLISH状态下的包,这样就进入tcp_rcv_established函数进行处理
// net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
const struct tcphdr *th, unsigned int len)
{
...
// 接收数据放到队列中
eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
&fragstolen);
...
// 数据准备好,唤醒socket上阻塞掉的进程
sk->sk_data_ready(sk, 0);
...
}
在tcp_rcv_established中通过调用tcp_queue_rcv函数,完成了将接收到的数据放到socket的接收队列上,如下图所示:
函数tcp_queue_rcv的源码如下:
// net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
bool *fragstolen)
{
...
// 把接收到的数据放到socket的接收队列的尾部
if (!eaten) {
__skb_queue_tail(&sk->sk_receive_queue, skb);
skb_set_owner_r(skb, sk);
}
return eaten;
}
调用tcp_queue_rcv接收完成之后,接着调用sk_data_ready来唤醒在socket上等待的用户进程。这又是一个函数指针。在前面”socket的直接创建“的部分,讲到在创建socket的流程里执行到的sock_init_data函数已经把sk_data_ready指针设置成了sock_def_readable函数了。它是默认的数据就绪处理函数
// net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
// 有进程在此socket的等待队列
if (wq_has_sleeper(wq))
// 唤醒等待队列上的进程
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
在sock_def_readable中再一次访问到了sock->sk_wq下的wait。在前面”等待接收消息“的部分调用recvform时,在执行过程的最后,通过DEFINE_WAIT(wait)
将当前进程关联的等待队列添加到sock->sk_wq下的wait里了
那接下来就是调用wake_up_interruptible_sync_poll来唤醒在socket上因为等待数据而被阻塞掉的进程了,如下图所示:
// include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
// kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
int wake_flags = WF_SYNC;
if (unlikely(!q))
return;
if (unlikely(!nr_exclusive))
wake_flags = 0;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
spin_unlock_irqrestore(&q->lock, flags);
}
__wake_up_common
实现唤醒。该函数调用的参数nr_exclusive传入的是1,这里指的是即使有多个进程都阻塞在同一个socket上,也只会唤醒一个进程。其作用是为了避免惊群,而不是把所有的进程都唤醒
// kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
在__wake_up_common
中找出一个等待队列项curr,然后调用其curr->func。在前面”等待接收消息“的部分recv函数执行的时候,使用DEFINE_WAIT()定义等待队列项时,内核把curr->func设置成了autoremove_wake_function
// kernel/wait.c
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int ret = default_wake_function(wait, mode, sync, key);
if (ret)
list_del_init(&wait->task_list);
return ret;
}
在autoremove_wake_function中,调用了default_wake_function
// kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
return try_to_wake_up(curr->private, mode, wake_flags);
}
调用try_to_wake_up时传入的task_struct是curr->private,这个就是因为等待而被阻塞的进程项。当这个函数执行完的时候,在socket上等待而被阻塞的进程就被推入可运行队列里了,这又将产生一次进程上下文切换的开销
同步阻塞方式接收网络包的整个过程分为两部分:
同步阻塞总体流程如下图所示:
每次一个进程专门为了等一个socket上的数据就被从CPU上拿下来,然后换上另一个进程,如下图所示。等到数据准备好,睡眠的进程又被唤醒,总共产生两次进程上下文切换开销
推荐阅读:
Linux五种I/O模型:带你彻底理解Linux五种I/O模型