• IO多路复用--select,poll,epoll


    背景

    传统的IO,OS只提供了对一个FD进行操作的功能,也就是BIO

    • 1对1模型
      image.png

    • 线程池模型
      image.png

    传统IO模式基于OS提供的功能,就限制了一个线程同一个时刻就只能处理一个连接。

    IO多路复用

    针对传统IO模式的限制,OS提供了IO多路复用模型:一个线程可以在同一时刻处理多个连接,即一次性把多个FD交给内核进行监听。

    select

    select的逻辑比较简单:接收从用户态传过来的FD集合,循环遍历并调用VFS中的file->f_op->poll函数,判断每个FD是否有数据就绪可以进行操作,如果有则设置对应的标识位。

    image.png

    /* select()-based poll step of the event loop.
     *
     * The registered sets (rfds/wfds) are copied into the scratch sets
     * (_rfds/_wfds) and only the scratch copies are handed to select(), so the
     * registered sets are left untouched by the call.
     *
     * When select() reports at least one ready descriptor, every descriptor in
     * 0..maxfd whose registered mask is not AE_NONE is appended to
     * eventLoop->fired; its fired mask holds the registered conditions that are
     * set in the scratch sets and may therefore be 0.
     *
     * Returns the number of entries written to eventLoop->fired (0 when
     * select() returned 0 or a negative value). */
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int count = 0;
        int nready, fd;

        memcpy(&state->_rfds, &state->rfds, sizeof(fd_set));
        memcpy(&state->_wfds, &state->wfds, sizeof(fd_set));

        nready = select(eventLoop->maxfd + 1,
                        &state->_rfds, &state->_wfds, NULL, tvp);
        if (nready <= 0)
            return 0;

        for (fd = 0; fd <= eventLoop->maxfd; fd++) {
            aeFileEvent *registered = &eventLoop->events[fd];
            int ready = 0;

            /* Nothing registered for this descriptor. */
            if (registered->mask == AE_NONE)
                continue;

            if ((registered->mask & AE_READABLE) && FD_ISSET(fd, &state->_rfds))
                ready |= AE_READABLE;
            if ((registered->mask & AE_WRITABLE) && FD_ISSET(fd, &state->_wfds))
                ready |= AE_WRITABLE;

            /* Appended unconditionally, even when no condition matched. */
            eventLoop->fired[count].fd = fd;
            eventLoop->fired[count].mask = ready;
            count++;
        }
        return count;
    }
    
    
    sys_select
    /*
     * select() system-call entry point.
     *
     * n    - number of descriptors to examine; clamped to the task's max_fdset.
     * inp, outp, exp - user-space fd sets, read on entry and overwritten with
     *        the result sets on success.
     * tvp  - optional user-space timeout; NULL leaves the timeout at
     *        MAX_SCHEDULE_TIMEOUT.
     *
     * Returns the value of do_select() when it is non-negative, otherwise a
     * negative error code (-EINVAL, -ENOMEM, -ERESTARTNOHAND, or whatever the
     * user-copy helpers / do_select() returned).
     */
    asmlinkage long
    sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
    {
    	fd_set_bits fds;
    	char *bits;
    	long timeout;
    	int ret, size, max_fdset;
    
    	/* Default when no timeval is supplied (or it is too large to convert). */
    	timeout = MAX_SCHEDULE_TIMEOUT;
    	if (tvp) {
    		time_t sec, usec;
    
    		/* Fetch tv_sec / tv_usec from user space; any failure aborts. */
    		if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
    		    || (ret = __get_user(sec, &tvp->tv_sec))
    		    || (ret = __get_user(usec, &tvp->tv_usec)))
    			goto out_nofds;
    
    		ret = -EINVAL;
    		if (sec < 0 || usec < 0)
    			goto out_nofds;
    
    		/*
    		 * Convert to HZ ticks, rounding the microsecond part up.
    		 * Values of MAX_SELECT_SECONDS or more keep the default timeout.
    		 */
    		if ((unsigned long) sec < MAX_SELECT_SECONDS) {
    			timeout = ROUND_UP(usec, 1000000/HZ);
    			timeout += sec * (unsigned long) HZ;
    		}
    	}
    
    	ret = -EINVAL;
    	if (n < 0)
    		goto out_nofds;
    
    	/* max_fdset can increase, so grab it once to avoid race */
    	max_fdset = current->files->max_fdset;
    	if (n > max_fdset)
    		n = max_fdset;
    
    	/*
    	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
    	 * since we used fdset we need to allocate memory in units of
    	 * long-words.
    	 */
    	ret = -ENOMEM;
    	size = FDS_BYTES(n);
    	bits = select_bits_alloc(size);
    	if (!bits)
    		goto out_nofds;
    	/* Carve the single allocation into six consecutive bitmaps of `size` bytes. */
    	fds.in      = (unsigned long *)  bits;
    	fds.out     = (unsigned long *) (bits +   size);
    	fds.ex      = (unsigned long *) (bits + 2*size);
    	fds.res_in  = (unsigned long *) (bits + 3*size);
    	fds.res_out = (unsigned long *) (bits + 4*size);
    	fds.res_ex  = (unsigned long *) (bits + 5*size);
    
    	/* Copy the three request sets in from user space and clear the result sets. */
    	if ((ret = get_fd_set(n, inp, fds.in)) ||
    	    (ret = get_fd_set(n, outp, fds.out)) ||
    	    (ret = get_fd_set(n, exp, fds.ex)))
    		goto out;
    	zero_fd_set(n, fds.res_in);
    	zero_fd_set(n, fds.res_out);
    	zero_fd_set(n, fds.res_ex);
    
    	ret = do_select(n, &fds, &timeout);
    
    	/*
    	 * Write the remaining time back to the caller's timeval unless the
    	 * task runs with the STICKY_TIMEOUTS personality. This happens before
    	 * the error checks below, i.e. also when do_select() failed.
    	 */
    	if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
    		time_t sec = 0, usec = 0;
    		if (timeout) {
    			sec = timeout / HZ;
    			usec = timeout % HZ;
    			usec *= (1000000/HZ);
    		}
    		put_user(sec, &tvp->tv_sec);
    		put_user(usec, &tvp->tv_usec);
    	}
    
    	if (ret < 0)
    		goto out;
    	/*
    	 * Nothing ready: report -ERESTARTNOHAND if a signal is pending
    	 * (skipping the copy-back of the result sets), otherwise return 0.
    	 */
    	if (!ret) {
    		ret = -ERESTARTNOHAND;
    		if (signal_pending(current))
    			goto out;
    		ret = 0;
    	}
    
    	/* Hand the result bitmaps back to user space. */
    	set_fd_set(n, inp, fds.res_in);
    	set_fd_set(n, outp, fds.res_out);
    	set_fd_set(n, exp, fds.res_ex);
    
    out:
    	select_bits_free(bits, size);
    out_nofds:
    	return ret;
    }
    
    do_select
    /*
     * Core loop of select(): scan the requested descriptors, ask each file's
     * ->poll method for its state and record matches in the result bitmaps;
     * sleep and rescan until something matches, the timeout runs out, a signal
     * is pending or the wait table reports an error.
     *
     * n       - number of descriptors to scan (replaced by the value returned
     *           from max_select_fd()).
     * fds     - request bitmaps (in/out/ex) and result bitmaps (res_in/
     *           res_out/res_ex).
     * timeout - in/out: timeout in scheduler ticks; updated with what
     *           schedule_timeout() left over.
     *
     * Returns the number of (descriptor, condition) matches recorded, the
     * wait-table error if one was set while nothing matched, or the negative
     * value returned by max_select_fd().
     */
    int do_select(int n, fd_set_bits *fds, long *timeout)
    {
    	struct poll_wqueues table;
    	poll_table *wait;
    	int retval, i;
    	long __timeout = *timeout;
    
     	spin_lock(&current->files->file_lock);
    	retval = max_select_fd(n, fds);
    	spin_unlock(&current->files->file_lock);
    
    	/* Negative result is an error; otherwise it becomes the scan limit. */
    	if (retval < 0)
    		return retval;
    	n = retval;
    
    	poll_initwait(&table);
    	wait = &table.pt;
    	/* Zero timeout: pass no poll table to ->poll (we will not sleep). */
    	if (!__timeout)
    		wait = NULL;
    	retval = 0;
    	for (;;) {
    		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
    
    		/* Marked interruptible before scanning, ahead of the sleep below. */
    		set_current_state(TASK_INTERRUPTIBLE);
    
    		/* Restart at the first word of every bitmap on each pass. */
    		inp = fds->in; outp = fds->out; exp = fds->ex;
    		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
    
    		/* Outer loop: one iteration per bitmap word (__NFDBITS descriptors). */
    		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
    			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
    			unsigned long res_in = 0, res_out = 0, res_ex = 0;
    			struct file_operations *f_op = NULL;
    			struct file *file = NULL;
    
    			in = *inp++; out = *outp++; ex = *exp++;
    			all_bits = in | out | ex;
    			/* No descriptor requested in this word: skip it entirely. */
    			if (all_bits == 0) {
    				i += __NFDBITS;
    				continue;
    			}
    
    			/* Inner loop: one iteration per descriptor (bit) of the word. */
    			for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
    				if (i >= n)
    					break;
    				if (!(bit & all_bits))
    					continue;
    				file = fget(i);
    				if (file) {
    					f_op = file->f_op;
    					/* Used when the file has no ->poll method. */
    					mask = DEFAULT_POLLMASK;
    					/* Once something matched, stop passing the poll table. */
    					if (f_op && f_op->poll)
    						mask = (*f_op->poll)(file, retval ? NULL : wait);
    					fput(file);
    					/* Each requested condition that matches counts separately. */
    					if ((mask & POLLIN_SET) && (in & bit)) {
    						res_in |= bit;
    						retval++;
    					}
    					if ((mask & POLLOUT_SET) && (out & bit)) {
    						res_out |= bit;
    						retval++;
    					}
    					if ((mask & POLLEX_SET) && (ex & bit)) {
    						res_ex |= bit;
    						retval++;
    					}
    				}
    			}
    			/* Result words are only written when at least one bit is set. */
    			if (res_in)
    				*rinp = res_in;
    			if (res_out)
    				*routp = res_out;
    			if (res_ex)
    				*rexp = res_ex;
    		}
    		/* The poll table is only handed to ->poll during the first pass. */
    		wait = NULL;
    		if (retval || !__timeout || signal_pending(current))
    			break;
    		if(table.error) {
    			retval = table.error;
    			break;
    		}
    		/* Sleep; the return value is the time still left. */
    		__timeout = schedule_timeout(__timeout);
    	}
    	__set_current_state(TASK_RUNNING);
    
    	poll_freewait(&table);
    
    	/*
    	 * Report the remaining timeout back to the caller.
    	 */
    	*timeout = __timeout;
    	return retval;
    }
    
    
    poll

    sys_poll
    /*
     * poll() system-call entry point.
     *
     * ufds    - user-space array of nfds pollfd entries; only the revents
     *           field of each entry is written back.
     * nfds    - number of entries in ufds.
     * timeout - converted below from a /1000 unit (milliseconds) to HZ ticks;
     *           0 means "do not sleep", negative or too-large values become
     *           MAX_SCHEDULE_TIMEOUT.
     *
     * Returns the count reported by do_poll(), -EINTR when that count is 0
     * and a signal is pending, or -EINVAL / -ENOMEM / -EFAULT on failure.
     */
    asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout)
    {
    	struct poll_wqueues table;
     	int fdcount, err;
     	unsigned int i;
    	struct poll_list *head;
     	struct poll_list *walk;
    
    	/* Do a sanity check on nfds ... */
    	if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
    		return -EINVAL;
    
    	if (timeout) {
    		/* Careful about overflow in the intermediate values */
    		if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
    			timeout = (unsigned long)(timeout*HZ+999)/1000+1;
    		else /* Negative or overflow */
    			timeout = MAX_SCHEDULE_TIMEOUT;
    	}
    
    	poll_initwait(&table);
    
    	/*
    	 * Copy the user array into a singly linked list of poll_list chunks,
    	 * each holding at most POLLFD_PER_PAGE entries. `i` counts the
    	 * entries still to be copied.
    	 */
    	head = NULL;
    	walk = NULL;
    	i = nfds;
    	err = -ENOMEM;	/* reported if kmalloc() fails below */
    	while(i!=0) {
    		struct poll_list *pp;
    		pp = kmalloc(sizeof(struct poll_list)+
    				sizeof(struct pollfd)*
    				(i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i),
    					GFP_KERNEL);
    		if(pp==NULL)
    			goto out_fds;
    		pp->next=NULL;
    		pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i);
    		/* Append the new chunk to the tail of the list. */
    		if (head == NULL)
    			head = pp;
    		else
    			walk->next = pp;
    
    		walk = pp;
    		/* nfds-i entries have been copied already; continue from there. */
    		if (copy_from_user(pp->entries, ufds + nfds-i,
    				sizeof(struct pollfd)*pp->len)) {
    			err = -EFAULT;
    			goto out_fds;
    		}
    		i -= pp->len;
    	}
    	fdcount = do_poll(nfds, head, &table, timeout);
    
    	/* OK, now copy the revents fields back to user space. */
    	walk = head;
    	err = -EFAULT;	/* reported if __put_user() fails below */
    	while(walk != NULL) {
    		struct pollfd *fds = walk->entries;
    		int j;
    
    		/* ufds advances across chunks, so it tracks the overall index. */
    		for (j=0; j < walk->len; j++, ufds++) {
    			if(__put_user(fds[j].revents, &ufds->revents))
    				goto out_fds;
    		}
    		walk = walk->next;
      	}
    	err = fdcount;
    	if (!fdcount && signal_pending(current))
    		err = -EINTR;
    out_fds:
    	/* Common exit: free every chunk and release the wait table. */
    	walk = head;
    	while(walk!=NULL) {
    		struct poll_list *pp = walk->next;
    		kfree(walk);
    		walk = pp;
    	}
    	poll_freewait(&table);
    	return err;
    }
    
    
    
    do_poll
    /*
     * Core loop of poll(): run do_pollfd() over every chunk of the list, then
     * sleep and rescan until an entry reports events, the timeout runs out, a
     * signal is pending or the wait table reports an error.
     *
     * nfds    - total number of entries (not used inside this function).
     * list    - head of the poll_list chunk list.
     * wait    - wait-queue table whose poll table is handed to do_pollfd().
     * timeout - timeout in scheduler ticks; 0 means a single pass, no sleep.
     *
     * Returns the count accumulated by do_pollfd(), or wait->error when that
     * is non-zero and nothing was counted.
     */
    static int do_poll(unsigned int nfds,  struct poll_list *list,
    			struct poll_wqueues *wait, long timeout)
    {
    	int count = 0;
    	poll_table* pt = &wait->pt;
    
    	/* Zero timeout: no poll table is passed down (we will not sleep). */
    	if (!timeout)
    		pt = NULL;
     
    	for (;;) {
    		struct poll_list *walk;
    		/* Marked interruptible before scanning, ahead of the sleep below. */
    		set_current_state(TASK_INTERRUPTIBLE);
    		walk = list;
    		while(walk != NULL) {
    			/* &pt is passed so do_pollfd() can reset pt to NULL itself. */
    			do_pollfd( walk->len, walk->entries, &pt, &count);
    			walk = walk->next;
    		}
    		/* The poll table is only handed down during the first pass. */
    		pt = NULL;
    		if (count || !timeout || signal_pending(current))
    			break;
    		/* Nothing counted: a wait-table error becomes the return value. */
    		count = wait->error;
    		if (count)
    			break;
    		/* Sleep; the return value is the time still left. */
    		timeout = schedule_timeout(timeout);
    	}
    	__set_current_state(TASK_RUNNING);
    	return count;
    }
    
    
    do_pollfd
    /*
     * Poll every entry of one pollfd chunk and store the outcome in its
     * revents field.
     *
     * num    - number of entries in fdpage.
     * fdpage - the entries to examine.
     * pwait  - poll table passed to each file's ->poll method; *pwait is set
     *          to NULL as soon as one entry yields a non-zero result.
     * count  - incremented once for every entry with a non-zero result.
     *
     * Entries with a negative fd get revents 0; descriptors that fget()
     * cannot resolve get POLLNVAL.
     */
    static void do_pollfd(unsigned int num, struct pollfd * fdpage,
    	poll_table ** pwait, int *count)
    {
    	int idx;

    	for (idx = 0; idx < num; idx++) {
    		struct pollfd *entry = fdpage + idx;
    		int fd = entry->fd;
    		unsigned int revents = 0;

    		if (fd >= 0) {
    			struct file *filp = fget(fd);

    			if (filp == NULL) {
    				revents = POLLNVAL;
    			} else {
    				/* Files without a ->poll method report the default mask. */
    				if (filp->f_op && filp->f_op->poll)
    					revents = filp->f_op->poll(filp, *pwait);
    				else
    					revents = DEFAULT_POLLMASK;
    				/* Keep requested events; POLLERR/POLLHUP always pass. */
    				revents &= entry->events | POLLERR | POLLHUP;
    				fput(filp);
    			}

    			if (revents) {
    				*pwait = NULL;
    				(*count)++;
    			}
    		}
    		entry->revents = revents;
    	}
    }
    
    • select 函数的声明比较复杂,有 readfds,writefds,exceptfds 等参数,而poll直接把它们封装到一个 pollfd 参数中了,比较优雅。
    • select 中内核操作使用数组,poll使用链表,因为数组需要连续的空间,而内核中的连续内存比较昂贵。
    • select和poll都需要自己遍历所有的FD集合,就算没有事件过来也需要遍历,造成资源的浪费。
    epoll

    image.png

    关键函数
    • epoll_create:创建epfd,eventpoll,并设置一些VFS相关的操作,例如:file的f_op的函数指针。
    • epoll_ctl: fd的CRUD操作,函数比较复杂,涉及比较多结构转化,回调函数的操作等。
    • epoll_wait: 等待eventpoll的rdllist集合中出现事件已就绪的FD,并把它们返回给用户态。

    tip: epoll_ctl函数部分结构图,可以自行进行分析。

    epoll_ctl.png

    总结

    从传统IO到多路复用是操作系统IO模型的变化,像JAVA提供的NIO不过是对操作系统提供多路复用进行封装而已,所以一个很重要的观念是操作系统提供什么,上层就只能使用什么。

    tip: 上面源码基于 redis-2.6,linux-2.6

  • 相关阅读:
    重制版 day 12 函数的进阶
    微信小程序 24 播放音乐页的完善①
    等保2.0测评手册之Mysql
    【Unity Android】Unity链接安卓手机调试
    服务器应用程序不可用的原因是什么引起的
    dolphinscheduler负载均衡源码
    神经网络示意图怎么画,ppt画神经网络模型图
    Dubbo 一些你不一定知道但是很好用的功能
    VMwareworkstation安装Centos7教程
    还在用双层for循环吗?太慢了
  • 原文地址:https://blog.csdn.net/xb1964109474/article/details/126755784