• redis源码分析之IO多路复用


    1、简述

    众所周知,redis是一款抗高并发的利器,据官方压测,单机可达10万qps。但背后实际处理命令的线程只有一条,这听上去其实挺匪夷所思的,因为在我们的日常开发中,说到高并发,多线程是一个非常常用的解决方案。那redis凭什么靠一条线程,就能支持高并发呢?最主要的原因,就是标题所说的IO多路复用,IO多路复用是怎么做的呢?这是老八股了,IO多路复用,背后依赖的是多路复用的函数,有select、poll、epoll,linux默认使用的是epoll函数,redis把客户端连接通过epoll函数给到内核,内核监听到连接有可读写的事件,就将该事件返回redis进行处理。那具体的实现细节呢?redis怎么给的内核,内核又怎么返回的呢?今天,聊赖这里面的细节

    2、多路复用的三个函数

    epoll函数由3个函数组合来完成多路复用这件事。分别是:
    epoll_create、epoll_ctl、epoll_wait
    1)、epoll_create:创建epoll实例
    2)、epoll_ctl:将连接对应的socket描述符注册到epoll实例中
    3)、epoll_wait:获取epoll实例中可读写的描述符
    画一个简单的流程图串一下这三个函数的作用
    请添加图片描述
    从图中可以看出,redis在启动的时候,先是通过epoll_create函数创建epoll实例,然后绑定端口、监听端口,然后通过epoll_ctl函数注册连接事件,最后会搞一个死循环,通过epoll_wait函数获取可读写的事件(每一个事件对应的都是一个可读写的客户端连接)
    铺垫完上面的流程,我们看一下源码。redis的启动源码在server.c文件的main方法中,main方法是redis启动的入口,其中有很多流程,但是我们不要全部都看,就看图中流程涉及到的逻辑

    3、创建epoll实例

    首先是通过内核提供的epoll_create函数创建epoll实例,这个流程入口在initServer方法中.

    void initServer(void) {
        ......
    
        //创建epoll实例
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        if (server.el == NULL) {
            serverLog(LL_WARNING,
                "Failed creating the event loop. Error message: '%s'",
                strerror(errno));
            exit(1);
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    aeCreateEventLoop是创建epoll实例的入口,我们进入这个方法。

    aeEventLoop *aeCreateEventLoop(int setsize) {
        ......
        
        if (aeApiCreate(eventLoop) == -1) goto err;
        
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其中又调用了一个aeApiCreate方法,这个方法是对epoll_create函数做了一层封装,我们继续进入aeApiCreate方法。

    static int aeApiCreate(aeEventLoop *eventLoop) {
        ......
        
        //创建epoll实例
        //这里的1024并不是说epoll函数只能监听1024个描述符.因为在2.6.8内核之后,内核维护的是一个动态的队列,理论上我们可以一直添加描述符
        
        state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里就看到了我们想找的epoll_create函数。(这里顺便说一下,这种只看主流程的源码阅读方法,很容易能得到一些结论,也很容易坚持下去。)

    4、绑定端口、监听端口

    创建完epoll实例后,接下来就是绑定端口、监听端口。
    这部分的代码也是在initServer方法中,就在创建epoll实例的下方

    void initServer(void) {
        
        ......
        //创建epoll实例
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        if (server.el == NULL) {
            serverLog(LL_WARNING,
                "Failed creating the event loop. Error message: '%s'",
                strerror(errno));
            exit(1);
        }
        server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    
        ......
        
        //绑定、监听端口
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
            exit(1);
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    绑定、监听端口的逻辑在listenToPort方法中,该方法的入参有3个值。
    第一个值:要绑定、监听的端口,默认是6379。
    第二个值:描述符。
    第三个值:描述符的数量。
    后面这两个参数还没值,需要到listenToPort方法中赋值。

    int listenToPort(int port, int *fds, int *count) {
        ......
        //绑定IPV6
        fds[*count] = anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);
    
        ......
        //绑定IPV4
        fds[*count] = anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);
    
        ......
        (*count)++;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    用gdb debug一下,可以看到,最终fds数组一共赋2个值,count赋值2

    5、向epoll实例注册连接事件

    这个逻辑还是在initServer方法中。server.ipfd_count的值就是上面的那个count值,是2。所以这个循环会执行2次,注册2个连接事件,一个IPV4、一个IPV6
    aeCreateFileEvent,是一个非常重要的方法,是用来创建事件的。该方法有5个入参。
    第一个:redis对应epoll实例的结构体。
    第二个:需要注册的描述符。
    第三个:需要注册的事件类型。
    第四个:事件触发后的回调函数。
    第五个:客户端数据。我们是注册连接事件,所以不会有客户端数据,因为还没有客户端连接redis

    void initServer(void) {
        ......
        //注册连接事件
        
        for (j = 0; j < server.ipfd_count; j++) {
        
        ......
    
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)
        
        ......
        }
    
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    我们进入aeCreateFileEvent方法,

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        ......
        
        //aeApiAddEvent函数内部调用epoll_ctl函数
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        ......
        //将acceptTcpHandler回调函数挂到当前连接事件上
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    aeCreateFileEvent主要就是做两件事,注册连接事件、给事件注册回调函数,回调函数proc就是acceptTcpHandler。aeApiAddEvent是对epoll_ctl函数的封装。我们进入aeApiAddEvent方法看一下

    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        ......
        //调用epoll的epoll_ctl函数注册事件,一共4个参数。
        //1、epoll实例
        //2、要执行的操作类型,添加事件还是修改修改事件。第一次肯定是添加事件
        //3、要监听的文件描述符
        //4、epoll_event类型变量
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里,我们就看到了epoll_ctl函数。

    6、从epoll实例中获取就绪的事件

    这个获取就绪事件的动作,是在main方法的aeMain函数中。

    int main(int argc, char **argv) {
        ......
        //执行aeMain函数开启事件循环处理框架。就是用epoll_wait从内核获取就绪的事件
        aeMain(server.el);
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    我们进入aeMain函数。

    void aeMain(aeEventLoop *eventLoop) {
        //只要redis实例没有停止,while循环就会一直执行
        eventLoop->stop = 0;
        //stop是redis服务是否停止的标志,如果stop值变为1,说明redis服务停止了
        while (!eventLoop->stop) {
            ......
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们看到获取就绪的事件函数是aeProcessEvents,我们进入其中看一下

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
            ......
            //调用多路复用API
            numevents = aeApiPoll(eventLoop, tvp);
            ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以看到一个aeApiPoll函数,该函数是对epoll_wait函数的封装,我们继续进入aeApiPoll函数。

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
        //等待有可读写的事件发生.返回值为可读写的事件数量
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
            //获得监听到的事件数量
            numevents = retval;
            //针对每一个就绪的事件进行处理
            for (j = 0; j < numevents; j++) {
                //保存事件信息
                int mask = 0;
    
                //获取到当前就绪的这个事件
                struct epoll_event *e = state->events+j;
    
                //EPOLLIN代表epoll模型的读事件,这一行代码的意思是将epoll的读事件映射到redis事件驱动框架的读事件
                if (e->events & EPOLLIN) mask |= AE_READABLE;
    
                //EPOLLOUT代表epoll模型的写事件,这一行代码的意思是将epoll的写事件映射到redis事件驱动框架的写事件
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
    
                //EPOLLERR:错误事件,表示文件描述符对应套接字出错
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    
                //将epoll模型中已就绪的描述符映射到redis事件循环框架的就绪事件数组中
                eventLoop->fired[j].fd = e->data.fd;
    
                //给已就绪的事件设置事件类型
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    在其中,我们看到了epoll_wait函数,epoll_wait一共四个入参。
    第一个:要监听的epoll实例描述符。
    第二个:内核会将就绪的事件放入该集合。
    第三个:要监听的描述符数量。
    第四个:等待结果返回的超时时间。
    返回了结果后,后面就是处理就绪的事件,epoll_wait方法是redis IO多路复用的关键所在,redis能不断的接收客户端请求,依赖的就是epoll_wait函数,我给每一行代码都加了注释,可以细看一下。

    最后说一下,文章参考了极客时间的redis源码课程《redis源码剖析与实战》,文章写的很好,有兴趣的小伙伴可以去看看。

  • 相关阅读:
    MySQL 表的增删查改
    Prompt Engineering 高级提示工程技巧
    偷个懒,简化Git提交脚本输入
    Object Storage 东西虽小作用很大
    QT调用onnx 模型Demo(代码和讲解)
    第六章 java集合
    SpringBoot 使用 Minio 进行文件存储
    论文翻译:2022_DeepFilterNet2: Towards Real-Time Speech Enhancement On Embedded Devices For Fullband Audio
    来自华为的暴击?传高通裁员赔偿N+7 | 百能云芯
    聚苯乙烯-二乙烯基苯-甲基丙烯酸缩水甘油酯(PS-GMA)微球/聚吡咯复合微球的性能
  • 原文地址:https://blog.csdn.net/qq_39839075/article/details/134221762