• 网络编程“惊群“问题


    1、惊群效应(thundering herd)

    什么是惊群效应
    惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
    为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。

    惊群效应到底消耗了什么
    我想你应该也会有跟我一样的问题,那就是惊群效应到底消耗了什么?
    (1)系统对用户进程/线程频繁地做无效的调度,上下文切换系统性能大打折扣。
    (2)为了确保只有一个线程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。
    是不是还是觉得不够深入,概念化?看下面:
    (1)上下文切换(context switch)过高会导致cpu像个搬运工,频繁地在寄存器和运行队列之间奔波,更多的时间花在了进程(线程)切换,而不是在真正工作的进程(线程)上面。直接的消耗包括cpu寄存器要保存和加载(例如程序计数器)、系统调度器的代码需要执行。间接的消耗在于多核cache之间的共享数据。
    (2)通过锁机制解决惊群效应是一种方法,在任意时刻只让一个进程(线程)处理等待的事件。但是锁机制也会造成cpu等资源的消耗和性能损耗。目前一些常见的服务器软件有的是通过锁机制解决的,比如nginx(它的锁机制是默认开启的,可以关闭);还有些认为惊群对系统性能影响不大,没有去处理,比如lighttpd。

    2. accept的惊群问题

    Google了一下:其实在linux2.6版本以后,linux内核已经解决了accept函数的“惊群”现象,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程(线程),所以如果服务器采用accept阻塞调用方式,在最新的linux系统中已经没有“惊群效应”了。
    看了下kernel代码,发现2.6确实已经解决了,那2.6之前的内核都有惊群问题吗?其实不是的,下载了几个内核版本,通过代码得出结论(没实际验证),情况如下:
    linux-2.2.2 会唤醒等待队列所有进程
    linux-2.3.13 只会唤醒第一个进程
    linux-2.4.34只会唤醒第一个进程
    linux-2.5.27只会唤醒第一个进程
    所以应该是从linux-2.2.2后的某个内核版本修复的,但具体版本号还不太清楚。
    accept惊群问题未解决时的代码流程
    接下来看看linux-2.2.2版本中堵塞唤醒代码流程,了解为什么会唤醒所有进程?为什么会出现惊群现象?
    用户程序调用accept时,会调到内核态中的tcp_accept,如果没有设置非堵塞,会调用wait_for_connect(sk, &prev)堵塞进程,等待事件到来。

    1. static struct open_request * wait_for_connect(struct sock * sk,
    2. struct open_request **pprev)
    3. {
    4. struct wait_queue wait = { current, NULL };
    5. struct open_request *req;
    6. //将wait添加到sk的等待队列头sleep中
    7. //假如fork了10个进程,每个进程都调用accept,则会将10个进程的wait添加到sk的等待队列头 sleep上。
    8. add_wait_queue(sk->sleep, &wait);
    9. for (;;) {
    10. current->state = TASK_INTERRUPTIBLE;
    11. //释放锁。
    12. release_sock(sk);
    13. //进程调度,执行其他进程,直到被唤醒
    14. schedule();
    15. //如果事件发生后,这个10个进程都会被被唤醒,从此处开始执行,
    16. //首先获取锁。
    17. //10个进程只有一个进程才能获取锁成功,继而获取新连接成功
    18. lock_sock(sk);
    19. //第一个获取到锁的进程也会获取到新连接,跳出循环,继续执行。
    20. //其他进程没获取锁的继续执行调度,等待新连接的到来
    21. //所以这10个进程可以循环接收新连接
    22. req = tcp_find_established(&(sk->tp_pinfo.af_tcp), pprev);
    23. if (req)
    24. break;
    25. if (signal_pending(current))
    26. break;
    27. }
    28. //进程被唤醒并且获取连接后,将本进程从等待队列中删除。
    29. current->state = TASK_RUNNING;
    30. remove_wait_queue(sk->sleep, &wait);
    31. return req;
    32. }
    33. extern inline void add_wait_queue(struct wait_queue ** p, struct wait_queue * wait)
    34. {
    35. unsigned long flags;
    36. //添加到队列头部,并且没有exclusive标志
    37. write_lock_irqsave(&waitqueue_lock, flags);
    38. __add_wait_queue(p, wait);
    39. write_unlock_irqrestore(&waitqueue_lock, flags);
    40. }

    tcp三次握手完成后,需要唤醒accept进程,流程如下:

    1. tcp_v4_do_rcv->tcp_v4_hnd_req->tcp_check_req->syn_recv_sock(tcp_v4_syn_recv_sock)->sk->data_ready(sk, 0)->sock_def_readable->wake_up_interruptible(sk->sleep);
    2. #define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE)
    3. void __wake_up(struct wait_queue **q, unsigned int mode)
    4. {
    5. struct wait_queue *next;
    6. read_lock(&waitqueue_lock);
    7. if (q && (next = *q)) {
    8. struct wait_queue *head;
    9. //遍历sk等待队列中的所有进程并唤醒,此例中会唤醒10个进程。
    10. head = WAIT_QUEUE_HEAD(q);
    11. while (next != head) {
    12. struct task_struct *p = next->task;
    13. next = next->next;
    14. if (p->state & mode)
    15. wake_up_process(p);
    16. }
    17. }
    18. read_unlock(&waitqueue_lock);
    19. }

    疑问:在惊群问题未解决时,新连接到来后,只有一个后返回accept成功,其余的九个会有返回值吗?还是在内核态继续执行调度等待其他新连接的到来?
    应该是其余九个进程会被唤醒,但是不会有返回值,由于第一个被唤醒进程已经取走了新连接,这九个进程即使被唤醒,发现全连接队列为空,就会继续调度睡眠,等待新连接到来。

    内核解决accept惊群后的代码流程
    以内核版本3.17.89为例
    用户程序调用accept时,会调到内核态中的inet_csk_accept,如果全连接队列为空,并且没有设置非堵塞,则会调用inet_csk_wait_for_connect(sk, timeo)堵塞进程,等待事件到来。

    1. static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
    2. {
    3. struct inet_connection_sock *icsk = inet_csk(sk);
    4. //初始化等待队列节点wait
    5. DEFINE_WAIT(wait);
    6. int err;
    7. /*
    8. * True wake-one mechanism for incoming connections: only
    9. * one process gets woken up, not the 'whole herd'.
    10. * Since we do not 'race & poll' for established sockets
    11. * anymore, the common case will execute the loop only once.
    12. *
    13. * Subtle issue: "add_wait_queue_exclusive()" will be added
    14. * after any current non-exclusive waiters, and we know that
    15. * it will always _stay_ after any new non-exclusive waiters
    16. * because all non-exclusive waiters are added at the
    17. * beginning of the wait-queue. As such, it's ok to "drop"
    18. * our exclusiveness temporarily when we get woken up without
    19. * having to remove and re-insert us on the wait queue.
    20. */
    21. for (;;) {
    22. //以上英文注释已经说清楚了,只有一个进程会被唤醒。
    23. //非exclusive的节点会被加在等待队列前面,excusive节点会被加在
    24. //所有非exclusive节点的后面。
    25. //此例中,等待队列前面有n个非exclusive节点,后面有10个exclusive节点
    26. prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
    27. release_sock(sk);
    28. if (reqsk_queue_empty(&icsk->icsk_accept_queue))
    29. timeo = schedule_timeout(timeo);
    30. lock_sock(sk);
    31. err = 0;
    32. if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
    33. break;
    34. err = -EINVAL;
    35. if (sk->sk_state != TCP_LISTEN)
    36. break;
    37. err = sock_intr_errno(timeo);
    38. if (signal_pending(current))
    39. break;
    40. err = -EAGAIN;
    41. if (!timeo)
    42. break;
    43. }
    44. //将等待节点从等待队列中删除
    45. finish_wait(sk_sleep(sk), &wait);
    46. return err;
    47. }
    48. #define DEFINE_WAIT_FUNC(name, function) \
    49. wait_queue_t name = { \
    50. .private = current, \
    51. .func = function, \
    52. .task_list = LIST_HEAD_INIT((name).task_list), \
    53. }
    54. //有新连接时,会调用autoremove_wake_function唤醒调用accept的进程,
    55. //并且将等待节点从等待队列中删除
    56. #define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
    57. int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
    58. {
    59. //唤醒堵塞的accept进程
    60. int ret = default_wake_function(wait, mode, sync, key);
    61. if (ret)
    62. //如果唤醒成功了,则将等待节点从等待队列中删除,这样下次有新连接,
    63. //就会唤醒另一个进程来处理。
    64. list_del_init(&wait->task_list);
    65. return ret;
    66. }
    67. void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
    68. {
    69. unsigned long flags;
    70. //设置exclusive标志,表示一次只会有一个进程被唤醒。
    71. wait->flags |= WQ_FLAG_EXCLUSIVE;
    72. spin_lock_irqsave(&q->lock, flags);
    73. //把wait添加到等待队列的尾部。
    74. if (list_empty(&wait->task_list))
    75. __add_wait_queue_tail(q, wait);
    76. set_current_state(state);
    77. spin_unlock_irqrestore(&q->lock, flags);
    78. }

    以上是accept的实现,继续看唤醒部分代码。
    当有tcp连接完成,就会从半连接队列拷贝sock到全连接队列,这个时候就可以唤醒阻塞的accept了。代码如下:

    1. tcp_v4_do_rcv->tcp_child_process
    2. int tcp_child_process(struct sock *parent, struct sock *child,
    3. struct sk_buff *skb)
    4. {
    5. int ret = 0;
    6. int state = child->sk_state;
    7. if (!sock_owned_by_user(child)) {
    8. ret = tcp_rcv_state_process(child, skb, tcp_hdr(skb),
    9. skb->len);
    10. /* Wakeup parent, send SIGIO */
    11. if (state == TCP_SYN_RECV && child->sk_state != state)
    12. parent->sk_data_ready(parent);
    13. } else {
    14. /* Alas, it is possible again, because we do lookup
    15. * in main socket hash table and lock on listening
    16. * socket does not protect us more.
    17. */
    18. __sk_add_backlog(child, skb);
    19. }
    20. bh_unlock_sock(child);
    21. sock_put(child);
    22. return ret;
    23. }
    24. 调用sk_data_ready通知父socket,即监听socket,此函数对应sock_def_readable
    25. static void sock_def_readable(struct sock *sk)
    26. {
    27. struct socket_wq *wq;
    28. rcu_read_lock();
    29. //判断等待队列是否为空,不为空说明有进程在堵塞等待
    30. wq = rcu_dereference(sk->sk_wq);
    31. if (wq_has_sleeper(wq))
    32. wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
    33. POLLRDNORM | POLLRDBAND);
    34. sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    35. rcu_read_unlock();
    36. }
    37. #define wake_up_interruptible_sync_poll(x, m) \
    38. __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
    39. void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
    40. int nr_exclusive, void *key)
    41. {
    42. unsigned long flags;
    43. int wake_flags = 1; /* XXX WF_SYNC */
    44. if (unlikely(!q))
    45. return;
    46. if (unlikely(nr_exclusive != 1))
    47. wake_flags = 0;
    48. spin_lock_irqsave(&q->lock, flags);
    49. __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    50. spin_unlock_irqrestore(&q->lock, flags);
    51. }
    52. static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
    53. int nr_exclusive, int wake_flags, void *key)
    54. {
    55. wait_queue_t *curr, *next;
    56. //从头开始遍历等待队列。
    57. //等待队列头前面可能有N个非exclusive节点,先执行curr->func,但是flags中没有WQ_FLAG_EXCLUSIVE标志,则if条件不满足,不会执行--nr_exclusive,其值仍然为1
    58. //执行到第一个exclusive节点时,也是先执行curr->func,并且flags中有WQ_FLAG_EXCLUSIVE标志,继续执行--nr_exclusive,nr_exclusive变为0,取非后满足,if的三个条件都满足,则执行break,达到只唤醒一个进程的目的。
    59. list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
    60. unsigned flags = curr->flags;
    61. //nr_exclusive为1,执行一次就会为0,跳出循环,达到只唤醒一个进程的目的
    62. //被唤醒后,会在inet_csk_wait_for_connect中将wait从等待队列删除。
    63. //func为autoremove_wake_function,此函数不只唤醒进程还会将wait节点从队列删除
    64. if (curr->func(curr, mode, wake_flags, key) &&
    65. (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
    66. break;
    67. }
    68. }

    解决惊群问题后的代码测试
    client端代码

    1. root@master:~# cat client.c
    2. #include <sys/socket.h>
    3. #include <stdio.h>
    4. #include <errno.h>
    5. #include <unistd.h>
    6. #include <netinet/in.h>
    7. #include <arpa/inet.h>
    8. void main(void)
    9. {
    10. int fd, ret;
    11. struct sockaddr_in addr;
    12. fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    13. if(fd < 0)
    14. {
    15. perror("socket create failed");
    16. return ;
    17. }
    18. addr.sin_family = AF_INET;
    19. addr.sin_port = htons(2222);
    20. addr.sin_addr.s_addr = inet_addr("192.168.122.20");
    21. ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
    22. if( ret != 0)
    23. {
    24. perror("socket connect1 failed");
    25. return ;
    26. }
    27. perror("socket connect succefsully\n");
    28. sleep(1000);
    29. }

    server端代码

    1. root@master:~# cat server.c
    2. #include <stdio.h>
    3. #include <stdlib.h>
    4. #include <sys/types.h>
    5. #include <sys/socket.h>
    6. #include <sys/wait.h>
    7. #include <string.h>
    8. #include <netinet/in.h>
    9. #include <unistd.h>
    10. #include <errno.h>
    11. #define PROCESS_NUM 5
    12. int main()
    13. {
    14. int fd = socket(PF_INET, SOCK_STREAM, 0);
    15. int connfd;
    16. int pid;
    17. struct sockaddr_in serveraddr;
    18. serveraddr.sin_family = AF_INET;
    19. serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    20. serveraddr.sin_port = htons(2222);
    21. bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    22. listen(fd, 1024);
    23. int i;
    24. for(i = 0; i < PROCESS_NUM; ++i){
    25. pid = fork();
    26. if(pid == 0){
    27. printf("I'm pid: %d\n", getpid());
    28. while(1){
    29. connfd = accept(fd, (struct sockaddr *)NULL, NULL);
    30. if(connfd != -1)
    31. {
    32. printf("process %d accept success\n", getpid());
    33. close(connfd);
    34. }else{
    35. printf("process %d accept a connection failed: %s\n", getpid(), strerror(errno));
    36. }
    37. }
    38. }
    39. }
    40. wait(0);
    41. return 0;
    42. }

    server端创建block阻塞型的socket后,fork出10个进程,accept同一个fd。可以使用strace跟踪server端子进程执行结果,可看到每次client连接server只有一个进程被唤醒

    1. root@master:~# ./server
    2. I'm pid: 10794
    3. I'm pid: 10796
    4. I'm pid: 10795
    5. I'm pid: 10797
    6. I'm pid: 10798
    7. process 10794 accept success --->10794先调用accept,则会被先唤醒
    8. process 10796 accept success
    9. process 10795 accept success
    10. process 10797 accept success
    11. process 10798 accept success

    client连接server五次,每次都有不同的进程来处理,处理事件的进程顺序由调用顺序决定。

    1. root@master:~# ./client
    2. socket connect succefsully
    3. : Success
    4. socket connect succefsully
    5. : Success
    6. socket connect succefsully
    7. : Success
    8. socket connect succefsully
    9. : Success

    3. poll/select惊群问题

    先说结论,poll/select是存在惊群问题的。下面用代码验证,再看看代码流程。
    client代码仍然使用accept时的代码。下面是poll模式下server端代码

    1. root@master:~# cat poll_thunder.c
    2. #include<stdio.h>
    3. #include<stdlib.h>
    4. #include<sys/types.h>
    5. #include<sys/socket.h>
    6. #include<sys/wait.h>
    7. #include<string.h>
    8. #include<netinet/in.h>
    9. #include<unistd.h>
    10. #include <errno.h>
    11. #include <poll.h>
    12. #define PROCESS_NUM 10
    13. int main()
    14. {
    15. int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    16. int connfd;
    17. int pid;
    18. int status = 1;
    19. char sendbuff[1024];
    20. struct sockaddr_in serveraddr;
    21. serveraddr.sin_family = AF_INET;
    22. serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    23. serveraddr.sin_port = htons(2222);
    24. bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    25. listen(fd, 1024);
    26. int i, new_fd, ret=0;
    27. struct pollfd clientfds[1];
    28. for(i = 0; i < PROCESS_NUM; ++i){
    29. pid = fork();
    30. if(pid == 0){
    31. //printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
    32. while (1) {
    33. printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
    34. clientfds[0].fd = fd;
    35. clientfds[0].events=POLLIN;
    36. ret = poll(clientfds, 2, -1);
    37. if (ret < 0)
    38. {
    39. perror("poll error");
    40. }
    41. else if(ret == 0)
    42. {
    43. printf("poll timeout\n");
    44. continue;
    45. }
    46. if (clientfds[0].revents&POLLIN) {
    47. new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
    48. if(new_fd < 0)
    49. {
    50. printf("accept failed: %d on pid: %d\n", errno, getpid());
    51. printf("\n");
    52. continue;
    53. }
    54. printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
    55. close(new_fd);
    56. }
    57. printf("\n");
    58. if (clientfds[0].revents&POLLOUT) {
    59. printf("new write event in pid: %d\n", getpid());
    60. }
    61. if (clientfds[0].revents&POLLERR) {
    62. printf("new error event in pid: %d\n", getpid());
    63. }
    64. }
    65. }
    66. }
    67. //int status;
    68. wait(0);
    69. return 0;
    70. }

    client连接server一次,可看到有四个进程被唤醒,只有一个进程接收了新连接。

    1. root@master:~# ./poll
    2. I'm pid: 18907, poll on : 3
    3. I'm pid: 18908, poll on : 3
    4. I'm pid: 18909, poll on : 3
    5. I'm pid: 18910, poll on : 3
    6. I'm pid: 18911, poll on : 3
    7. I'm pid: 18912, poll on : 3
    8. I'm pid: 18913, poll on : 3
    9. I'm pid: 18914, poll on : 3
    10. I'm pid: 18915, poll on : 3
    11. I'm pid: 18916, poll on : 3
    12. new read event: accept new_fd: 4 on pid: 18915 -->只有此进程接收新连接成功
    13. accept failed: 11 on pid: 18916
    14. I'm pid: 18915, poll on : 3
    15. I'm pid: 18916, poll on : 3
    16. accept failed: 11 on pid: 18907
    17. I'm pid: 18907, poll on : 3
    18. accept failed: 11 on pid: 18908
    19. I'm pid: 18908, poll on : 3

    这里有一个问题,按说应该10个进程都被唤醒了,为什么只有四个进程执行到accept处?
    因为进程被唤醒后,会调用目标文件的poll函数获取发生的事件通知用户程序,用户程序调用accept后,会将发生的事件清空。如果清空事件前,被唤醒的进程调用poll还会获取到发生的事件,用户程序再调用accept会返回失败。但是清空事件后,被唤醒的进程调用poll获取不到事件,也就不会通知用户程序,而是继续睡眠,这个情况通过log是看不到的,可以通过strace观察server。
    或者在进程被唤醒后,在调用accept前sleep一段时间,让所有进程都有时间调用poll获取事件就会看到如下log,10个进程都被唤醒并且都调用accept,但是仍然只有一个进程能成功。

    1. root@master:~# ./poll
    2. I'm pid: 20130, poll on : 3
    3. I'm pid: 20131, poll on : 3
    4. I'm pid: 20132, poll on : 3
    5. I'm pid: 20133, poll on : 3
    6. I'm pid: 20135, poll on : 3
    7. I'm pid: 20136, poll on : 3
    8. I'm pid: 20137, poll on : 3
    9. I'm pid: 20138, poll on : 3
    10. I'm pid: 20134, poll on : 3
    11. I'm pid: 20139, poll on : 3
    12. //10个进程都被唤醒执行accept了,但是只有一个能成功接收新连接
    13. accept failed: 11 on pid: 20136
    14. new read event: accept new_fd: 4 on pid: 20133
    15. accept failed: 11 on pid: 20131
    16. I'm pid: 20136, poll on : 3
    17. I'm pid: 20131, poll on : 3
    18. accept failed: 11 on pid: 20139
    19. I'm pid: 20139, poll on : 3
    20. accept failed: 11 on pid: 20132
    21. I'm pid: 20133, poll on : 3
    22. accept failed: 11 on pid: 20135
    23. I'm pid: 20132, poll on : 3
    24. I'm pid: 20135, poll on : 3
    25. accept failed: 11 on pid: 20137
    26. accept failed: 11 on pid: 20130
    27. I'm pid: 20137, poll on : 3
    28. I'm pid: 20130, poll on : 3
    29. accept failed: 11 on pid: 20134
    30. I'm pid: 20134, poll on : 3
    31. accept failed: 11 on pid: 20138
    32. I'm pid: 20138, poll on : 3

    用户调用poll堵塞进程流程
    用户调用poll后,在内核态流程调用:

    1. do_sys_poll->do_poll->do_pollfd-> f.file->f_op->poll, 对于socket来说,poll函数为调用sock_poll
    2. static unsigned int sock_poll(struct file *file, poll_table *wait)
    3. return busy_flag | sock->ops->poll(file, sock, wait); //tcp_poll
    4. unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
    5. {
    6. unsigned int mask;
    7. struct sock *sk = sock->sk;
    8. const struct tcp_sock *tp = tcp_sk(sk);
    9. sock_rps_record_flow(sk);
    10. sock_poll_wait(file, sk_sleep(sk), wait);
    11. if (sk->sk_state == TCP_LISTEN)
    12. return inet_csk_listen_poll(sk);
    13. }
    14. //如果全连接队列不为空,则直接返回POLLIN事件
    15. static inline unsigned int inet_csk_listen_poll(const struct sock *sk)
    16. {
    17. return !reqsk_queue_empty(&inet_csk(sk)->icsk_accept_queue) ?
    18. (POLLIN | POLLRDNORM) : 0;
    19. }
    20. static inline void sock_poll_wait(struct file *filp,
    21. wait_queue_head_t *wait_address, poll_table *p)
    22. {
    23. if (!poll_does_not_wait(p) && wait_address) {
    24. poll_wait(filp, wait_address, p);
    25. /* We need to be sure we are in sync with the
    26. * socket flags modification.
    27. *
    28. * This memory barrier is paired in the wq_has_sleeper.
    29. */
    30. smp_mb();
    31. }
    32. }
    33. static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
    34. {
    35. //p->_qproc为__pollwait
    36. if (p && p->_qproc && wait_address)
    37. p->_qproc(filp, wait_address, p);
    38. }
    39. /* Add a new entry */
    40. static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
    41. {
    42. struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    43. struct poll_table_entry *entry = poll_get_entry(pwq);
    44. if (!entry)
    45. return;
    46. entry->filp = get_file(filp);
    47. entry->wait_address = wait_address;
    48. entry->key = p->_key;
    49. init_waitqueue_func_entry(&entry->wait, pollwake);
    50. entry->wait.private = pwq;
    51. add_wait_queue(wait_address, &entry->wait);
    52. }
    53. void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
    54. {
    55. unsigned long flags;
    56. //将exclusive标志清除掉,这样有10个进程调用select,就会将10个进程添加到等待队列中。
    57. //因为没有exclusive,所以这10个进程都会被唤醒。
    58. wait->flags &= ~WQ_FLAG_EXCLUSIVE;
    59. spin_lock_irqsave(&q->lock, flags);
    60. __add_wait_queue(q, wait);
    61. spin_unlock_irqrestore(&q->lock, flags);
    62. }

    唤醒进程流程
    唤醒流程和accept中唤醒流程相同,只不过最后调用__wake_up_common时,因为poll添加到等待队列时,没有设置exclusive,所以所有进程都会被唤醒。被唤醒的进程再调用accept接收新连接,但是只有一个进程会成功,其余9个都返回错误EAGAIN。

    4. epoll惊群问题

    还是先说结论:ET模式下不存在惊群问题,LT模式下存在。
    下面通过代码验证,再分析实现流程。

    1. root@master:~# cat epoll_thunder.c
    2. #include<stdio.h>
    3. #include<stdlib.h>
    4. #include<sys/types.h>
    5. #include<sys/socket.h>
    6. #include<sys/wait.h>
    7. #include<string.h>
    8. #include<netinet/in.h>
    9. #include<unistd.h>
    10. #include <errno.h>
    11. #include<sys/epoll.h>
    12. #define MAXEVENTS 64
    13. #define PROCESS_NUM 10
    14. int main()
    15. {
    16. int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    17. int connfd;
    18. int pid;
    19. int i, epoll_fd, new_fd, ret=0, num;
    20. struct epoll_event event;
    21. struct epoll_event *events;
    22. struct sockaddr_in serveraddr;
    23. serveraddr.sin_family = AF_INET;
    24. serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    25. serveraddr.sin_port = htons(2222);
    26. bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    27. listen(fd, 1024);
    28. if ((epoll_fd = epoll_create(MAXEVENTS))< 0) {
    29. perror("epoll_create");
    30. exit(1);
    31. }
    32. event.data.fd = fd;
    33. event.events = EPOLLIN | EPOLLET;
    34. //event.events = EPOLLIN;
    35. if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0){
    36. perror("epoll_ctl");
    37. exit(1);
    38. }
    39. events = calloc(MAXEVENTS, sizeof(event));
    40. for(i = 0; i < PROCESS_NUM; ++i) {
    41. pid = fork();
    42. if(pid == 0) {
    43. while (1) {
    44. printf("I'm pid: %d, epoll on : %d\n", getpid(), fd);
    45. num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    46. if (num < 0) {
    47. printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
    48. continue;
    49. }
    50. for(i = 0; i < num; ++i){
    51. if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
    52. fprintf(stderr, "epoll error\n");
    53. close(events[i].data.fd);
    54. continue;
    55. }else if(fd == events[i].data.fd){
    56. new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
    57. if(new_fd < 0)
    58. {
    59. printf("accept failed: %d on pid: %d\n", errno, getpid());
    60. printf("\n");
    61. continue;
    62. }
    63. printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
    64. close(new_fd);
    65. }
    66. }
    67. }
    68. }
    69. }
    70. wait(0);
    71. return 0;
    72. }

    在epoll_thunder.c中,如果events加上标志EPOLLET就是ET模式,不加的话默认是LT模式

    1. event.data.fd = fd;
    2. event.events = EPOLLIN | EPOLLET;
    3. epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event)

    ET模式测试结果如下,始终只有一个进程接收新连接,并且是同一个进程。这是因为调用epoll_wait堵塞时,添加wait节点加到了ep等待队列头部,并且是exclusive的,而唤醒进程时总是从队列头部开始,由于设置了exclusive,所以只好唤醒一个进程。
    为什么总是同一个进程呢?是因为第一个被唤醒的进程马上又调用epoll_wait将其再次加入等待队列头部,所以下次事件到来时仍然唤醒同一个进程。

    1. root@ubuntu:/home/jk/socket# ./epoll
    2. I'm pid: 119280, epoll on : 3
    3. I'm pid: 119281, epoll on : 3
    4. I'm pid: 119282, epoll on : 3
    5. I'm pid: 119285, epoll on : 3
    6. I'm pid: 119283, epoll on : 3
    7. I'm pid: 119287, epoll on : 3
    8. I'm pid: 119284, epoll on : 3
    9. I'm pid: 119286, epoll on : 3
    10. I'm pid: 119288, epoll on : 3
    11. I'm pid: 119289, epoll on : 3
    12. new read event: accept new_fd: 5 on pid: 119289
    13. I'm pid: 119289, epoll on : 3
    14. new read event: accept new_fd: 5 on pid: 119289
    15. I'm pid: 119289, epoll on : 3
    16. new read event: accept new_fd: 5 on pid: 119289
    17. I'm pid: 119289, epoll on : 3

    下面将进程被唤醒后sleep一段时间20s,然后用client连接server,会发现新连接总是被等待队列头部的进程处理。

    1. root@master:~# ./epoll
    2. I'm pid: 27656, epoll on : 3
    3. I'm pid: 27657, epoll on : 3
    4. I'm pid: 27658, epoll on : 3
    5. I'm pid: 27659, epoll on : 3
    6. I'm pid: 27660, epoll on : 3
    7. I'm pid: 27661, epoll on : 3
    8. I'm pid: 27662, epoll on : 3
    9. I'm pid: 27663, epoll on : 3
    10. I'm pid: 27664, epoll on : 3
    11. I'm pid: 27665, epoll on : 3
    12. new read event: accept new_fd: 5 on pid: 27665 -->第一次连接被27665进程处理,然后sleep 20s
    13. new read event: accept new_fd: 5 on pid: 27664 -->第二次连接被27664进程处理,然后sleep 20s
    14. new read event: accept new_fd: 5 on pid: 27663 -->第三次连接被27663进程处理,然后sleep 20s
    15. I'm pid: 27665, epoll on : 3 -->进程27665 sleep结束,重新开始wait
    16. new read event: accept new_fd: 5 on pid: 27665 -->第四次连接又被27663进程处理

    LT模式下,测试结果如下,从结果看,好像也是只有一个进程被唤醒了,但是实际上唤醒进程不只一个

    1. root@ubuntu:/home/jk/socket# ./epoll
    2. I'm pid: 119318, epoll on : 3
    3. I'm pid: 119319, epoll on : 3
    4. I'm pid: 119320, epoll on : 3
    5. I'm pid: 119324, epoll on : 3
    6. I'm pid: 119321, epoll on : 3
    7. I'm pid: 119326, epoll on : 3
    8. I'm pid: 119322, epoll on : 3
    9. I'm pid: 119323, epoll on : 3
    10. I'm pid: 119325, epoll on : 3
    11. I'm pid: 119327, epoll on : 3
    12. new read event: accept new_fd: 5 on pid: 119327
    13. I'm pid: 119327, epoll on : 3
    14. new read event: accept new_fd: 5 on pid: 119327
    15. I'm pid: 119327, epoll on : 3
    16. new read event: accept new_fd: 5 on pid: 119327
    17. I'm pid: 119327, epoll on : 3

    LT模式下,如果添加sleep后,就会看到所有进程都被唤醒了。唤醒多少个进程和被唤醒进程处理时间长短有关系,如果处理越快,唤醒的进程越少。

    1. num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    2. if (num < 0) {
    3. printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
    4. continue;
    5. }
    6. sleep(2);

    再次执行,可看到所有进程都被唤醒了

    1. root@ubuntu:/home/jk/socket# ./epoll
    2. I'm pid: 119480, epoll on : 3
    3. I'm pid: 119481, epoll on : 3
    4. I'm pid: 119482, epoll on : 3
    5. I'm pid: 119483, epoll on : 3
    6. I'm pid: 119484, epoll on : 3
    7. I'm pid: 119485, epoll on : 3
    8. I'm pid: 119486, epoll on : 3
    9. I'm pid: 119487, epoll on : 3
    10. I'm pid: 119488, epoll on : 3
    11. I'm pid: 119489, epoll on : 3
    12. new read event: accept new_fd: 5 on pid: 119489
    13. accept failed: 11 on pid: 119488
    14. I'm pid: 119488, epoll on : 3
    15. I'm pid: 119489, epoll on : 3
    16. accept failed: 11 on pid: 119487
    17. I'm pid: 119487, epoll on : 3
    18. accept failed: 11 on pid: 119486
    19. I'm pid: 119486, epoll on : 3
    20. accept failed: 11 on pid: 119485
    21. I'm pid: 119485, epoll on : 3
    22. accept failed: 11 on pid: 119484
    23. I'm pid: 119484, epoll on : 3
    24. accept failed: 11 on pid: 119483
    25. I'm pid: 119483, epoll on : 3
    26. accept failed: 11 on pid: 119482
    27. I'm pid: 119482, epoll on : 3
    28. accept failed: 11 on pid: 119481
    29. I'm pid: 119481, epoll on : 3
    30. accept failed: 11 on pid: 119480
    31. I'm pid: 119480, epoll on : 3

    代码分析
    调用epll_wait时,如果需要堵塞等待,则将调用进程加入到ep等待队列中,设置exclusive,并且添加到队列头部,如果有新连接到来,也只会唤醒一个进程。

    1. init_waitqueue_entry(&wait, current);
    2. __add_wait_queue_exclusive(&ep->wq, &wait);

    这样看貌似epoll_wait已经解决了惊群问题,但在LT和ET模式下处理流程的差别导致了LT模式下惊群问题。

    假如调用epoll_ctl将一个fd加入到epoll进行监听,会调用目标文件fd的poll函数,将wait节点(fd上有新事件发生时,调用ep_poll_callback唤醒监听进程)加入到目标文件fd的等待队列中,再fork十个进程调用epoll_wait等待事件到来,这样就会有10个进程堵塞在ep的等待队列中(如果有事件发生时,则调用default_wake_function唤醒堵塞进程)。

    如果此时client和server完成了三次握手,则会调用socket的fd等待队列上的task,即会调用ep_poll_callback,将发生事件的fd添加到就绪链表rdlist中,如果ep的等待队列中不为空(此例不为空,有10个节点),则会唤醒第一个进程(因为添加了exclusive标志,所以只会唤醒第一个进程)。

    1. ep_poll_callback
    2. if (waitqueue_active(&ep->wq))
    3. wake_up_locked(&ep->wq);

    第一个进程被唤醒后,首先将自己从ep的等待队列中删除,然后调用rdlist上发生事件的fd的poll函数获取发生的事件,将其传递到用户程序,如果是用户程序感兴趣的事件,用户程序再调用accept接收新连接。

    1. ep_send_events_proc
    2. revents = ep_item_poll(epi, &pt);
    3. if (revents) {
    4. __put_user(revents, &uevent->events)

    如果是LT模式,则将目标文件再次添加到就绪链表rdlist,
    如果是ET模式,就不会将目标文件再次添加到就绪链表rdlist。

    1. ep_send_events_proc
    2. revents = ep_item_poll(epi, &pt);
    3. if (revents) {
    4. if (!(epi->event.events & EPOLLET))
    5. list_add_tail(&epi->rdllink, &ep->rdllist);

    后面流程还会做如下判断

    1. ep_scan_ready_list
    2. if (!list_empty(&ep->rdllist)) {
    3. if (waitqueue_active(&ep->wq))
    4. wake_up_locked(&ep->wq);

    在LT模式下,rdlist不为空,并且ep->wq中还有9个进程在堵塞等待,则又会唤醒第二个进程。
    第二个进程被唤醒后,后面处理方式和第一个进程相同。
    如果第一个进程调用accept把事件取走了,则第二个进程调用目标文件poll函数时就得不到事件,ep->wq中的其他进程就不会被唤醒了。如果每个被唤醒进程调用accept的时间更长,则会唤醒更多的进程。

    在ET模式下,rdlist为空,就不会唤醒等待队列上的其他进程了。

    epoll下涉及两个等待队列:
    a. 调用epoll_ctl时,将进程添加到目标文件的等待队列中,目标文件发生事件时调用ep_poll_callback,判断如果ep等待队列不为空,则唤醒ep等待队列上的进程。
    b. 调用epoll_wait时,将进程添加到ep的等待队列中。

    参考

    https://www.cnblogs.com/Anker/p/7071849.html
    https://blog.csdn.net/lyztyycode/article/details/78648798

    也可参考:https://www.jianshu.com/p/122860366fdf 

  • 相关阅读:
    外包公司“混”了2年,我只认真做了5件事,如今顺利拿到阿里 Offer。
    【教3妹学mysql】联合索引问题优化
    【Android开发】Android服务和系统服务
    C语言协程
    .NET开源的简单、快速、强大的前后端分离后台权限管理系统
    centos下Iptables的安装(离线)
    2022-08-18 mysql/stonedb-aggregate场景group by分析
    化学之理财之道
    Redis 网络模型 -- 阻塞非阻塞IO、IO多路复用、epoll详解
    【MySQL】高性能高可用设计实战-索引篇
  • 原文地址:https://blog.csdn.net/fengcai_ke/article/details/126564012