set命令,在我们看来很简单,set zhangsan lisi,redis给我们返回一个 ok,就完事了。那redis的服务端是怎么处理这条简单的命令的?是不是像我们看起来的这么简单。今天这篇文章就来聊聊这个问题。
在上一篇文章中,我们聊了redis IO多路复用的事件驱动框架。我们大致了解了redis是如何接收连接,如何将客户端的连接行为封装成事件并结合IO多路复用实现了对客户端连接的监听
这一篇,我们聊聊当内核监听到客户端连接事件后,具体是如何处理连接事件的,我们用set命令来举例子。
上一篇的最后,我们讲到了redis通过epoll_wait函数从内核轮询就绪的事件,获取到事件后,开始执行回调处理函数。
这部分代码在ae.c的aeProcessEvents方法中
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
......
//eventLoop->maxfd != -1,这个表达式代表有IO事件发生
if (eventLoop->maxfd != -1 ||
//(flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)),这个表达式代表有紧急的时间事件发生
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
......
//调用多路复用API
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
//轮询获取就绪的事件
numevents = aeApiPoll(eventLoop, tvp);
......
for (j = 0; j < numevents; j++) {
//获取就绪事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
......
//如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
......
}
}
......
}
接下来,流程就到了执行注册函数,也就是acceptTcpHandler方法,我们看一下acceptTcpHandler方法的实现。主要逻辑是:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
......
while(max--) {
//创建已连接套接字cfd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
......
acceptCommonHandler(cfd,0,cip);
}
}
可以看到,主要逻辑在acceptCommonHandler中
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
//创建客户端
if ((c = createClient(fd)) == NULL) {
......
}
......
}
acceptCommonHandler中,针对就绪的连接事件,首先创建一个客户端client,注意一下这个结构体,后面的命令执行、命令返回都需要用到这个结构体,很重要.
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
//创建客户端
if ((c = createClient(fd)) == NULL) {
......
}
......
}
继续进入createClient方法,可以看到在createClient方法中,又创建了一个读事件,执行函数是readQueryFromClient
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
......
if (fd != -1) {
......
//这里给已连接事件注册的事件类型是:AE_READABLE,
//这是因为无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
......
}
至此,连接事件处理函数的主要逻辑就过完了,其中最主要的两块逻辑,一块是创建客户端,一块是注册一个读事件。读事件注册好了,接下来,就等着客户端发送具体的执行命令。
假设此时,我们在客户端执行了set zhangsan lisi
首先redis客户端先进行处理,使用RESP协议将这条命令发给redis服务端,服务端收到这条命令后,走事件处理的逻辑,之后触发回调函数,我们的这个例子里,回调函数就是readQueryFromClient。
我们看一下readQueryFromClient的逻辑,其实不管是写还是读,对于redis来说都需要先读,之后在命令执行时再区分写还是读。
//处理读事件的函数
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
......
//为读缓冲区分配空间
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//调用read函数读取数据
nread = read(fd, c->querybuf+qblen, readlen);
......
//进一步处理读取内容
processInputBufferAndReplicate(c);
}
可以看到,主要就是调用read函数读取客户端数据,然后调用processInputBufferAndReplicate处理函数
我们进入processInputBufferAndReplicate看一下
void processInputBufferAndReplicate(client *c) {
//当前客户端不属于主从复制中的主节点
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
......
if (applied) {
//将主节点接收到的命令同步给从节点
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
我们进入processInputBuffer方法看一下。这个方法的主要逻辑有两部分。
一部分是processMultibulkBuffer函数的执行。
一部分是processCommand函数的执行。
void processInputBuffer(client *c) {
server.current_client = c;
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
......
//如果命令以"*"开头,说明是RESP协议的请求,RESP协议是redis客户端和服务器端的通信协议
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
//不是RESP协议,那就是管道命令.比如:telnet命令
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
//如果不是RESP协议,执行该函数
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
//如果是RESP协议,执行该函数
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
//调用processCommand执行命令
if (processCommand(c) == C_OK) {
......
}
......
}
}
......
}
我看这部分源码的时候,忽略了processMultibulkBuffer,但其实这部分代码很重要。里面的主要逻辑是解析RESP协议的内容,比如我们执行了一条set命令 set zhangsan lisi,此时redis客户端会将这条命令以RESP协议的形式发送到redis的服务端。到了服务端,就要靠processMultibulkBuffer进行解析。如果忽略了这部分代码,后面的逻辑就看不明白。
所以,我们先看processMultibulkBuffer方法.这个方法中,我们主要就看我保留的这部分逻辑,这部分就是在解析RESP协议,比如:set zhangsan lisi,最终会被解析成三个RedisObject结构体,存储到c->argv数组中,argv数组是一个RedisObject数组。后面set命令真正执行时,会从该数组中获取到数据执行。
int processMultibulkBuffer(client *c) {
......
if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */
break;
} else {
//解析RESP协议,获取到RESP协议中的具体指令,赋值到client->argv变量上,后面lookupCommand函数会用解析到的指令名称查询真正的执行函数,比如:set的执行函数是setCommand
......
c->argv[c->argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->qb_pos += c->bulklen+2;
......
c->bulklen = -1;
c->multibulklen--;
}
}
......
}
processMultibulkBuffer方法执行完,我们就获取到了客户端想要执行的指令以及对应的键值对数据。接下来,我们返回主流程,继续看processInputBuffer的处理逻辑,解析完命令后,后面就开始处理命令。这里面有两段逻辑最重要。
第一段:根据前面解析RESP协议得到的c->argv数组获取到指令名称,比如:set或者get,通过命令的名称从lookupCommand函数中获取到执行命令的函数
int processCommand(client *c) {
//是否为quit命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
//quit命令直接退出
......
}
......
//在全局变量server的commands成员变量中查找相关的命令,存储命令的数据结构是一个hash表
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
......
/* Exec the command */
//如果客户端有CLIENT_MULTI标记,并且当前命令不是exec、discard、multi和watch命令,将命令入队保存,等待后续一起处理
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
//将命令入队
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
//调用call函数执行命令
call(c,CMD_CALL_FULL);
......
}
return C_OK;
}
这里岔开话题,说一下lookupCommand函数,这个函数的主要作用就是根据命令名称获取到执行命令的函数。里面的具体实现是从一个server.commands成员变量中获取数据.server.commands是一个hash结构,lookupCommand的出参是redisCommand结构体。
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}
server.commands的初始化操作是在main函数的initServerConfig()函数中,先创建hash,然后填充hash
void initServerConfig(void) {
......
//创建hash结构
server.commands = dictCreate(&commandTableDictType,NULL);
......
//填充hash结构
populateCommandTable();
......
}
populateCommandTable函数就是将redisCommandTable中的数据放入server.commands哈希结构中
void populateCommandTable(void) {
int j;
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
......
//哈希结构的key是RedisCommand中的name,value是RedisCommand结构体
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
......
}
}
填充hash结构,就是将redisCommandTable数组中的数据填充到hash结构中.
redisCommandTable数组是RedisCommand结构体的集合
struct redisCommand redisCommandTable[] = {
//第一个参数是命令的名称
//第二个参数是命令的实现函数
......
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}
......
}
可以看到redisCommandTable中有很多我们熟悉的命令,比如:set、get命令。
现在server.commands哈希结构体中有了数据,假如我们执行的是set命令,所以通过lookupCommand获取命令执行函数,就是setCommand,接着调用call函数,执行该命令,所以我们进入setCommand函数继续看逻辑。
void setCommand(client *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_SET_NO_FLAGS;
......
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_XX))
{
flags |= OBJ_SET_NX;
}
}
......
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
首先判断set命令的具体指令,我们知道,除了set命令,还有setNX,setEX。之后就调用setGenericCommand函数执行具体的逻辑
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
......
//如果有NX选项,就查找key是否存在,如果key存在,直接返回null
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
//开始设置键值对
setKey(c->db,key,val);
......
//如果客户端设置了过期时间.这里需要处理过期的逻辑
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
//调用addReply函数响应客户端
addReply(c, ok_reply ? ok_reply : shared.ok);
}
setGenericCommand函数的逻辑中,比较重要的有两块逻辑:
一块是setKey,存储键值对
一块是addReply,响应客户端。
我们先看setKey的执行逻辑
setKey的入参,有三个,第一个是库名,默认是0库,第二个是set命令的key,第三个是set命令的value值
void setKey(redisDb *db, robj *key, robj *val) {
//查找key是否存在
if (lookupKeyWrite(db,key) == NULL) {
//不存在新增
dbAdd(db,key,val);
} else {
//存在覆盖
dbOverwrite(db,key,val);
}
......
}
进入lookupKeyWrite函数,主要是查找key是否存在,查找key的逻辑是在dictFind函数中,该函数入参有两个,第一个是全局hash函数,第二个是key值
robj *lookupKey(redisDb *db, robj *key, int flags) {
//从redis库的全局hash结构中查找
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
......
return val;
} else {
return NULL;
}
}
看到这里,其实我们就知道,redis的set命令也用到了hash结构,用来加速命令的执行。如果未找到该key,就直接返回null.然后就执行键值对的新增命令。
新增命令的具体逻辑,在dictAddRaw函数中。
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
//全局hash表是否在进行rehash
if (dictIsRehashing(d)) _dictRehashStep(d);
//key已经存在,直接返回
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
如果在进行rehash,使用1表,否则使用0表。这块是一个面试题,rehash的过程中,会用到两个hash表
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
//下面就开始真正的新增操作了
//先初始化一个Entry节点
entry = zmalloc(sizeof(*entry));
//这两行代码是在将新的entry添加到现有链表中。注意ht->table[index],这代表一个元素。
//如果index这个位置上有元素A,那新建entry的下一个元素就是A,同时index的位置上的节点变更为新建的entry
entry->next = ht->table[index];
ht->table[index] = entry;
//hash表的节点数增加1
ht->used++;
//将key值设置到Entry中,这个函数是一个宏,用gdb打断点的话,宏里面直接走的else逻辑,也就是(entry)->key = (_key_);一个简单的赋值操作
dictSetKey(d, entry, key);
return entry;
}
至此,我们就跟完了一条完整的set命令执行过程。
set命令之后,然后就是响应客户端的操作,逻辑在addReply方法中,主要就是执行prepareClientToWrite函数
void addReply(client *c, robj *obj) {
//执行prepareClientToWrite函数
if (prepareClientToWrite(c) != C_OK) return;
......
}
prepareClientToWrite函数封装了clientInstallWriteHandler函数,其中主要的逻辑就是将客户端插入到server.clients_pending_write列表中。很显然,这不是真正的响应客户端操作,因为我们知道读取客户端数据,用的是read函数,那响应客户端数据,盲猜也得是write之类的函数。所以,一定有另外的地方处理clients_pending_write列表.
处理的逻辑在server.c文件的beforeSleep函数中
void beforeSleep(struct aeEventLoop *eventLoop) {
......
//注册写事件处理逻辑
handleClientsWithPendingWrites();
......
}
handleClientsWithPendingWrites没有入参。可以看到该函数的主要逻辑就是从clients_pending_write链表中获取节点,然后进行处理,这里面会调用write函数
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
//从clients_pending_write列表中获取数据
int processed = listLength(server.clients_pending_write);
//获取待写回的客户端列表,这个li是一个迭代器的指针
listRewind(server.clients_pending_write,&li);
//遍历每一个待写回的客户端
while((ln = listNext(&li))) {
......
//获取到待响应的客户端
client *c = listNodeValue(ln);
//调用writeToClient将当前客户端的输出缓冲区数据写回,该函数封装了和read方法对应的write方法
if (writeToClient(c->fd,c,0) == C_ERR) continue;
......
}
}
beforeSleep函数的触发逻辑是在server.c的main方法中
首先通过aeSetBeforeSleepProc函数将beforeSleep注册到EventLoop结构体中。
int main(int argc, char **argv) {
......
//redis启动过程中会将server.c文件中的beforeSleep函数注册到事件循环框架结构体的beforesleep变量处
aeSetBeforeSleepProc(server.el,beforeSleep);
......
}
之后,随着事件循环不断的执行,beforeSleep函数也会不断的执行。
void aeMain(aeEventLoop *eventLoop) {
......
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
//执行beforeSleep函数
eventLoop->beforesleep(eventLoop);
///......不断的调用内核,获取就绪的事件进行处理
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
以上,就是一个完整的set命令的执行过程。
文章参考了极客时间的