目录
本文章主要内容是下面两部分:
什么时候使用了connection?在创建一个客户端时候,会同时创建一个connection来绑定该fd。
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- .............
- acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
- }
- }
-
- //创建connection
- connection *connCreateAcceptedSocket(int fd) {
- connection *conn = connCreateSocket();
- conn->fd = fd;
- conn->state = CONN_STATE_ACCEPTING;
- return conn;
- }
该结构是一个完成的连接,客户端的fd封装成一个connection。
- struct connection {
- ConnectionType *type;
- ConnectionState state; //表示该客户端当前的连接状态
- short int flags;
- short int refs; //该连接被引用的数量
- int last_errno; //该连接的最终错误
- void *private_data; //在网络这部分,可以认为是结构体client
- //一些对应的回调函数
- ConnectionCallbackFunc conn_handler;
- ConnectionCallbackFunc write_handler;
- ConnectionCallbackFunc read_handler;
- int fd; //该客户端对应的fd
- };
-
- //回调函数的类型
- typedef void (*ConnectionCallbackFunc)(struct connection *conn);
-
- //connection的flags的值
- #define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */
- #define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */
- //一般是先执行读事件,之后再执行写事件,但是想要置换顺序的话,其flags置为CONN_FLAG_WRITE_BARRIER,就可以换顺序了
-
- //表示客户端当前的连接状态
- typedef enum {
- CONN_STATE_NONE = 0,
- CONN_STATE_CONNECTING,
- CONN_STATE_ACCEPTING,
- CONN_STATE_CONNECTED,
- CONN_STATE_CLOSED,
- CONN_STATE_ERROR
- } ConnectionState;
connection有个ConnectionType属性,这里是一堆接口(函数的第一个参数都是connection),而struct connection是操作对象。
那么该结构与ConnectionType配合使用。不同ConnectionType的connection就会有不同的接口。
- typedef struct ConnectionType {
- void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
- int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
- int (*write)(struct connection *conn, const void *data, size_t data_len);
- int (*read)(struct connection *conn, void *buf, size_t buf_len);
- void (*close)(struct connection *conn);
- int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
- int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
- int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
- .........................
- int (*get_type)(struct connection *conn);
- } ConnectionType;
-
- //这里有个要点需要留意:所有的函数类型的参数都是struct connecton* 开头的,
- //但是只有ae_handler类型不是,其参数没有connection的,其是在参数clientData位置上
为什么要有这个类型ConnectionType呢?是因为Redis中默认有两种类型的connection。感觉像是面向对象的,继承,不同类型的connection会有不同的方法。
Redis从版本6开始支持SSL / TLS,这是一项可选功能,需要在编译时启用。所以才弄了两种类型。
- //要对这些接口有印象,后续就是使用这些接口的
-
- ConnectionType CT_Socket = {
- //这些都是函数,比如把函数connSocketEventHandler赋值给ae_hander
- .ae_handler = connSocketEventHandler,
- .close = connSocketClose,
- .write = connSocketWrite,
- .read = connSocketRead,
- .accept = connSocketAccept,
- .connect = connSocketConnect,
- .set_write_handler = connSocketSetWriteHandler,
- .set_read_handler = connSocketSetReadHandler,
- ............................................
- .get_type = connSocketGetType
- };
-
- //tls.c
- #ifdef USE_OPENSSL
- //需要定义了USE_OPENSSL,这个CT_TLS才会生效
- ConnectionType CT_TLS = {
- .ae_handler = tlsEventHandler,
- .accept = connTLSAccept,
- .connect = connTLSConnect,
- .blocking_connect = connTLSBlockingConnect,
- .read = connTLSRead,
- .write = connTLSWrite,
- .close = connTLSClose,
- .set_write_handler = connTLSSetWriteHandler,
- .set_read_handler = connTLSSetReadHandler
- .........................................................
- .get_type = connTLSGetType
- };
那编译时候不使用TLS的,那创建的connection的type就是CT_Socket类型。从源码可知,所以后面我们就关注CT_Socket的接口就行。
- connection *connCreateAcceptedSocket(int fd) {
- connection *conn = connCreateSocket();
- conn->fd = fd; //设置对应的fd
- conn->state = CONN_STATE_ACCEPTING; //设置状态
- return conn;
- }
-
- connection *connCreateSocket() {
- connection *conn = zcalloc(sizeof(connection));
- conn->type = &CT_Socket; //这个重点,是CT_Socket类型
- conn->fd = -1;
-
- return conn;
- }
-
- //tls.c
- static connection *createTLSConnection(int client_side) {
- SSL_CTX *ctx = redis_tls_ctx;
- if (client_side && redis_tls_client_ctx)
- ctx = redis_tls_client_ctx;
- tls_connection *conn = zcalloc(sizeof(tls_connection));
- conn->c.type = &CT_TLS; //这个就是CT_TLS类型的
- conn->c.fd = -1;
- conn->ssl = SSL_new(ctx);
- return (connection *) conn;
- }
什么时候使用到client?还是从创建一个客户端acceptTcpHandler开始。
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- ........................
- acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
- }
- static void acceptCommonHandler(connection *conn, int flags, char *ip) {
- ..............................
- client *c = createClient(conn); //创建client
- }
- client *createClient(connection *conn) {
- client *c = zmalloc(sizeof(client));
- if (conn) {
- .........................
- connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
- connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
- }
- //初始化client的一些变量
- ................
- if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
- }
Redis使用结构体client存储客户端连接的所有信息。这里面就包括了客户端对应的connection。
- //redis5版本的是有fd,而redis6版本的用connection替代了fd
- typedef struct client {
- uint64_t id; /* Client incremental unique ID. */
- connection *conn; //客户对应的connection
- int resp; /* RESP protocol version. Can be 2 or 3. */
- redisDb *db; //select命令选择的数据库对象
-
- //从客户端读取的数据存储的位置,即输入缓冲区
- sds querybuf; /* Buffer we use to accumulate client queries. */
- size_t qb_pos; /* The position we have read in querybuf. */
- // 命令和命令参数
- int argc; /* Num of arguments of current command. */
- robj **argv; /* Arguments of current command. */
-
- struct redisCommand *cmd; //待执行的命令
-
- int reqtype; /* Request protocol type: PROTO_REQ_* */
-
- int multibulklen; /* Number of multi bulk arguments left to read. */
- long bulklen; /* Length of bulk argument in multi bulk request. */
- //回复客户端数据的链表
- list *reply; /* List of reply objects to send to the client. */
- unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
- size_t sentlen; /* Amount of bytes already sent in the current
- buffer or object being sent. */
-
- uint64_t flags; /* 客户端标识,Client flags: CLIENT_* macros. */
-
- ...............
- /* Response buffer */ //回复客户端数据的地方,即输出缓冲区,若是不够空间,就存放在reply中
- int bufpos;
- char buf[PROTO_REPLY_CHUNK_BYTES];
- } client;
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
- ..........
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- ........................
- acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
- }
- }
-
- static void acceptCommonHandler(connection *conn, int flags, char *ip) {
- ..............................
- /* Create connection and client */
- client *c = createClient(conn);
- }
- client *createClient(connection *conn) {
- client *c = zmalloc(sizeof(client));
- if (conn) {
- .........................
- connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
- connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
- }
- }
-
- static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_read_handler(conn, func);
- }
调用函数connSetReadHandler设置读事件回调函数。看到该函数的实现,可能会比较疑惑。所以这时就需要关联上面讲的ConnectionType属性,其是CT_Socket。所以我们查看到CT_Socket的set_read_handler是函数 connSocketSetReadHandler。
那么connSetReadHandler的实现就变成如下。那么其最终也是调用aeCreateFileEvent来创建一个FileEvent,并且绑定func给对应的fileProc。
- static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- //return conn->type->set_read_handler(conn, func);
- //就是调用connSocketSetReadHandler
- return connSocketSetReadHandler(conn, func);
- }
-
- static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- if (func == conn->read_handler) return C_OK;
-
- conn->read_handler = func;
- if (!conn->read_handler)
- aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
- else
- if (aeCreateFileEvent(server.el,conn->fd,
- AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
- }
-
- int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
- aeFileProc *proc, void *clientData)
- {
- aeFileEvent *fe = &eventLoop->events[fd];
- ....................
- fe->mask |= mask;
- if (mask & AE_READABLE) fe->rfileProc = proc;
- if (mask & AE_WRITABLE) fe->wfileProc = proc;
- fe->clientData = clientData;
- }
那么又有疑惑了,不是说绑定func的吗?怎么函数aeCreateFileEvent中的参数是conn->type->ae_handler?我们先保留这个疑问,看完写事件回调函数的设置。
- void beforeSleep(struct aeEventLoop *eventLoop) {
- .................
- /* Handle writes with pending output buffers. */
- handleClientsWithPendingWritesUsingThreads();
- }
- int handleClientsWithPendingWritesUsingThreads(void) {
- .....................
- listRewind(server.clients_pending_write,&li);
- while((ln = listNext(&li))) {
- client *c = listNodeValue(ln);
-
- //设置写事件回调函数
- if (clientHasPendingReplies(c) &&
- connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
- ..................
- }
- }
-
- static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_write_handler(conn, func, 0);
- }
conn->type->set_write_handler绑定的是connSocketSetWriteHandler。那么connSetWriteHandler的实现即是:
- static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
- //return conn->type->set_write_handler(conn, func, 0);
- return connSocketSetWriteHandler(conn, func, 0);
- }
-
- static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
- if (func == conn->write_handler) return C_OK;
-
- conn->write_handler = func;
- if (barrier)
- conn->flags |= CONN_FLAG_WRITE_BARRIER;
- else
- conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
- if (!conn->write_handler)
- aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
- else
- if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
- conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
- }
设置写事件回调函数的也是使用conn->type->ae_handler。
说明读写事件的回调函数的设置都统一是使用conn->type->ae_handler。ae_handler对应的是connSocketEventHandler。
- static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
- {
- connection *conn = clientData;
- //创建connection时候,设置了state=CONN_STATE_ACCEPTING,所以这个判断不成立
- if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) {
- int conn_error = connGetSocketError(conn);
- if (conn_error) {
- conn->last_errno = conn_error;
- conn->state = CONN_STATE_ERROR;
- } else {
- conn->state = CONN_STATE_CONNECTED;
- }
- if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
- if (!callHandler(conn, conn->conn_handler)) return;
- conn->conn_handler = NULL;
- }
-
- //位全为1结果才是1,初始时候flags是0,所以&CONN_FLAG_WRITE_BARRIER后也是0
- //只有后续设置flags=CONN_FLAG_WRITE_BARRIER后,再&结果才是1
- int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
-
- int call_write = (mask & AE_WRITABLE) && conn->write_handler;
- int call_read = (mask & AE_READABLE) && conn->read_handler;
-
- //执行对应的回调函数
- /* Handle normal I/O flows */
- if (!invert && call_read) {
- if (!callHandler(conn, conn->read_handler)) return;
- }
- /* Fire the writable event. */
- if (call_write) {
- if (!callHandler(conn, conn->write_handler)) return;
- }
- /* If we have to invert the call, fire the readable event now
- * after the writable one. */
- if (invert && call_read) {
- if (!callHandler(conn, conn->read_handler)) return;
- }
- }
-
- static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
- connIncrRefs(conn); //增加refs值以保护连接
- if (handler) handler(conn); //这里就是执行回调函数,即是执行readQueryFromClient等
- connDecrRefs(conn); //回调函数执行后,refs--
- if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
- if (!connHasRefs(conn)) connClose(conn); //如果refs==0,执行延迟关闭
- return 0;
- }
- return 1;
- }
-
- static inline void connIncrRefs(connection *conn) {
- conn->refs++;
- }
到这里终于知道设置conn->type->ae_handler作为回调函数的原因了。
前面的设置读写回调时候,把readQueryFromClient绑定给conn->read_handler,把sendReplyToClient绑定给conn->write_handler。
所以,connSocketEventHandler函数中既有读事件回调函数,也有写事件回调函数。所以,我们可以这样认为,connSocketEventHandler是connection的处理中心。在主框架的epoll中并不会直接调用客户端的读写回调函数,而是统一调用connSocketEventHandler,这样一来相当于是框架与connection解耦了。
我回头查看了Redis5.0.10版本的,发现,其是直接设置readQueryFromClient作为回调函数的,这个版本也是没有结构体connection,其是直接使用client的。
- //Reids5.0.10版本
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- ...........
- acceptCommonHandler(cfd,0,cip);
- }
- }
-
- static void acceptCommonHandler(int fd, int flags, char *ip) {
- client *c = createClient(fd)
- .................
- }
-
- client *createClient(int fd) {
- client *c = zmalloc(sizeof(client));
- if (fd != -1) {
- aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)
- ......................
- }
- }
Redis6后,开始支持SSL / TLS,添加了conneciton,这个connection就是有两种类型。所以才这样弄吧。
当事件就绪时,那都是执行connSocketEventHandler,而其都有读/写事件的回调函数。
我认为,那结构体aeFileEvent可以只拥有一个aeFileProc即可。
- typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
- aeFileProc *rfileProc;
- aeFileProc *wfileProc;
- void *clientData;
- } aeFileEvent;
-
- //可以改写成如下
- typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
- aeFileProc *fileProc; //只用一个回调函数就行
- void *clientData;
- } aeFileEvent;
而在函数aeProcessEvents中不再需要判别是读事件还是写事件了。可以改写成如下:
- //展示主体,主要是修改了for循环内部,不管是哪种类型,统一是使用fe->fileProc(....)
- int aeProcessEvents(aeEventLoop *eventLoop, int flags)
- {
- int processed = 0, numevents;
- ...............................
- if (eventLoop->maxfd != -1 ||
- ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
- struct timeval tv, *tvp;
-
- if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
- eventLoop->beforesleep(eventLoop);
-
- /* Call the multiplexing API, will return only on timeout or when
- * some event fires. */
- numevents = aeApiPoll(eventLoop, tvp);
- for (int j = 0; j < numevents; j++) {
- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
- int mask = eventLoop->fired[j].mask;
- int fd = eventLoop->fired[j].fd;
-
- if (fe->mask) //表示该fd是有关注的事件类型的,就可以执行对应的读或写
- //该函数就是调用connSocketEventHandler
- fe->fileProc(eventLoop,fd,fe->clientData,mask);
-
- }
- }
- }
可以这样写的原因,是因为connSocketEventHandler中有写回调和读回调,只要传事件类型进去就知道是使用读回调还是写回调。这种写法就更加统一了。
那为什么Redis作者不这样做呢?是为了兼容之前版本的,或者是我漏了什么细节是不能这样操作的呢?若有见解,欢迎在评论区讨论指出。
创建连接时候,需要设置客户端的读回调。
createClient--->connSetReadHandler--->(conn->type->set_read_handler)--->(connSocketReadHandler,其内部把读回调函数readQueryFromClient赋值给read_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给rfileProc)。

单线程的情况:
aeProcessEvents--->beforesleep--->handleClientsWithPendingWritesUsingThreads--->handleClientsWithPendingWrites--->connSetWriteHandlerWithBarrier--->(connSocketSetWriteHandler,把sendReplyToClient赋值给write_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给wfileProc)。

当事件就绪(假设是读事件),就会执行fe->rfileProc函数,那该函数就是执行connSocketEventHandler,接着其内部会调用callHandler函数,callHandler就会调用conn->read_handler。
其实不管是读事件还是写事件,都是执行connSocketEventHandler。
即是aeProcessEvents--->rfileProc--->connSocketEventHandler--->callHandler--->(conn->read_handler,即是readQueryFromClient)。

aeProcessEvents--->rfileProc--->acceptTcpHandler。

对比:
服务器端的是直接调用回调函数acceptTcpHandler。
而客户端的是调用connSocketEventHandler,再在connSocketEventHandler内部判断若是读事件,才执行回调函数readQueryFromClient。