在 Redis6.0 版本之前,采用的是单线程模型,即:命令的读取、解析、执行及回复都是在一个线程中执行。但Redis仍可以提供极为优秀的并发能力,核心在于优秀的代码设计:IO多路复用 + 内存操作 + 优秀的数据结构设计。
从Redis 6.0 版本开始,引入了多线程模型,主要用来分担主线的压力,具体负责io时间的读写和解析,注意:命令的执行仍然在主线程中处理。
以下分析默认你已经具备了网络编程、IO模型和Reactor模型的相关知识,如果这些知识不熟悉的朋友,可以先参考我之前的文章:Posix API与网络协议栈实现原理、Linux五种IO模型 和 Reactor实现原理及代码示例,有了这些预备知识,再来看这篇文章会容易点。
今天我们以一条简单的set命令,到redis返回OK,从源码的角度看看整个执行流程是什么样的~
127.0.0.1:6379> set msg "hello world"
OK
127.0.0.1:6379>
以下内容是基于Redis 6.2.6 版本整理总结。
服务端Reactor模型的初始化主要在src/server.c/initServer() 函数中执行。
// src/server.c/initServer()
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
// src/server.c/initServer()
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
createSocketAcceptHandler() 函数通过调用 aeCreateFileEvent,将该服务段的套接字加入到epoll 集合,并为其注册可读事件的回调函数accept_handler,如果内核检测到有客户端连接上来,会执行accept_handler函数。
// src/server.c/initServer()
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
int j;
// sfd->count 监听的地址个数
for (j = 0; j < sfd->count; j++) {
// 注册listen fd 读事件 accept_handler 回调
if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE,
accept_handler,NULL) == AE_ERR) {
/* Rollback */
for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
return C_ERR;
}
}
return C_OK;
}
对于内核提供的多路复用结构,redis 通过aeCreateFileEvent() 函数对其了一层封装。redis支持不同的操作系统下的多路复用,这里我们以linux下的IO多路复用epoll进行举例,底层调用的是 epoll_ctl()。
底层调用 accept 系统调用,处理新的客户端连接。每次当连接事件触发时,acceptTcpHandler 内部每次最多循环MAX_ACCEPTS_PER_CALL 次accep系统调用从内核中的已连接队列中获取新的连接。MAX_ACCEPTS_PER_CALL 机制,通过一次性调用接收多个新连接,提升IO 多路复用能力;又可以避免一次行accept过多的连接,来不及处理,造成文件描述的浪费。
// src/networking.c
#define MAX_ACCEPTS_PER_CALL 1000
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// 每次最多处理1000条连接
while(max--) {
// 调用 accept 返回新的客户端套接字
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// 主要用于frok出来的子进程自动关闭这些从父进程继承来的fd,防止fd泄漏
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
acceptCommonHandler() 函数实现
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
...
// 创建 client 对象
if ((c = createClient(conn)) == NULL) {
...
}
...
}
createClient() 函数
client *createClient(connection *conn) {
...
if (conn) {
connNonBlock(conn); // 设置非阻塞
connEnableTcpNoDelay(conn); // setsockopt(TCP_NODELAY)
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// 注册该客户单的读事件回调函数
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c); // 将c保存到conn的私有空间
}
...
return c;
}
readQueryFromClient 方法非常重要,是redis读取数据的入口。后面再详聊。到这里客户端连接的处理及读事件的回调注册,就很明了了。
// src/connection.h
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}
// src/connection.c
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
// 指定当前客户单的read_handler为func
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else // 将客户端fd加入到epoll集合,并注册读事件
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
以上就是Redis服务端读事件注册和监听的全过程,写事件类似,这里就不赘述了,有兴趣的小伙伴可以自己尝试跟一跟。
我们首先要从客户端读取数据并解析,根据命令执行对应的命令函数,将执行结果返回给客户端。
从Redis6.0版本开始,加入了io多线程的优化,这篇文章我们先不展开,后面会专门出一篇文章聊聊io多线程优化。
// src/networking.c
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
...
// 开启io多线程优化,先不展开
if (postponeClientRead(c)) return;
...
// 将客户端数据读到 querybuf 缓冲区
nread = connRead(c->conn, c->querybuf+qblen, readlen);
...
// 处理读缓冲区
processInputBuffer(c);
}
processInputBuffer 函数会根据redis协议解析客户端请求(客户端的请求数据保存在c->querybuf中),将相关参数保存在c->argv数组和c->argc中。如果命令就绪,就会调用 processCommand 处理命令。
// src/networking.c
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
...
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
...
// 数据处理流程:read decode compute encode write
// 这里进行真正的 compute 命令处理
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}
...
}
int processCommandAndResetClient(client *c) {
...
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
...
}
int processCommand(client *c) {
// 获取命令行的第一个参数,比如: set msg "hello world" 命令中的 set
// 通过 set 查找redisCommand字典,找到对应的cmd entry
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
/* Exec the command */
// 执行命令
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
c->cmd->proc != resetCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL); // 在这里执行具体命令
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
void call(client *c, int flags) { // 执行命令的核心函数
..
// 这里调用具体的命令处理函数, 比如: setCommand函数
c->cmd->proc(c);
..
}
Redis中的命令都是独立封装的,每个命令及对应的处理函数注册保存在 src/server.c/ redisCommandTable 中。
// src/server.c
struct redisCommand redisCommandTable[] = {
...
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
...
}
setCommand 函数实现。 在 call 方法调用 c->cmd->proc©时,redis通过解析,发现是set命令,就会调用 setCommand 函数,
// src/t_string.c
/* SET key value [NX] [XX] [KEEPTTL] [GET] [EX ] [PX ]
* [EXAT ][PXAT ] */
void setCommand(client *c) {
...
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
...
{
if (milliseconds <= 0 || (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) {
addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
}
...
if (when <= 0) {
addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
}
}
...
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
...
if (!(flags & OBJ_SET_GET)) {
addReply(c, ok_reply ? ok_reply : shared.ok);
}
...
}
每个命令在处理完后,都会有addReplyXxx 类似的方法调用,这就是Redis在向客户端返回执行结果。但是,并不是直接发送给客户端,而是将响应结构保存在客户端的response buf中。在看addReply的实现之前,我们先来看看 client的定义,以便我们更好的理解。client结构定义如下:
// src/server.c
#define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
typedef struct client {
...
connection *conn;
...
sds querybuf; // 从客户端读取的数据保存在querybuf中
...
int argc; // 命令参数的个数 3
robj **argv; // 命令的具体参数 set "msg" "hello world"
...
list *reply; /* List of reply objects to send to the client. */
...
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
// src/networking.c
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
// 将响应结果保存到客户端的输出缓冲区
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); // 将响应结果保存到客户端的输出链表
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
说明:_addReplyToBuffer 是将回复数据保存到client对象的输出缓冲区,_addReplyProtoToList 是保存到输出链表,前者的优先级高。到这里要回复给客户端的数据已经就绪,什么时候发给客户端呢?
在主事件循环 aeMain 中,在每次获取就绪的事件之前,都会先执行 beforesleep 函数,这个函数只要执行一些耗时少的操作,比如过期键的删除、给客户端返回数据等。也就是在这个时候,会将client输出缓冲区中的数据发给客户端。
beforesleep 函数注册:
// src/server.c/initServer
void initServer(void) {
...
// 注册beforeSleep 和 afterSleep 的回调函数,注意这两个函数在主时间循环中的执行位置
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
...
}
发送数据给客户端逻辑,多线程读写的优化,会专门出一篇文章详解,这里也不展开,默认不开启多线程优化:
// stc/networking.c
int handleClientsWithPendingWritesUsingThreads(void) {
...
// 如果没有开启io多线程写,或者待处理的客户端数量较少时,直接由主线程执行写操作
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
...
}
int handleClientsWithPendingWrites(void) {
...
while((ln = listNext(&li))) {
...
// 给客户端发送数据
if (writeToClient(c,0) == C_ERR) continue;
...
}
...
}
事件处理的入口函数是:aeMain(server.el),死循环对就绪的io事件进行处理,直至服务退出。
// Loop事件循环
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
核心逻辑在 aeProcessEvents() 函数中实现:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
...
// 事件循环中处理 beforesleep
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// numevents 当前就绪的事件个数
numevents = aeApiPoll(eventLoop, tvp); // 底层就是epoll_wait
...
// 处理就绪事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
...
// 执行读事件的回调,如果是listenfd就是Accept回调,如果是clientfd就是read回调
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// 执行写事件回调
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
...
}
}
...
}
以上逻辑是epoll 的固定写法,找到就绪事件,然后再for循环中依次进行处理。如果读事件就绪,直接调用rfileProc方法;如果写事件就绪,调用wfileProc。这得益于redis对“文件事件”的封装,我们来看aeFileEvent 结构定义:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
rfileProc 和 wfileProc 都是函数指针,我们在创建一个 aeFileEvent 事件的时候,就可以绑定具体的实现。
Redis是单线程,所以服务端ipfd和客户端clifd都是在一个主事件循环中处理。Redis的处理逻辑是:当新的连接事件触发时,acceptTcpHandler 一次调用就可以从内核的Tcp就绪队列中取1000条连接,并将这些连接对应的文件描述符注册到epoll中进行监听。并不是取一条连接就结束,充分利用了IO多路复用的特性。
都是Redis是单线程的,实际上说的是Redis就算在有多个客户端连接的时候,命令处理也是在一个线程中,也就是事件循环的主线程。实际上还有其他的线程,用来执行特定操作,如:
(1)io密集型 或 CPU密集型:
(2)单线程局限性:
因为是单线程,所以同一时刻只能有一个操作在进行。所以,耗时的命令会导致并发的下降,不只是读并发,写并发也会下降。而且单一线程也只能用到一个cpu核心,所以可以在同一个多核的服务器中,可以启动多个实例,组成master-master或者master-slave的形式,耗时的读命令可以完全在slave进行。
(2)为什么Redis不采用多线程
综上,redis的瓶颈存在于网络io层面,因此,如果想要通过多线程来提升系统的并发处理能力,也应该在网络io层面考虑。因此,Redis从6.0版本开始,支持了io多线来处理网络io。关于io多线程优化解析可以关注我后面的文章哦~