• redis源码解析


    环境

    • redis1.0

    redis早期版本代码不多,但包括redis核心功能,如事件驱动、线程模型、数据模型,适合阅读研究其核心结构

    核心文件

    • anet
    1. anet负责基本的socket操作
    2. 包括连接建立、读、写,服务端端口监听、接收连接等
    3. 底层采用select多路复用器
    • ae
    /* Event loop state: the registered I/O (file) events, the registered
     * timer events, and the flag that makes aeMain() return. */
    struct aeEventLoop {
        long long timeEventNextId;   /* presumably the id for the next time event registered - TODO confirm in ae.c */
        aeFileEvent *fileEventHead;  /* head of the I/O event list */
        aeTimeEvent *timeEventHead;  /* head of the timer event list */
        int stop;                    /* aeMain() keeps iterating while this is 0 */
    };
    typedef struct aeEventLoop aeEventLoop;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. ae为事件驱动处理库
    2. 事件包括io事件以及定时事件
    • redis

    redis主要负责客户端连接建立、命令处理、定时任务处理

    线程模型

    • 网络编程主要包括建立与客户端的连接、读写事件、业务处理三个部分,根据实际的业务情况,可以把三部分分别或组合放在不同的线程中进行处理,由此产生了不同的线程模型。而redis采用单线程处理了连接建立、io读写、业务处理甚至还包括定时任务,这一切都基于其事件驱动模型。
    • aeEventLoop包括io(aeFileEvent)和定时任务(aeTimeEvent)两类事件,每类事件为一个链表队列,程序采用循环遍历事件队列进行事件处理。
    /* Run the event loop: reset the stop flag, then process every kind of
     * event (AE_ALL_EVENTS) over and over until eventLoop->stop becomes
     * non-zero. */
    void aeMain(aeEventLoop *eventLoop)
    {
        for (eventLoop->stop = 0; !eventLoop->stop; )
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    数据模型

    数据库

    /* One logical database. */
    struct redisDb {
        dict *dict;     /* the keyspace: serverCron reports its size as "keys" */
        dict *expires;  /* keys with an expire; each entry's value is the expire time (time_t) */
        int id;         /* presumably the index into server.db; sent to slaves/monitors with each command */
    };
    typedef struct redisDb redisDb;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
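    • redis默认有16个数据库,每个数据库包括两个hash表:dict存储该库的全部kv(包括设置了过期时间的key);expires只记录设置了过期时间的key及其过期时间戳。serverCron中通过dictGetEntryVal从expires取出time_t类型的过期时间,若已过期则调用deleteKey删除该key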

    kv

    /* Generic reference-counted value container used for keys and values. */
    struct redisObject {
        void *ptr;     /* payload; an sds string for REDIS_STRING objects */
        int type;      /* type tag such as REDIS_STRING */
        int refcount;  /* number of owners; dropped with decrRefCount() */
    };
    typedef struct redisObject robj;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ● kv都为redisObject类型,该结构包括数据、数据的类型(String、List、Hash、Set)、引用数量,只是key的数据类型为String

    主要数据结构

    • adlist

    双向链表结构

    • dict
    1. dict为hash表,内部采用数组存储元素
    2. 出现hash冲突时,采用链表存储
    3. 每次扩容为原大小的2倍,因此hash表的实际大小总是2的幂(2^n)
    • sds

    字符串操作库

    核心方法

    程序入口main

    /* Server entry point: load configuration, initialize the server,
     * optionally daemonize, load the dataset from disk, register the
     * accept handler on the listening socket and run the event loop. */
    int main(int argc, char **argv) {
        initServerConfig();

        /* At most one argument is accepted: the configuration file path. */
        if (argc > 2) {
            fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
            exit(1);
        } else if (argc == 2) {
            ResetServerSaveParams();
            loadServerConfig(argv[1]);
        } else {
            redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
        }

        initServer();
        if (server.daemonize)
            daemonize();
        redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
    #ifdef __linux__
        linuxOvercommitMemoryWarning();
    #endif
        if (rdbLoad(server.dbfilename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from disk");

        /* New connections are handled by acceptHandler when server.fd
         * becomes readable. */
        if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
                              acceptHandler, NULL, NULL) == AE_ERR)
            oom("creating file event");
        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);

        aeMain(server.el);            /* returns once the loop is stopped */
        aeDeleteEventLoop(server.el);
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    1. 程序入口为redis.c的main方法
    2. initServer方法中主要包括绑定端口、建立网络监听、入队定时任务事件
    3. 为监听socket(server.fd)注册可读事件,处理函数为acceptHandler,然后调用aeMain开启事件循环

    建立连接

    /* Readable-event handler for the listening socket: accept one connection,
     * create its client state, and enforce the maxclients limit.
     * el, privdata and mask are unused. */
    static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int clientPort, clientFd;
        char clientIp[128];
        redisClient *client;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);

        clientFd = anetAccept(server.neterr, fd, clientIp, &clientPort);
        if (clientFd == AE_ERR) {
            redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_DEBUG,"Accepted %s:%d", clientIp, clientPort);

        client = createClient(clientFd);
        if (client == NULL) {
            redisLog(REDIS_WARNING,"Error allocating resoures for the client");
            /* The fd may already be closed at this point; close errors are ignored. */
            close(clientFd);
            return;
        }

        /* The limit is checked only after the client exists: its socket is
         * then already non-blocking, so the error reply can be written
         * without further setup. */
        if (server.maxclients && listLength(server.clients) > server.maxclients) {
            const char *err = "-ERR max number of clients reached\r\n";

            (void) write(client->fd, err, strlen(err)); /* best effort, result ignored */
            freeClient(client);
            return;
        }
        server.stat_numconnections++;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    创建客户端

    /* Allocate and initialize the state of a freshly accepted connection.
     * The socket is put in non-blocking / no-delay mode, DB 0 is selected,
     * readQueryFromClient is registered as the readable handler and the
     * client is appended to server.clients.
     * Returns NULL if allocation fails, or if the read event cannot be
     * registered (freeClient() has then already been called). */
    static redisClient *createClient(int fd) {
        redisClient *client = zmalloc(sizeof *client);

        anetNonBlock(NULL, fd);
        anetTcpNoDelay(NULL, fd);
        if (client == NULL) return NULL;

        selectDb(client, 0);
        client->fd = fd;

        /* Request parser state: empty buffer, no arguments, not in a bulk read. */
        client->querybuf = sdsempty();
        client->argc = 0;
        client->argv = NULL;
        client->bulklen = -1;

        /* Reply and bookkeeping state. */
        client->sentlen = 0;
        client->flags = 0;
        client->lastinteraction = time(NULL);
        client->authenticated = 0;
        client->replstate = REDIS_REPL_NONE;

        /* Pending replies are objects: freed via decrRefCount, duplicated
         * via dupClientReplyValue. */
        if ((client->reply = listCreate()) == NULL) oom("listCreate");
        listSetFreeMethod(client->reply, decrRefCount);
        listSetDupMethod(client->reply, dupClientReplyValue);

        if (aeCreateFileEvent(server.el, client->fd, AE_READABLE,
                              readQueryFromClient, client, NULL) == AE_ERR) {
            freeClient(client);
            return NULL;
        }
        if (!listAddNodeTail(server.clients, client))
            oom("listAddNodeTail");
        return client;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    连接建立后,createClient会为客户端socket注册可读事件(处理函数为readQueryFromClient),并把该客户端加入server.clients列表

    io读

    /* Readable-event handler for a client socket.
     *
     * Appends the bytes just read to c->querybuf, then parses and executes as
     * many complete requests as are buffered:
     *  - inline requests: one line, split on spaces into c->argv;
     *  - bulk requests: processCommand() sets c->bulklen to the payload length
     *    plus CRLF, and the payload becomes the final argument once that many
     *    bytes are buffered.
     * The client is freed on read error or EOF; c must not be used afterwards.
     * el and mask are unused. */
    static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = (redisClient*) privdata;
        char buf[REDIS_IOBUF_LEN];
        int nread;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);

        nread = read(fd, buf, REDIS_IOBUF_LEN);
        if (nread == -1) {
            if (errno == EAGAIN) {
                nread = 0;
            } else {
                redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            redisLog(REDIS_DEBUG, "Client closed connection");
            freeClient(c);
            return;
        }
        if (nread) {
            c->querybuf = sdscatlen(c->querybuf, buf, nread);
            c->lastinteraction = time(NULL);
        } else {
            return;
        }

    again:
        if (c->bulklen == -1) {
            /* Read the first line of the query */
            char *p = strchr(c->querybuf,'\n');
            size_t querylen;

            if (p) {
                sds query, *argv;
                int argc, j;

                query = c->querybuf;
                c->querybuf = sdsempty();
                querylen = 1+(p-(query));
                if (sdslen(query) > querylen) {
                    /* leave data after the first line of the query in the buffer */
                    c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
                }
                *p = '\0'; /* remove "\n" */
                /* Strip a preceding "\r" if any. For a bare "\n" line p == query,
                 * so p-1 would point before the string: check first. */
                if (p > query && *(p-1) == '\r') *(p-1) = '\0';
                sdsupdatelen(query);

                /* Now we can split the query in arguments */
                if (sdslen(query) == 0) {
                    /* Ignore empty query, but keep parsing pipelined data
                     * already buffered instead of waiting for the next read. */
                    sdsfree(query);
                    if (sdslen(c->querybuf)) goto again;
                    return;
                }
                argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
                if (argv == NULL) oom("sdssplitlen");
                sdsfree(query);

                if (c->argv) zfree(c->argv);
                c->argv = zmalloc(sizeof(robj*)*argc);
                if (c->argv == NULL) oom("allocating arguments list for client");

                /* Empty tokens (runs of spaces) are dropped. */
                for (j = 0; j < argc; j++) {
                    if (sdslen(argv[j])) {
                        c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                        c->argc++;
                    } else {
                        sdsfree(argv[j]);
                    }
                }
                zfree(argv);
                if (c->argc) {
                    /* Execute the command. If the client is still valid
                     * after processCommand() return and there is something
                     * on the query buffer try to process the next command. */
                    if (processCommand(c) && sdslen(c->querybuf)) goto again;
                } else {
                    /* The line held only spaces: nothing to execute, but
                     * continue with any buffered data. */
                    if (sdslen(c->querybuf)) goto again;
                }
                return;
            } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
                redisLog(REDIS_DEBUG, "Client protocol error");
                freeClient(c);
                return;
            }
        } else {
            /* Bulk read handling. Note that if we are at this point
               the client already sent a command terminated with a newline,
               we are reading the bulk data that is actually the last
               argument of the command. */
            int qbl = sdslen(c->querybuf);

            if (c->bulklen <= qbl) {
                /* Copy everything but the final CRLF as final argument */
                c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
                c->argc++;
                c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
                /* If the client survived the command and more requests are
                 * already buffered (pipelining), parse them now. */
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
                return;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    readQueryFromClient 负责io数据读取、完成数据的拆包组装、解析

    命令处理

    /* Execute the command held in c->argv / c->argc.
     *
     * Returns 0 when the client has been freed (QUIT, or the command set
     * REDIS_CLOSE), in which case the caller must not touch c again.
     * Returns 1 otherwise, including after an error reply.
     * For REDIS_CMD_BULK commands the last inline argument is the payload
     * length. If the payload plus CRLF is not buffered yet, c->bulklen is set
     * and 1 is returned without executing; the command runs once the caller
     * has collected the payload. */
    static int processCommand(redisClient *c) {
        struct redisCommand *cmd;
        long long dirty;

        /* Free some memory if needed (maxmemory setting) */
        if (server.maxmemory) freeMemoryIfNeeded();

        /* The QUIT command is handled as a special case. Normal command
         * procs are unable to close the client connection safely */
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            freeClient(c);
            return 0;
        }
        cmd = lookupCommand(c->argv[0]->ptr);
        if (!cmd) {
            addReplySds(c,sdsnew("-ERR unknown command\r\n"));
            resetClient(c);
            return 1;
        } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
                   (c->argc < -cmd->arity)) {
            /* A negative arity means "at least -arity arguments". */
            addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
            resetClient(c);
            return 1;
        } else if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
                   zmalloc_used_memory() > server.maxmemory) {
            addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
            resetClient(c);
            return 1;
        } else if ((cmd->flags & REDIS_CMD_BULK) && c->bulklen == -1) {
            /* Parse the payload length with strtol() instead of atoi():
             * atoi() is undefined on out-of-range input and turns
             * non-numeric text such as "abc" into 0, which was accepted. */
            char *lenstr = c->argv[c->argc-1]->ptr;
            char *eptr;
            long bulklen;
            int badlen;

            bulklen = strtol(lenstr,&eptr,10);
            /* Overflow yields LONG_MAX/LONG_MIN, which the range test rejects. */
            badlen = (eptr == lenstr || *eptr != '\0' ||
                      bulklen < 0 || bulklen > 1024*1024*1024);

            /* The length argument is consumed in every case; lenstr must not
             * be used after this point. */
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            if (badlen) {
                addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                resetClient(c);
                return 1;
            }
            c->bulklen = bulklen+2; /* add two bytes for CR+LF */
            /* It is possible that the bulk read is already in the
             * buffer. Check this condition and handle it accordingly */
            if ((signed)sdslen(c->querybuf) >= c->bulklen) {
                c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
                c->argc++;
                c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            } else {
                return 1;
            }
        }
        /* Let's try to share objects on the command arguments vector */
        if (server.shareobjects) {
            int j;
            for(j = 1; j < c->argc; j++)
                c->argv[j] = tryObjectSharing(c->argv[j]);
        }
        /* Check if the user is authenticated */
        if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
            addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
            resetClient(c);
            return 1;
        }

        /* Exec the command. Only commands that changed server.dirty are
         * propagated to slaves; monitors see every command. */
        dirty = server.dirty;
        cmd->proc(c);
        if (server.dirty-dirty != 0 && listLength(server.slaves))
            replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
        if (listLength(server.monitors))
            replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
        server.stat_numcommands++;

        /* Prepare the client for the next command */
        if (c->flags & REDIS_CLOSE) {
            freeClient(c);
            return 0;
        }
        resetClient(c);
        return 1;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    定时任务处理

    /* Periodic timer handler (housekeeping).
     *
     * Each run it refreshes server.usedmemory, resizes hash tables when no
     * background save is running, closes idle clients, reaps a finished
     * background-save child or starts a new save when a save point is hit,
     * expires a random sample of volatile keys per DB, and connects to a
     * master when replstate is REDIS_REPL_CONNECT.
     * Returns 1000 - presumably the delay in ms until ae runs it again
     * (TODO confirm against ae.c). */
    static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j, loops = server.cronloops++;
        REDIS_NOTUSED(eventLoop);
        REDIS_NOTUSED(id);
        REDIS_NOTUSED(clientData);

        /* Update the global state with the amount of used memory */
        server.usedmemory = zmalloc_used_memory();

        /* Show some info about non-empty databases */
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (!(loops % 5) && (used || vkeys)) {
                redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }

        /* We don't want to resize the hash tables while a bacground saving
         * is in progress: the saving child is created using fork() that is
         * implemented with a copy-on-write semantic in most modern systems, so
         * if we resize the HT while there is the saving child at work actually
         * a lot of memory movements in the parent will cause a lot of pages
         * copied. */
        if (!server.bgsaveinprogress) tryResizeHashTables();

        /* Show information about connected clients */
        if (!(loops % 5)) {
            redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                server.usedmemory,
                dictSize(server.sharingpool));
        }

        /* Close connections of timedout clients */
        if (server.maxidletime && !(loops % 10))
            closeTimedoutClients();

        /* Check if a background saving in progress terminated */
        if (server.bgsaveinprogress) {
            int statloc = 0;
            pid_t pid = wait4(-1,&statloc,WNOHANG,NULL);

            /* 0: the child is still running, nothing to do yet. */
            if (pid != 0) {
                int exitcode = WEXITSTATUS(statloc);
                int bysignal = WIFSIGNALED(statloc);
                int saved = 0;

                if (pid == -1) {
                    /* wait4() failed, so statloc was never filled in and must
                     * not be decoded. Treat it as a failed save so the state
                     * is reset instead of staying "in progress" forever. */
                    redisLog(REDIS_WARNING,
                        "Background saving: wait4() error: %s", strerror(errno));
                } else if (!bysignal && exitcode == 0) {
                    redisLog(REDIS_NOTICE,
                        "Background saving terminated with success");
                    server.dirty = 0;
                    server.lastsave = time(NULL);
                    saved = 1;
                } else if (!bysignal && exitcode != 0) {
                    redisLog(REDIS_WARNING, "Background saving error");
                } else {
                    redisLog(REDIS_WARNING,
                        "Background saving terminated by signal");
                    rdbRemoveTempFile(server.bgsavechildpid);
                }
                server.bgsaveinprogress = 0;
                server.bgsavechildpid = -1;
                /* WEXITSTATUS() is 0 for a child killed by a signal, so the
                 * exit code alone must not decide success. */
                updateSlavesWaitingBgsave(saved ? REDIS_OK : REDIS_ERR);
            }
        } else {
            /* If there is not a background saving in progress check if
             * we have to save now */
            time_t now = time(NULL);
            for (j = 0; j < server.saveparamslen; j++) {
                struct saveparam *sp = server.saveparams+j;

                if (server.dirty >= sp->changes &&
                    now-server.lastsave > sp->seconds) {
                    redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, sp->seconds);
                    rdbSaveBackground(server.dbfilename);
                    break;
                }
            }
        }

        /* Try to expire a few timed out keys: sample at most
         * REDIS_EXPIRELOOKUPS_PER_CRON random volatile keys per DB. */
        for (j = 0; j < server.dbnum; j++) {
            redisDb *db = server.db+j;
            int num = dictSize(db->expires);

            if (num) {
                time_t now = time(NULL);

                if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                    num = REDIS_EXPIRELOOKUPS_PER_CRON;
                while (num--) {
                    dictEntry *de;
                    time_t t;

                    if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                    t = (time_t) dictGetEntryVal(de);
                    if (now > t) {
                        deleteKey(db,dictGetEntryKey(de));
                    }
                }
            }
        }

        /* Check if we should connect to a MASTER */
        if (server.replstate == REDIS_REPL_CONNECT) {
            redisLog(REDIS_NOTICE,"Connecting to MASTER...");
            if (syncWithMaster() == REDIS_OK) {
                redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
            }
        }
        return 1000;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    serverCron主要完成:超时客户端清理、hash表resize(后台保存期间不进行)、后台rdb保存(按save参数触发,并检测保存子进程是否结束)、随机抽样清理过期key、与master同步

  • 相关阅读:
    如何做到,小程序上线1个月总用户量提高70%
    C语言常用的字符串函数(含模拟实现)
    java计算机毕业设计-酒店管理系统-源码+mysql数据库+系统+lw文档+部署
    虚拟网络适配器的实现
    【C语言】-结构体内存对齐。附详细图解
    华为OD机试 - 最长的指定瑕疵度的元音子串 - 正则表达式(Java 2023 B卷 200分)
    PHP即刻送达同城派送小程序系统
    【云原生 | Kubernetes 系列】K8s 实战 使用 Kustomize 对 Kubernetes 对象进行声明式管理
    C语言第十二课(中):操作符详解【单目、关系、逻辑、条件操作符】
    深入浅出 testing-library
  • 原文地址:https://blog.csdn.net/hawk199/article/details/126146117