什么是惊群效应
惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。
惊群效应到底消耗了什么
我想你应该也会有跟我一样的问题,那就是惊群效应到底消耗了什么?
(1)系统对用户进程/线程频繁地做无效的调度,上下文切换系统性能大打折扣。
(2)为了确保只有一个线程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。
是不是还是觉得不够深入,概念化?看下面:
(1)上下文切换(context switch)过高会导致cpu像个搬运工,频繁地在寄存器和运行队列之间奔波,更多的时间花在了进程(线程)切换,而不是在真正工作的进程(线程)上面。直接的消耗包括cpu寄存器要保存和加载(例如程序计数器)、系统调度器的代码需要执行。间接的消耗在于多核cache之间的共享数据。
(2)通过锁机制解决惊群效应是一种方法,在任意时刻只让一个进程(线程)处理等待的事件。但是锁机制也会造成cpu等资源的消耗和性能损耗。目前一些常见的服务器软件有的是通过锁机制解决的,比如nginx(它的锁机制是默认开启的,可以关闭);还有些认为惊群对系统性能影响不大,没有去处理,比如lighttpd。
Google了一下:其实在linux2.6版本以后,linux内核已经解决了accept函数的“惊群”现象,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程(线程),所以如果服务器采用accept阻塞调用方式,在最新的linux系统中已经没有“惊群效应”了。
看了下kernel代码,发现2.6确实已经解决了,那2.6之前的内核都有惊群问题吗?其实不是的,下载了几个内核版本,通过代码得出结论(没实际验证),情况如下:
linux-2.2.2 会唤醒等待队列所有进程
linux-2.3.13 只会唤醒第一个进程
linux-2.4.34只会唤醒第一个进程
linux-2.5.27只会唤醒第一个进程
所以应该是从linux-2.2.2后的某个内核版本修复的,但具体版本号还不太清楚。
accept惊群问题未解决时的代码流程
接下来看看linux-2.2.2版本中堵塞唤醒代码流程,了解为什么会唤醒所有进程?为什么会出现惊群现象?
用户程序调用accept时,会调到内核态中的tcp_accept,如果没有设置非堵塞,会调用wait_for_connect(sk, &prev)堵塞进程,等待事件到来。
- static struct open_request * wait_for_connect(struct sock * sk,
- struct open_request **pprev)
- {
- struct wait_queue wait = { current, NULL };
- struct open_request *req;
-
- //将wait添加到sk的等待队列头sleep中
- //假如fork了10个进程,每个进程都调用accept,则会将10个进程的wait添加到sk的等待队列头 sleep上。
- add_wait_queue(sk->sleep, &wait);
- for (;;) {
- current->state = TASK_INTERRUPTIBLE;
- //释放锁。
- release_sock(sk);
- //进程调度,执行其他进程,直到被唤醒
- schedule();
- //如果事件发生后,这个10个进程都会被被唤醒,从此处开始执行,
- //首先获取锁。
- //即10个进程只有一个进程才能获取锁成功,继而获取新连接成功
- lock_sock(sk);
- //第一个获取到锁的进程也会获取到新连接,跳出循环,继续执行。
- //其他进程没获取锁的继续执行调度,等待新连接的到来
- //所以这10个进程可以循环接收新连接
- req = tcp_find_established(&(sk->tp_pinfo.af_tcp), pprev);
- if (req)
- break;
- if (signal_pending(current))
- break;
- }
- //进程被唤醒并且获取连接后,将本进程从等待队列中删除。
- current->state = TASK_RUNNING;
- remove_wait_queue(sk->sleep, &wait);
- return req;
- }
- extern inline void add_wait_queue(struct wait_queue ** p, struct wait_queue * wait)
- {
- unsigned long flags;
- //添加到队列头部,并且没有exclusive标志
- write_lock_irqsave(&waitqueue_lock, flags);
- __add_wait_queue(p, wait);
- write_unlock_irqrestore(&waitqueue_lock, flags);
- }
tcp三次握手完成后,需要唤醒accept进程,流程如下:
- tcp_v4_do_rcv->tcp_v4_hnd_req->tcp_check_req->syn_recv_sock(tcp_v4_syn_recv_sock)->sk->data_ready(sk, 0)->sock_def_readable->wake_up_interruptible(sk->sleep);
- #define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE)
- void __wake_up(struct wait_queue **q, unsigned int mode)
- {
- struct wait_queue *next;
-
- read_lock(&waitqueue_lock);
- if (q && (next = *q)) {
- struct wait_queue *head;
- //遍历sk等待队列中的所有进程并唤醒,此例中会唤醒10个进程。
- head = WAIT_QUEUE_HEAD(q);
- while (next != head) {
- struct task_struct *p = next->task;
- next = next->next;
- if (p->state & mode)
- wake_up_process(p);
- }
- }
- read_unlock(&waitqueue_lock);
- }
疑问:在惊群问题未解决时,新连接到来后,只有一个后返回accept成功,其余的九个会有返回值吗?还是在内核态继续执行调度等待其他新连接的到来?
应该是其余九个进程会被唤醒,但是不会有返回值,由于第一个被唤醒进程已经取走了新连接,这九个进程即使被唤醒,发现全连接队列为空,就会继续调度睡眠,等待新连接到来。
内核解决accept惊群后的代码流程
以内核版本3.17.89为例
用户程序调用accept时,会调到内核态中的inet_csk_accept,如果全连接队列为空,并且没有设置非堵塞,则会调用inet_csk_wait_for_connect(sk, timeo)堵塞进程,等待事件到来。
- static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
- {
- struct inet_connection_sock *icsk = inet_csk(sk);
- //初始化等待队列节点wait
- DEFINE_WAIT(wait);
- int err;
- /*
- * True wake-one mechanism for incoming connections: only
- * one process gets woken up, not the 'whole herd'.
- * Since we do not 'race & poll' for established sockets
- * anymore, the common case will execute the loop only once.
- *
- * Subtle issue: "add_wait_queue_exclusive()" will be added
- * after any current non-exclusive waiters, and we know that
- * it will always _stay_ after any new non-exclusive waiters
- * because all non-exclusive waiters are added at the
- * beginning of the wait-queue. As such, it's ok to "drop"
- * our exclusiveness temporarily when we get woken up without
- * having to remove and re-insert us on the wait queue.
- */
- for (;;) {
- //以上英文注释已经说清楚了,只有一个进程会被唤醒。
- //非exclusive的节点会被加在等待队列前面,excusive节点会被加在
- //所有非exclusive节点的后面。
- //此例中,等待队列前面有n个非exclusive节点,后面有10个exclusive节点
- prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- release_sock(sk);
- if (reqsk_queue_empty(&icsk->icsk_accept_queue))
- timeo = schedule_timeout(timeo);
- lock_sock(sk);
- err = 0;
- if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
- break;
- err = -EINVAL;
- if (sk->sk_state != TCP_LISTEN)
- break;
- err = sock_intr_errno(timeo);
- if (signal_pending(current))
- break;
- err = -EAGAIN;
- if (!timeo)
- break;
- }
- //将等待节点从等待队列中删除
- finish_wait(sk_sleep(sk), &wait);
- return err;
- }
- #define DEFINE_WAIT_FUNC(name, function) \
- wait_queue_t name = { \
- .private = current, \
- .func = function, \
- .task_list = LIST_HEAD_INIT((name).task_list), \
- }
- //有新连接时,会调用autoremove_wake_function唤醒调用accept的进程,
- //并且将等待节点从等待队列中删除
- #define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
-
- int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
- {
- //唤醒堵塞的accept进程
- int ret = default_wake_function(wait, mode, sync, key);
- if (ret)
- //如果唤醒成功了,则将等待节点从等待队列中删除,这样下次有新连接,
- //就会唤醒另一个进程来处理。
- list_del_init(&wait->task_list);
- return ret;
- }
-
- void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
- {
- unsigned long flags;
-
- //设置exclusive标志,表示一次只会有一个进程被唤醒。
- wait->flags |= WQ_FLAG_EXCLUSIVE;
- spin_lock_irqsave(&q->lock, flags);
- //把wait添加到等待队列的尾部。
- if (list_empty(&wait->task_list))
- __add_wait_queue_tail(q, wait);
- set_current_state(state);
- spin_unlock_irqrestore(&q->lock, flags);
- }
以上是accept的实现,继续看唤醒部分代码。
当有tcp连接完成,就会从半连接队列拷贝sock到全连接队列,这个时候就可以唤醒阻塞的accept了。代码如下:
- tcp_v4_do_rcv->tcp_child_process
- int tcp_child_process(struct sock *parent, struct sock *child,
- struct sk_buff *skb)
- {
- int ret = 0;
- int state = child->sk_state;
-
- if (!sock_owned_by_user(child)) {
- ret = tcp_rcv_state_process(child, skb, tcp_hdr(skb),
- skb->len);
- /* Wakeup parent, send SIGIO */
- if (state == TCP_SYN_RECV && child->sk_state != state)
- parent->sk_data_ready(parent);
- } else {
- /* Alas, it is possible again, because we do lookup
- * in main socket hash table and lock on listening
- * socket does not protect us more.
- */
- __sk_add_backlog(child, skb);
- }
-
- bh_unlock_sock(child);
- sock_put(child);
- return ret;
- }
- 调用sk_data_ready通知父socket,即监听socket,此函数对应sock_def_readable
- static void sock_def_readable(struct sock *sk)
- {
- struct socket_wq *wq;
-
- rcu_read_lock();
- //判断等待队列是否为空,不为空说明有进程在堵塞等待
- wq = rcu_dereference(sk->sk_wq);
- if (wq_has_sleeper(wq))
- wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
- POLLRDNORM | POLLRDBAND);
- sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
- rcu_read_unlock();
- }
- #define wake_up_interruptible_sync_poll(x, m) \
- __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
-
- void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
- int nr_exclusive, void *key)
- {
- unsigned long flags;
- int wake_flags = 1; /* XXX WF_SYNC */
-
- if (unlikely(!q))
- return;
-
- if (unlikely(nr_exclusive != 1))
- wake_flags = 0;
-
- spin_lock_irqsave(&q->lock, flags);
- __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
- spin_unlock_irqrestore(&q->lock, flags);
- }
-
- static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
- int nr_exclusive, int wake_flags, void *key)
- {
- wait_queue_t *curr, *next;
-
- //从头开始遍历等待队列。
- //等待队列头前面可能有N个非exclusive节点,先执行curr->func,但是flags中没有WQ_FLAG_EXCLUSIVE标志,则if条件不满足,不会执行--nr_exclusive,其值仍然为1
- //执行到第一个exclusive节点时,也是先执行curr->func,并且flags中有WQ_FLAG_EXCLUSIVE标志,继续执行--nr_exclusive,nr_exclusive变为0,取非后满足,if的三个条件都满足,则执行break,达到只唤醒一个进程的目的。
- list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
- unsigned flags = curr->flags;
-
- //nr_exclusive为1,执行一次就会为0,跳出循环,达到只唤醒一个进程的目的
- //被唤醒后,会在inet_csk_wait_for_connect中将wait从等待队列删除。
- //func为autoremove_wake_function,此函数不只唤醒进程还会将wait节点从队列删除
- if (curr->func(curr, mode, wake_flags, key) &&
- (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
- break;
- }
- }
解决惊群问题后的代码测试
client端代码
- root@master:~# cat client.c
- #include <sys/socket.h>
- #include <stdio.h>
- #include <errno.h>
- #include <unistd.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
-
- void main(void)
- {
- int fd, ret;
- struct sockaddr_in addr;
-
- fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if(fd < 0)
- {
- perror("socket create failed");
- return ;
- }
- addr.sin_family = AF_INET;
- addr.sin_port = htons(2222);
- addr.sin_addr.s_addr = inet_addr("192.168.122.20");
- ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
- if( ret != 0)
- {
- perror("socket connect1 failed");
- return ;
- }
- perror("socket connect succefsully\n");
- sleep(1000);
- }
server端代码
- root@master:~# cat server.c
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <sys/wait.h>
- #include <string.h>
- #include <netinet/in.h>
- #include <unistd.h>
- #include <errno.h>
-
- #define PROCESS_NUM 5
- int main()
- {
- int fd = socket(PF_INET, SOCK_STREAM, 0);
- int connfd;
- int pid;
-
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(2222);
- bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
- listen(fd, 1024);
-
- int i;
- for(i = 0; i < PROCESS_NUM; ++i){
- pid = fork();
- if(pid == 0){
- printf("I'm pid: %d\n", getpid());
- while(1){
- connfd = accept(fd, (struct sockaddr *)NULL, NULL);
- if(connfd != -1)
- {
- printf("process %d accept success\n", getpid());
- close(connfd);
- }else{
- printf("process %d accept a connection failed: %s\n", getpid(), strerror(errno));
- }
- }
- }
- }
- wait(0);
- return 0;
- }
server端创建block阻塞型的socket后,fork出10个进程,accept同一个fd。可以使用strace跟踪server端子进程执行结果,可看到每次client连接server只有一个进程被唤醒
- root@master:~# ./server
- I'm pid: 10794
- I'm pid: 10796
- I'm pid: 10795
- I'm pid: 10797
- I'm pid: 10798
- process 10794 accept success --->10794先调用accept,则会被先唤醒
- process 10796 accept success
- process 10795 accept success
- process 10797 accept success
- process 10798 accept success
client连接server五次,每次都有不同的进程来处理,处理事件的进程顺序由调用顺序决定。
- root@master:~# ./client
- socket connect succefsully
- : Success
- socket connect succefsully
- : Success
- socket connect succefsully
- : Success
- socket connect succefsully
- : Success
先说结论,poll/select是存在惊群问题的。下面用代码验证,再看看代码流程。
client代码仍然使用accept时的代码。下面是poll模式下server端代码
- root@master:~# cat poll_thunder.c
- #include<stdio.h>
- #include<stdlib.h>
- #include<sys/types.h>
- #include<sys/socket.h>
- #include<sys/wait.h>
- #include<string.h>
- #include<netinet/in.h>
- #include<unistd.h>
- #include <errno.h>
- #include <poll.h>
-
- #define PROCESS_NUM 10
-
- int main()
- {
- int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
- int connfd;
- int pid;
- int status = 1;
-
- char sendbuff[1024];
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(2222);
- bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
- listen(fd, 1024);
-
- int i, new_fd, ret=0;
- struct pollfd clientfds[1];
- for(i = 0; i < PROCESS_NUM; ++i){
- pid = fork();
- if(pid == 0){
- //printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
- while (1) {
- printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
- clientfds[0].fd = fd;
- clientfds[0].events=POLLIN;
- ret = poll(clientfds, 2, -1);
- if (ret < 0)
- {
- perror("poll error");
- }
- else if(ret == 0)
- {
- printf("poll timeout\n");
- continue;
- }
- if (clientfds[0].revents&POLLIN) {
- new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
- if(new_fd < 0)
- {
- printf("accept failed: %d on pid: %d\n", errno, getpid());
- printf("\n");
- continue;
- }
- printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
- close(new_fd);
- }
- printf("\n");
- if (clientfds[0].revents&POLLOUT) {
- printf("new write event in pid: %d\n", getpid());
- }
- if (clientfds[0].revents&POLLERR) {
- printf("new error event in pid: %d\n", getpid());
- }
- }
- }
- }
- //int status;
- wait(0);
- return 0;
- }
client连接server一次,可看到有四个进程被唤醒,只有一个进程接收了新连接。
- root@master:~# ./poll
- I'm pid: 18907, poll on : 3
- I'm pid: 18908, poll on : 3
- I'm pid: 18909, poll on : 3
- I'm pid: 18910, poll on : 3
- I'm pid: 18911, poll on : 3
- I'm pid: 18912, poll on : 3
- I'm pid: 18913, poll on : 3
- I'm pid: 18914, poll on : 3
- I'm pid: 18915, poll on : 3
- I'm pid: 18916, poll on : 3
- new read event: accept new_fd: 4 on pid: 18915 -->只有此进程接收新连接成功
- accept failed: 11 on pid: 18916
-
-
- I'm pid: 18915, poll on : 3
- I'm pid: 18916, poll on : 3
- accept failed: 11 on pid: 18907
-
- I'm pid: 18907, poll on : 3
- accept failed: 11 on pid: 18908
- I'm pid: 18908, poll on : 3
这里有一个问题,按说应该10个进程都被唤醒了,为什么只有四个进程执行到accept处?
因为进程被唤醒后,会调用目标文件的poll函数获取发生的事件通知用户程序,用户程序调用accept后,会将发生的事件清空。如果清空事件前,被唤醒的进程调用poll还会获取到发生的事件,用户程序再调用accept会返回失败。但是清空事件后,被唤醒的进程调用poll获取不到事件,也就不会通知用户程序,而是继续睡眠,这个情况通过log是看不到的,可以通过strace观察server。
或者在进程被唤醒后,在调用accept前sleep一段时间,让所有进程都有时间调用poll获取事件就会看到如下log,10个进程都被唤醒并且都调用accept,但是仍然只有一个进程能成功。
- root@master:~# ./poll
- I'm pid: 20130, poll on : 3
- I'm pid: 20131, poll on : 3
- I'm pid: 20132, poll on : 3
- I'm pid: 20133, poll on : 3
- I'm pid: 20135, poll on : 3
- I'm pid: 20136, poll on : 3
- I'm pid: 20137, poll on : 3
- I'm pid: 20138, poll on : 3
- I'm pid: 20134, poll on : 3
- I'm pid: 20139, poll on : 3
- //10个进程都被唤醒执行accept了,但是只有一个能成功接收新连接
- accept failed: 11 on pid: 20136
- new read event: accept new_fd: 4 on pid: 20133
- accept failed: 11 on pid: 20131
-
-
- I'm pid: 20136, poll on : 3
- I'm pid: 20131, poll on : 3
- accept failed: 11 on pid: 20139
-
- I'm pid: 20139, poll on : 3
- accept failed: 11 on pid: 20132
- I'm pid: 20133, poll on : 3
-
- accept failed: 11 on pid: 20135
- I'm pid: 20132, poll on : 3
- I'm pid: 20135, poll on : 3
- accept failed: 11 on pid: 20137
- accept failed: 11 on pid: 20130
-
-
- I'm pid: 20137, poll on : 3
- I'm pid: 20130, poll on : 3
- accept failed: 11 on pid: 20134
-
- I'm pid: 20134, poll on : 3
- accept failed: 11 on pid: 20138
- I'm pid: 20138, poll on : 3
用户调用poll堵塞进程流程
用户调用poll后,在内核态流程调用:
- do_sys_poll->do_poll->do_pollfd-> f.file->f_op->poll, 对于socket来说,poll函数为调用sock_poll
- static unsigned int sock_poll(struct file *file, poll_table *wait)
- return busy_flag | sock->ops->poll(file, sock, wait); //tcp_poll
-
- unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
- {
- unsigned int mask;
- struct sock *sk = sock->sk;
- const struct tcp_sock *tp = tcp_sk(sk);
-
- sock_rps_record_flow(sk);
-
- sock_poll_wait(file, sk_sleep(sk), wait);
- if (sk->sk_state == TCP_LISTEN)
- return inet_csk_listen_poll(sk);
- }
- //如果全连接队列不为空,则直接返回POLLIN事件
- static inline unsigned int inet_csk_listen_poll(const struct sock *sk)
- {
- return !reqsk_queue_empty(&inet_csk(sk)->icsk_accept_queue) ?
- (POLLIN | POLLRDNORM) : 0;
- }
-
- static inline void sock_poll_wait(struct file *filp,
- wait_queue_head_t *wait_address, poll_table *p)
- {
- if (!poll_does_not_wait(p) && wait_address) {
- poll_wait(filp, wait_address, p);
- /* We need to be sure we are in sync with the
- * socket flags modification.
- *
- * This memory barrier is paired in the wq_has_sleeper.
- */
- smp_mb();
- }
- }
- static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
- {
- //p->_qproc为__pollwait
- if (p && p->_qproc && wait_address)
- p->_qproc(filp, wait_address, p);
- }
- /* Add a new entry */
- static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
- {
- struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
- struct poll_table_entry *entry = poll_get_entry(pwq);
- if (!entry)
- return;
- entry->filp = get_file(filp);
- entry->wait_address = wait_address;
- entry->key = p->_key;
- init_waitqueue_func_entry(&entry->wait, pollwake);
- entry->wait.private = pwq;
- add_wait_queue(wait_address, &entry->wait);
- }
- void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
- {
- unsigned long flags;
- //将exclusive标志清除掉,这样有10个进程调用select,就会将10个进程添加到等待队列中。
- //因为没有exclusive,所以这10个进程都会被唤醒。
- wait->flags &= ~WQ_FLAG_EXCLUSIVE;
- spin_lock_irqsave(&q->lock, flags);
- __add_wait_queue(q, wait);
- spin_unlock_irqrestore(&q->lock, flags);
- }
唤醒进程流程
唤醒流程和accept中唤醒流程相同,只不过最后调用__wake_up_common时,因为poll添加到等待队列时,没有设置exclusive,所以所有进程都会被唤醒。被唤醒的进程再调用accept接收新连接,但是只有一个进程会成功,其余9个都返回错误EAGAIN。
还是先说结论:ET模式下不存在惊群问题,LT模式下存在。
下面通过代码验证,再分析实现流程。
- root@master:~# cat epoll_thunder.c
- #include<stdio.h>
- #include<stdlib.h>
- #include<sys/types.h>
- #include<sys/socket.h>
- #include<sys/wait.h>
- #include<string.h>
- #include<netinet/in.h>
- #include<unistd.h>
- #include <errno.h>
- #include<sys/epoll.h>
-
- #define MAXEVENTS 64
- #define PROCESS_NUM 10
-
- int main()
- {
- int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
- int connfd;
- int pid;
- int i, epoll_fd, new_fd, ret=0, num;
- struct epoll_event event;
- struct epoll_event *events;
-
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(2222);
- bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
- listen(fd, 1024);
-
- if ((epoll_fd = epoll_create(MAXEVENTS))< 0) {
- perror("epoll_create");
- exit(1);
- }
- event.data.fd = fd;
- event.events = EPOLLIN | EPOLLET;
- //event.events = EPOLLIN;
- if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0){
- perror("epoll_ctl");
- exit(1);
- }
-
- events = calloc(MAXEVENTS, sizeof(event));
- for(i = 0; i < PROCESS_NUM; ++i) {
- pid = fork();
- if(pid == 0) {
- while (1) {
- printf("I'm pid: %d, epoll on : %d\n", getpid(), fd);
- num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
- if (num < 0) {
- printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
- continue;
- }
- for(i = 0; i < num; ++i){
- if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
- fprintf(stderr, "epoll error\n");
- close(events[i].data.fd);
- continue;
- }else if(fd == events[i].data.fd){
- new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
- if(new_fd < 0)
- {
- printf("accept failed: %d on pid: %d\n", errno, getpid());
- printf("\n");
- continue;
- }
- printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
- close(new_fd);
- }
- }
- }
- }
- }
- wait(0);
- return 0;
- }
在epoll_thunder.c中,如果events加上标志EPOLLET就是ET模式,不加的话默认是LT模式
- event.data.fd = fd;
- event.events = EPOLLIN | EPOLLET;
- epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event)
ET模式测试结果如下,始终只有一个进程接收新连接,并且是同一个进程。这是因为调用epoll_wait堵塞时,添加wait节点加到了ep等待队列头部,并且是exclusive的,而唤醒进程时总是从队列头部开始,由于设置了exclusive,所以只好唤醒一个进程。
为什么总是同一个进程呢?是因为第一个被唤醒的进程马上又调用epoll_wait将其再次加入等待队列头部,所以下次事件到来时仍然唤醒同一个进程。
- root@ubuntu:/home/jk/socket# ./epoll
- I'm pid: 119280, epoll on : 3
- I'm pid: 119281, epoll on : 3
- I'm pid: 119282, epoll on : 3
- I'm pid: 119285, epoll on : 3
- I'm pid: 119283, epoll on : 3
- I'm pid: 119287, epoll on : 3
- I'm pid: 119284, epoll on : 3
- I'm pid: 119286, epoll on : 3
- I'm pid: 119288, epoll on : 3
- I'm pid: 119289, epoll on : 3
-
- new read event: accept new_fd: 5 on pid: 119289
- I'm pid: 119289, epoll on : 3
- new read event: accept new_fd: 5 on pid: 119289
- I'm pid: 119289, epoll on : 3
- new read event: accept new_fd: 5 on pid: 119289
- I'm pid: 119289, epoll on : 3
下面将进程被唤醒后sleep一段时间20s,然后用client连接server,会发现新连接总是被等待队列头部的进程处理。
- root@master:~# ./epoll
- I'm pid: 27656, epoll on : 3
- I'm pid: 27657, epoll on : 3
- I'm pid: 27658, epoll on : 3
- I'm pid: 27659, epoll on : 3
- I'm pid: 27660, epoll on : 3
- I'm pid: 27661, epoll on : 3
- I'm pid: 27662, epoll on : 3
- I'm pid: 27663, epoll on : 3
- I'm pid: 27664, epoll on : 3
- I'm pid: 27665, epoll on : 3
- new read event: accept new_fd: 5 on pid: 27665 -->第一次连接被27665进程处理,然后sleep 20s
- new read event: accept new_fd: 5 on pid: 27664 -->第二次连接被27664进程处理,然后sleep 20s
- new read event: accept new_fd: 5 on pid: 27663 -->第三次连接被27663进程处理,然后sleep 20s
- I'm pid: 27665, epoll on : 3 -->进程27665 sleep结束,重新开始wait
- new read event: accept new_fd: 5 on pid: 27665 -->第四次连接又被27663进程处理
LT模式下,测试结果如下,从结果看,好像也是只有一个进程被唤醒了,但是实际上唤醒进程不只一个
- root@ubuntu:/home/jk/socket# ./epoll
- I'm pid: 119318, epoll on : 3
- I'm pid: 119319, epoll on : 3
- I'm pid: 119320, epoll on : 3
- I'm pid: 119324, epoll on : 3
- I'm pid: 119321, epoll on : 3
- I'm pid: 119326, epoll on : 3
- I'm pid: 119322, epoll on : 3
- I'm pid: 119323, epoll on : 3
- I'm pid: 119325, epoll on : 3
- I'm pid: 119327, epoll on : 3
-
- new read event: accept new_fd: 5 on pid: 119327
- I'm pid: 119327, epoll on : 3
- new read event: accept new_fd: 5 on pid: 119327
- I'm pid: 119327, epoll on : 3
- new read event: accept new_fd: 5 on pid: 119327
- I'm pid: 119327, epoll on : 3
LT模式下,如果添加sleep后,就会看到所有进程都被唤醒了。唤醒多少个进程和被唤醒进程处理时间长短有关系,如果处理越快,唤醒的进程越少。
- num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
- if (num < 0) {
- printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
- continue;
- }
- sleep(2);
再次执行,可看到所有进程都被唤醒了
- root@ubuntu:/home/jk/socket# ./epoll
- I'm pid: 119480, epoll on : 3
- I'm pid: 119481, epoll on : 3
- I'm pid: 119482, epoll on : 3
- I'm pid: 119483, epoll on : 3
- I'm pid: 119484, epoll on : 3
- I'm pid: 119485, epoll on : 3
- I'm pid: 119486, epoll on : 3
- I'm pid: 119487, epoll on : 3
- I'm pid: 119488, epoll on : 3
- I'm pid: 119489, epoll on : 3
- new read event: accept new_fd: 5 on pid: 119489
- accept failed: 11 on pid: 119488
-
- I'm pid: 119488, epoll on : 3
- I'm pid: 119489, epoll on : 3
- accept failed: 11 on pid: 119487
-
- I'm pid: 119487, epoll on : 3
- accept failed: 11 on pid: 119486
- I'm pid: 119486, epoll on : 3
- accept failed: 11 on pid: 119485
-
- I'm pid: 119485, epoll on : 3
- accept failed: 11 on pid: 119484
- I'm pid: 119484, epoll on : 3
- accept failed: 11 on pid: 119483
-
- I'm pid: 119483, epoll on : 3
- accept failed: 11 on pid: 119482
- I'm pid: 119482, epoll on : 3
- accept failed: 11 on pid: 119481
-
- I'm pid: 119481, epoll on : 3
- accept failed: 11 on pid: 119480
- I'm pid: 119480, epoll on : 3
代码分析
调用epll_wait时,如果需要堵塞等待,则将调用进程加入到ep等待队列中,设置exclusive,并且添加到队列头部,如果有新连接到来,也只会唤醒一个进程。
- init_waitqueue_entry(&wait, current);
- __add_wait_queue_exclusive(&ep->wq, &wait);
这样看貌似epoll_wait已经解决了惊群问题,但在LT和ET模式下处理流程的差别导致了LT模式下惊群问题。
假如调用epoll_ctl将一个fd加入到epoll进行监听,会调用目标文件fd的poll函数,将wait节点(fd上有新事件发生时,调用ep_poll_callback唤醒监听进程)加入到目标文件fd的等待队列中,再fork十个进程调用epoll_wait等待事件到来,这样就会有10个进程堵塞在ep的等待队列中(如果有事件发生时,则调用default_wake_function唤醒堵塞进程)。
如果此时client和server完成了三次握手,则会调用socket的fd等待队列上的task,即会调用ep_poll_callback,将发生事件的fd添加到就绪链表rdlist中,如果ep的等待队列中不为空(此例不为空,有10个节点),则会唤醒第一个进程(因为添加了exclusive标志,所以只会唤醒第一个进程)。
- ep_poll_callback
- if (waitqueue_active(&ep->wq))
- wake_up_locked(&ep->wq);
第一个进程被唤醒后,首先将自己从ep的等待队列中删除,然后调用rdlist上发生事件的fd的poll函数获取发生的事件,将其传递到用户程序,如果是用户程序感兴趣的事件,用户程序再调用accept接收新连接。
- ep_send_events_proc
- revents = ep_item_poll(epi, &pt);
- if (revents) {
- __put_user(revents, &uevent->events)
如果是LT模式,则将目标文件再次添加到就绪链表rdlist,
如果是ET模式,就不会将目标文件再次添加到就绪链表rdlist。
- ep_send_events_proc
- revents = ep_item_poll(epi, &pt);
- if (revents) {
- if (!(epi->event.events & EPOLLET))
- list_add_tail(&epi->rdllink, &ep->rdllist);
后面流程还会做如下判断
- ep_scan_ready_list
- if (!list_empty(&ep->rdllist)) {
- if (waitqueue_active(&ep->wq))
- wake_up_locked(&ep->wq);
在LT模式下,rdlist不为空,并且ep->wq中还有9个进程在堵塞等待,则又会唤醒第二个进程。
第二个进程被唤醒后,后面处理方式和第一个进程相同。
如果第一个进程调用accept把事件取走了,则第二个进程调用目标文件poll函数时就得不到事件,ep->wq中的其他进程就不会被唤醒了。如果每个被唤醒进程调用accept的时间更长,则会唤醒更多的进程。
在ET模式下,rdlist为空,就不会唤醒等待队列上的其他进程了。
epoll下涉及两个等待队列:
a. 调用epoll_ctl时,将进程添加到目标文件的等待队列中,目标文件发生事件时调用ep_poll_callback,判断如果ep等待队列不为空,则唤醒ep等待队列上的进程。
b. 调用epoll_wait时,将进程添加到ep的等待队列中。
https://www.cnblogs.com/Anker/p/7071849.html
https://blog.csdn.net/lyztyycode/article/details/78648798