• redis一条set命令的执行过程


    set命令,在我们看来很简单,set zhangsan lisi,redis给我们返回一个 ok,就完事了。那redis的服务端是怎么处理这条简单的命令的?是不是像我们看起来的这么简单。今天这篇文章就来聊聊这个问题。
    在上一篇文章中,我们聊了redis IO多路复用的事件驱动框架。我们大致了解了redis是如何接收连接,如何将客户端的连接行为封装成事件并结合IO多路复用实现了对客户端连接的监听
    这一篇,我们聊聊当内核监听到客户端连接事件后,具体是如何处理连接事件的,我们用set命令来举例子。

    1、事件回调函数执行

    上一篇的最后,我们讲到了redis通过epoll_wait函数从内核轮询就绪的事件,获取到事件后,开始执行回调处理函数。
    这部分代码在ae.c的aeProcessEvents方法中

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        ......
        //eventLoop->maxfd != -1,这个表达式代表有IO事件发生
        if (eventLoop->maxfd != -1 ||
            //(flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)),这个表达式代表有紧急的时间事件发生
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            ......
    
            //调用多路复用API
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            //轮询获取就绪的事件
            numevents = aeApiPoll(eventLoop, tvp);
            ......
    
            for (j = 0; j < numevents; j++) {
    
                //获取就绪事件
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                ......
    
                //如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
    
                ......
            }
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2、处理客户端连接

    接下来,流程就到了执行注册函数,也就是acceptTcpHandler方法,我们看一下acceptTcpHandler方法的实现。主要逻辑是:

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        ......
    
        while(max--) {
            //创建已连接套接字cfd
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            ......
            acceptCommonHandler(cfd,0,cip);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到,主要逻辑在acceptCommonHandler中

    static void acceptCommonHandler(int fd, int flags, char *ip) {
        client *c;
        //创建客户端
        if ((c = createClient(fd)) == NULL) {
            ......
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    acceptCommonHandler中,针对就绪的连接事件,首先创建一个客户端client,注意一下这个结构体,后面的命令执行、命令返回都需要用到这个结构体,很重要.

    static void acceptCommonHandler(int fd, int flags, char *ip) {
        client *c;
        //创建客户端
        if ((c = createClient(fd)) == NULL) {
            ......
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    继续进入createClient方法,可以看到在createClient方法中,又创建了一个读事件,执行函数是readQueryFromClient

    client *createClient(int fd) {
        client *c = zmalloc(sizeof(client));
        ......
        if (fd != -1) {
            ......
            //这里给已连接事件注册的事件类型是:AE_READABLE,
            //这是因为无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3、处理读事件

    至此,连接事件处理函数的主要逻辑就过完了,其中最主要的两块逻辑,一块是创建客户端,一块是注册一个读事件。读事件注册好了,接下来,就等着客户端发送具体的执行命令。
    假设此时,我们在客户端执行了set zhangsan lisi
    首先redis客户端先进行处理,使用RESP协议将这条命令发给redis服务端,服务端收到这条命令后,走事件处理的逻辑,之后触发回调函数,我们的这个例子里,回调函数就是readQueryFromClient。
    我们看一下readQueryFromClient的逻辑,其实不管是写还是读,对于redis来说都需要先读,之后在命令执行时再区分写还是读。

    //处理读事件的函数
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        client *c = (client*) privdata;
        ......
        //为读缓冲区分配空间
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        //调用read函数读取数据
        nread = read(fd, c->querybuf+qblen, readlen);
        ......
        //进一步处理读取内容
        processInputBufferAndReplicate(c);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以看到,主要就是调用read函数读取客户端数据,然后调用processInputBufferAndReplicate处理函数
    我们进入processInputBufferAndReplicate看一下

    void processInputBufferAndReplicate(client *c) {
        //当前客户端不属于主从复制中的主节点
        if (!(c->flags & CLIENT_MASTER)) {
            processInputBuffer(c);
        } else {
            ......
            if (applied) {
                //将主节点接收到的命令同步给从节点
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    4、读事件对应命令解析

    我们进入processInputBuffer方法看一下。这个方法的主要逻辑有两部分。
    一部分是processMultibulkBuffer函数的执行。
    一部分是processCommand函数的执行。

    void processInputBuffer(client *c) {
        server.current_client = c;
    
        /* Keep processing while there is something in the input buffer */
        while(c->qb_pos < sdslen(c->querybuf)) {
            ......
            //如果命令以"*"开头,说明是RESP协议的请求,RESP协议是redis客户端和服务器端的通信协议
            if (!c->reqtype) {
                if (c->querybuf[c->qb_pos] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    //不是RESP协议,那就是管道命令.比如:telnet命令
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }
    
            if (c->reqtype == PROTO_REQ_INLINE) {
                //如果不是RESP协议,执行该函数
                if (processInlineBuffer(c) != C_OK) break;
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                //如果是RESP协议,执行该函数
                if (processMultibulkBuffer(c) != C_OK) break;
            } else {
                serverPanic("Unknown request type");
            }
    
            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                //调用processCommand执行命令
                if (processCommand(c) == C_OK) {
                    ......
                }
                ......
            }
        }
    
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    我看这部分源码的时候,忽略了processMultibulkBuffer,但其实这部分代码很重要。里面的主要逻辑是解析RESP协议的内容,比如我们执行了一条set命令 set zhangsan lisi,此时redis客户端会将这条命令以RESP协议的形式发送到redis的服务端。到了服务端,就要靠processMultibulkBuffer进行解析。如果忽略了这部分代码,后面的逻辑就看不明白。
    所以,我们先看processMultibulkBuffer方法.这个方法中,我们主要就看我保留的这部分逻辑,这部分就是在解析RESP协议,比如:set zhangsan lisi,最终会被解析成三个RedisObject结构体,存储到c->argv数组中,argv数组是一个RedisObject数组。后面set命令真正执行时,会从该数组中获取到数据执行。

    int processMultibulkBuffer(client *c) {
        ......
            if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
                /* Not enough data (+2 == trailing \r\n) */
                break;
            } else {
                //解析RESP协议,获取到RESP协议中的具体指令,赋值到client->argv变量上,后面lookupCommand函数会用解析到的指令名称查询真正的执行函数,比如:set的执行函数是setCommand
                ......
                    c->argv[c->argc++] =
                        createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                    c->qb_pos += c->bulklen+2;
                ......
                c->bulklen = -1;
                c->multibulklen--;
            }
        }
    
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    processMultibulkBuffer方法执行完,我们就获取到了客户端想要执行的指令以及对应的键值对数据。接下来,我们返回主流程,继续看processInputBuffer的处理逻辑,解析完命令后,后面就开始处理命令。这里面有两段逻辑最重要。
    第一段:根据前面解析RESP协议得到的c->argv数组获取到指令名称,比如:set或者get,通过命令的名称从lookupCommand函数中获取到执行命令的函数

    5、获取命令执行函数

    int processCommand(client *c) {
        
        //是否为quit命令
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            //quit命令直接退出
            ......
        }
    
        ......
        //在全局变量server的commands成员变量中查找相关的命令,存储命令的数据结构是一个hash表
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        ......
    
        /* Exec the command */
        //如果客户端有CLIENT_MULTI标记,并且当前命令不是exec、discard、multi和watch命令,将命令入队保存,等待后续一起处理
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            //将命令入队
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            //调用call函数执行命令
            call(c,CMD_CALL_FULL);
            ......
        }
        return C_OK;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    这里岔开话题,说一下lookupCommand函数,这个函数的主要作用就是根据命令名称获取到执行命令的函数。里面的具体实现是从一个server.commands成员变量中获取数据.server.commands是一个hash结构,lookupCommand的出参是redisCommand结构体。

    struct redisCommand *lookupCommand(sds name) {
        return dictFetchValue(server.commands, name);
    }
    
    • 1
    • 2
    • 3

    6、命令执行函数填充

    server.commands的初始化操作是在main函数的initServerConfig()函数中,先创建hash,然后填充hash

    void initServerConfig(void) {
        ......
        //创建hash结构
        server.commands = dictCreate(&commandTableDictType,NULL);
        ......
        //填充hash结构
        populateCommandTable();
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    populateCommandTable函数就是将redisCommandTable中的数据放入server.commands哈希结构中

    void populateCommandTable(void) {
        int j;
        int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
    
        for (j = 0; j < numcommands; j++) {
            struct redisCommand *c = redisCommandTable+j;
            ......
            //哈希结构的key是RedisCommand中的name,value是RedisCommand结构体
    
            retval1 = dictAdd(server.commands, sdsnew(c->name), c);
            ......
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    填充hash结构,就是将redisCommandTable数组中的数据填充到hash结构中.
    redisCommandTable数组是RedisCommand结构体的集合

    struct redisCommand redisCommandTable[] = {
        //第一个参数是命令的名称
        //第二个参数是命令的实现函数
        ......
        {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
        {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
        {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到redisCommandTable中有很多我们熟悉的命令,比如:set、get命令。

    7、set命令执行函数

    现在server.commands哈希结构体中有了数据,假如我们执行的是set命令,所以通过lookupCommand获取命令执行函数,就是setCommand,接着调用call函数,执行该命令,所以我们进入setCommand函数继续看逻辑。

    void setCommand(client *c) {
        int j;
        robj *expire = NULL;
        int unit = UNIT_SECONDS;
        int flags = OBJ_SET_NO_FLAGS;
        ......
        for (j = 3; j < c->argc; j++) {
            char *a = c->argv[j]->ptr;
            robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
    
            if ((a[0] == 'n' || a[0] == 'N') &&
                (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                !(flags & OBJ_SET_XX))
            {
                flags |= OBJ_SET_NX;
            }
        }
        ......
        setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    首先判断set命令的具体指令,我们知道,除了set命令,还有setNX,setEX。之后就调用setGenericCommand函数执行具体的逻辑

    void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
        ......
        //如果有NX选项,就查找key是否存在,如果key存在,直接返回null
        if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
            (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
        {
            addReply(c, abort_reply ? abort_reply : shared.nullbulk);
            return;
        }
        //开始设置键值对
        setKey(c->db,key,val);
        ......
        //如果客户端设置了过期时间.这里需要处理过期的逻辑
        if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
        notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
        if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
            "expire",key,c->db->id);
        
        //调用addReply函数响应客户端
        addReply(c, ok_reply ? ok_reply : shared.ok);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    setGenericCommand函数的逻辑中,比较重要的有两块逻辑:
    一块是setKey,存储键值对
    一块是addReply,响应客户端。
    我们先看setKey的执行逻辑
    setKey的入参,有三个,第一个是库名,默认是0库,第二个是set命令的key,第三个是set命令的value值

    void setKey(redisDb *db, robj *key, robj *val) {
        //查找key是否存在
        if (lookupKeyWrite(db,key) == NULL) {
            //不存在新增
            dbAdd(db,key,val);
        } else {
            //存在覆盖
            dbOverwrite(db,key,val);
        }
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    进入lookupKeyWrite函数,主要是查找key是否存在,查找key的逻辑是在dictFind函数中,该函数入参有两个,第一个是全局hash函数,第二个是key值

    robj *lookupKey(redisDb *db, robj *key, int flags) {
        //从redis库的全局hash结构中查找
        dictEntry *de = dictFind(db->dict,key->ptr);
        if (de) {
            robj *val = dictGetVal(de);
    
            ......
            return val;
        } else {
            return NULL;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    看到这里,其实我们就知道,redis的set命令也用到了hash结构,用来加速命令的执行。如果未找到该key,就直接返回null.然后就执行键值对的新增命令。
    新增命令的具体逻辑,在dictAddRaw函数中。

    dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
    {
        long index;
        dictEntry *entry;
        dictht *ht;
        //全局hash表是否在进行rehash
        if (dictIsRehashing(d)) _dictRehashStep(d);
    
        //key已经存在,直接返回
        if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
            return NULL;
    
        如果在进行rehash,使用1,否则使用0表。这块是一个面试题,rehash的过程中,会用到两个hash表
        ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
        //下面就开始真正的新增操作了
        //先初始化一个Entry节点
        entry = zmalloc(sizeof(*entry));
    
        //这两行代码是在将新的entry添加到现有链表中。注意ht->table[index],这代表一个元素。
        //如果index这个位置上有元素A,那新建entry的下一个元素就是A,同时index的位置上的节点变更为新建的entry
        entry->next = ht->table[index];
        ht->table[index] = entry;
        //hash表的节点数增加1
        ht->used++;
    
        //将key值设置到Entry中,这个函数是一个宏,用gdb打断点的话,宏里面直接走的else逻辑,也就是(entry)->key = (_key_);一个简单的赋值操作
        dictSetKey(d, entry, key);
        return entry;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    8、客户端命令响应

    至此,我们就跟完了一条完整的set命令执行过程。
    set命令之后,然后就是响应客户端的操作,逻辑在addReply方法中,主要就是执行prepareClientToWrite函数

    void addReply(client *c, robj *obj) {
        //执行prepareClientToWrite函数
        if (prepareClientToWrite(c) != C_OK) return;
    
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    prepareClientToWrite函数封装了clientInstallWriteHandler函数,其中主要的逻辑就是将客户端插入到server.clients_pending_write列表中。很显然,这不是真正的响应客户端操作,因为我们知道读取客户端数据,用的是read函数,那响应客户端数据,盲猜也得是write之类的函数。所以,一定有另外的地方处理clients_pending_write列表.
    处理的逻辑在server.c文件的beforeSleep函数中

    void beforeSleep(struct aeEventLoop *eventLoop) {
        ......
        //注册写事件处理逻辑
        handleClientsWithPendingWrites();
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    handleClientsWithPendingWrites没有入参。可以看到该函数的主要逻辑就是从clients_pending_write链表中获取节点,然后进行处理,这里面会调用write函数

    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        //从clients_pending_write列表中获取数据
        int processed = listLength(server.clients_pending_write);
        //获取待写回的客户端列表,这个li是一个迭代器的指针
        listRewind(server.clients_pending_write,&li);
        //遍历每一个待写回的客户端
        while((ln = listNext(&li))) {
            ......
            //获取到待响应的客户端
            client *c = listNodeValue(ln);
            
            //调用writeToClient将当前客户端的输出缓冲区数据写回,该函数封装了和read方法对应的write方法
            if (writeToClient(c->fd,c,0) == C_ERR) continue;
    
            ......
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    beforeSleep函数的触发逻辑是在server.c的main方法中
    首先通过aeSetBeforeSleepProc函数将beforeSleep注册到EventLoop结构体中。

    int main(int argc, char **argv) {
    	......
    	//redis启动过程中会将server.c文件中的beforeSleep函数注册到事件循环框架结构体的beforesleep变量处
        aeSetBeforeSleepProc(server.el,beforeSleep);
    	......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    之后,随着事件循环不断的执行,beforeSleep函数也会不断的执行。

    void aeMain(aeEventLoop *eventLoop) {
        ......
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                
                //执行beforeSleep函数
                eventLoop->beforesleep(eventLoop);
            
            ///......不断的调用内核,获取就绪的事件进行处理
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    以上,就是一个完整的set命令的执行过程。

    文章参考了极客时间的,课程中没有我描述的这么详细,算是对课程做了一个补充

  • 相关阅读:
    上班族有什么靠谱的兼职副业可以做?
    微服务SpringCloud(Bus 消息总线)整合十九
    零基础上手unity VR开发【Oculus账号体系准备】
    记一次线程堵塞(挂起)导致消息队列积压
    【数智化人物展】同方有云联合创始人兼总经理江琦:云计算,引领数智化升级的动能...
    Groovy语法&Gradle配置学习笔记
    Android---APK 瘦身
    从0到0.01入门React | 008.精选 React 面试题
    梦开始的地方 —— C语言指针进阶
    九、Linux高阶命令
  • 原文地址:https://blog.csdn.net/qq_39839075/article/details/134352140