• Redis网络部分相关的结构体2 和 绑定回调函数细节


    目录

    1. struct connection

    ConnectionType属性

    创建connection

    2. struct client

    3. 绑定客户端回调函数的流程

    3.1. 读事件回调函数的设置

    3.2. 写事件回调函数的设置

    3.3. connSocketEventHandler函数

    3.4. Redis5版本的设置回调函数

    3.5. 个人的一些想法,修改源码

    4. 总结设置客户端回调函数的流程

    读事件回调函数的设置

    写事件回调函数的设置流程

    5. 回调函数的调用流程

    客户端

    服务器端 


    本文章主要内容是下面两部分:

    • 两个结构体:connectionclient
    • 设置回调函数的注意点和函数connSocketEventHandler

    1. struct connection

    什么时候使用了connection?在创建一个客户端时候,会同时创建一个connection来绑定该fd。

    1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    2. int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    3. while(max--) {
    4. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    5. .............
    6. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    7. }
    8. }
    9. //创建connection
    10. connection *connCreateAcceptedSocket(int fd) {
    11. connection *conn = connCreateSocket();
    12. conn->fd = fd;
    13. conn->state = CONN_STATE_ACCEPTING;
    14. return conn;
    15. }

    该结构是一个完成的连接,客户端的fd封装成一个connection。

    1. struct connection {
    2. ConnectionType *type;
    3. ConnectionState state; //表示该客户端当前的连接状态
    4. short int flags;
    5. short int refs; //该连接被引用的数量
    6. int last_errno; //该连接的最终错误
    7. void *private_data; //在网络这部分,可以认为是结构体client
    8. //一些对应的回调函数
    9. ConnectionCallbackFunc conn_handler;
    10. ConnectionCallbackFunc write_handler;
    11. ConnectionCallbackFunc read_handler;
    12. int fd; //该客户端对应的fd
    13. };
    14. //回调函数的类型
    15. typedef void (*ConnectionCallbackFunc)(struct connection *conn);
    16. //connection的flags的值
    17. #define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */
    18. #define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */
    19. //一般是先执行读事件,之后再执行写事件,但是想要置换顺序的话,其flags置为CONN_FLAG_WRITE_BARRIER,就可以换顺序了
    20. //表示客户端当前的连接状态
    21. typedef enum {
    22. CONN_STATE_NONE = 0,
    23. CONN_STATE_CONNECTING,
    24. CONN_STATE_ACCEPTING,
    25. CONN_STATE_CONNECTED,
    26. CONN_STATE_CLOSED,
    27. CONN_STATE_ERROR
    28. } ConnectionState;

    ConnectionType属性

    connection有个ConnectionType属性,这里是一堆接口(函数的第一个参数都是connection),而struct connection是操作对象。

    那么该结构与ConnectionType配合使用。不同ConnectionType的connection就会有不同的接口。

    1. typedef struct ConnectionType {
    2. void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    3. int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    4. int (*write)(struct connection *conn, const void *data, size_t data_len);
    5. int (*read)(struct connection *conn, void *buf, size_t buf_len);
    6. void (*close)(struct connection *conn);
    7. int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    8. int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    9. int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    10. .........................
    11. int (*get_type)(struct connection *conn);
    12. } ConnectionType;
    13. //这里有个要点需要留意:所有的函数类型的参数都是struct connecton* 开头的,
    14. //但是只有ae_handler类型不是,其参数没有connection的,其是在参数clientData位置上

    为什么要有这个类型ConnectionType呢?是因为Redis中默认有两种类型的connection。感觉像是面向对象的,继承,不同类型的connection会有不同的方法。

    Redis从版本6开始支持SSL / TLS,这是一项可选功能,需要在编译时启用。所以才弄了两种类型。

    1. //要对这些接口有印象,后续就是使用这些接口的
    2. ConnectionType CT_Socket = {
    3. //这些都是函数,比如把函数connSocketEventHandler赋值给ae_hander
    4. .ae_handler = connSocketEventHandler,
    5. .close = connSocketClose,
    6. .write = connSocketWrite,
    7. .read = connSocketRead,
    8. .accept = connSocketAccept,
    9. .connect = connSocketConnect,
    10. .set_write_handler = connSocketSetWriteHandler,
    11. .set_read_handler = connSocketSetReadHandler,
    12. ............................................
    13. .get_type = connSocketGetType
    14. };
    15. //tls.c
    16. #ifdef USE_OPENSSL
    17. //需要定义了USE_OPENSSL,这个CT_TLS才会生效
    18. ConnectionType CT_TLS = {
    19. .ae_handler = tlsEventHandler,
    20. .accept = connTLSAccept,
    21. .connect = connTLSConnect,
    22. .blocking_connect = connTLSBlockingConnect,
    23. .read = connTLSRead,
    24. .write = connTLSWrite,
    25. .close = connTLSClose,
    26. .set_write_handler = connTLSSetWriteHandler,
    27. .set_read_handler = connTLSSetReadHandler
    28. .........................................................
    29. .get_type = connTLSGetType
    30. };

    创建connection

    那编译时候不使用TLS的,那创建的connection的type就是CT_Socket类型。从源码可知,所以后面我们就关注CT_Socket的接口就行。

    1. connection *connCreateAcceptedSocket(int fd) {
    2. connection *conn = connCreateSocket();
    3. conn->fd = fd; //设置对应的fd
    4. conn->state = CONN_STATE_ACCEPTING; //设置状态
    5. return conn;
    6. }
    7. connection *connCreateSocket() {
    8. connection *conn = zcalloc(sizeof(connection));
    9. conn->type = &CT_Socket; //这个重点,是CT_Socket类型
    10. conn->fd = -1;
    11. return conn;
    12. }
    13. //tls.c
    14. static connection *createTLSConnection(int client_side) {
    15. SSL_CTX *ctx = redis_tls_ctx;
    16. if (client_side && redis_tls_client_ctx)
    17. ctx = redis_tls_client_ctx;
    18. tls_connection *conn = zcalloc(sizeof(tls_connection));
    19. conn->c.type = &CT_TLS; //这个就是CT_TLS类型的
    20. conn->c.fd = -1;
    21. conn->ssl = SSL_new(ctx);
    22. return (connection *) conn;
    23. }

    2. struct client

    什么时候使用到client?还是从创建一个客户端acceptTcpHandler开始。

    1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    2. ........................
    3. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    4. }
    5. static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    6. ..............................
    7. client *c = createClient(conn); //创建client
    8. }
    9. client *createClient(connection *conn) {
    10. client *c = zmalloc(sizeof(client));
    11. if (conn) {
    12. .........................
    13. connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
    14. connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
    15. }
    16. //初始化client的一些变量
    17. ................
    18. if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
    19. }

    Redis使用结构体client存储客户端连接的所有信息。这里面就包括了客户端对应的connection。

    1. //redis5版本的是有fd,而redis6版本的用connection替代了fd
    2. typedef struct client {
    3. uint64_t id; /* Client incremental unique ID. */
    4. connection *conn; //客户对应的connection
    5. int resp; /* RESP protocol version. Can be 2 or 3. */
    6. redisDb *db; //select命令选择的数据库对象
    7. //从客户端读取的数据存储的位置,即输入缓冲区
    8. sds querybuf; /* Buffer we use to accumulate client queries. */
    9. size_t qb_pos; /* The position we have read in querybuf. */
    10. // 命令和命令参数
    11. int argc; /* Num of arguments of current command. */
    12. robj **argv; /* Arguments of current command. */
    13. struct redisCommand *cmd; //待执行的命令
    14. int reqtype; /* Request protocol type: PROTO_REQ_* */
    15. int multibulklen; /* Number of multi bulk arguments left to read. */
    16. long bulklen; /* Length of bulk argument in multi bulk request. */
    17. //回复客户端数据的链表
    18. list *reply; /* List of reply objects to send to the client. */
    19. unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    20. size_t sentlen; /* Amount of bytes already sent in the current
    21. buffer or object being sent. */
    22. uint64_t flags; /* 客户端标识,Client flags: CLIENT_* macros. */
    23. ...............
    24. /* Response buffer */ //回复客户端数据的地方,即输出缓冲区,若是不够空间,就存放在reply中
    25. int bufpos;
    26. char buf[PROTO_REPLY_CHUNK_BYTES];
    27. } client;

    3. 绑定客户端回调函数的流程

    3.1. 读事件回调函数的设置

    1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    2. int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    3. ..........
    4. while(max--) {
    5. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    6. ........................
    7. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    8. }
    9. }
    10. static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    11. ..............................
    12. /* Create connection and client */
    13. client *c = createClient(conn);
    14. }
    15. client *createClient(connection *conn) {
    16. client *c = zmalloc(sizeof(client));
    17. if (conn) {
    18. .........................
    19. connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
    20. connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
    21. }
    22. }
    23. static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    24. return conn->type->set_read_handler(conn, func);
    25. }

    调用函数connSetReadHandler设置读事件回调函数。看到该函数的实现,可能会比较疑惑。所以这时就需要关联上面讲的ConnectionType属性,其是CT_Socket。所以我们查看到CT_Socket的set_read_handler是函数 connSocketSetReadHandler。

    那么connSetReadHandler的实现就变成如下。那么其最终也是调用aeCreateFileEvent来创建一个FileEvent,并且绑定func给对应的fileProc

    1. static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    2. //return conn->type->set_read_handler(conn, func);
    3. //就是调用connSocketSetReadHandler
    4. return connSocketSetReadHandler(conn, func);
    5. }
    6. static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    7. if (func == conn->read_handler) return C_OK;
    8. conn->read_handler = func;
    9. if (!conn->read_handler)
    10. aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    11. else
    12. if (aeCreateFileEvent(server.el,conn->fd,
    13. AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    14. return C_OK;
    15. }
    16. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    17. aeFileProc *proc, void *clientData)
    18. {
    19. aeFileEvent *fe = &eventLoop->events[fd];
    20. ....................
    21. fe->mask |= mask;
    22. if (mask & AE_READABLE) fe->rfileProc = proc;
    23. if (mask & AE_WRITABLE) fe->wfileProc = proc;
    24. fe->clientData = clientData;
    25. }

    那么又有疑惑了,不是说绑定func的吗?怎么函数aeCreateFileEvent中的参数是conn->type->ae_handler?我们先保留这个疑问,看完写事件回调函数的设置。

    3.2. 写事件回调函数的设置

    1. void beforeSleep(struct aeEventLoop *eventLoop) {
    2. .................
    3. /* Handle writes with pending output buffers. */
    4. handleClientsWithPendingWritesUsingThreads();
    5. }
    6. int handleClientsWithPendingWritesUsingThreads(void) {
    7. .....................
    8. listRewind(server.clients_pending_write,&li);
    9. while((ln = listNext(&li))) {
    10. client *c = listNodeValue(ln);
    11. //设置写事件回调函数
    12. if (clientHasPendingReplies(c) &&
    13. connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
    14. ..................
    15. }
    16. }
    17. static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    18. return conn->type->set_write_handler(conn, func, 0);
    19. }

    conn->type->set_write_handler绑定的是connSocketSetWriteHandler。那么connSetWriteHandler的实现即是:

    1. static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    2. //return conn->type->set_write_handler(conn, func, 0);
    3. return connSocketSetWriteHandler(conn, func, 0);
    4. }
    5. static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    6. if (func == conn->write_handler) return C_OK;
    7. conn->write_handler = func;
    8. if (barrier)
    9. conn->flags |= CONN_FLAG_WRITE_BARRIER;
    10. else
    11. conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
    12. if (!conn->write_handler)
    13. aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    14. else
    15. if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
    16. conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    17. return C_OK;
    18. }

    设置写事件回调函数的也是使用conn->type->ae_handler。

    说明读写事件的回调函数的设置都统一是使用conn->type->ae_handler。ae_handler对应的是connSocketEventHandler

    3.3. connSocketEventHandler函数

    1. static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
    2. {
    3. connection *conn = clientData;
    4. //创建connection时候,设置了state=CONN_STATE_ACCEPTING,所以这个判断不成立
    5. if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) {
    6. int conn_error = connGetSocketError(conn);
    7. if (conn_error) {
    8. conn->last_errno = conn_error;
    9. conn->state = CONN_STATE_ERROR;
    10. } else {
    11. conn->state = CONN_STATE_CONNECTED;
    12. }
    13. if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    14. if (!callHandler(conn, conn->conn_handler)) return;
    15. conn->conn_handler = NULL;
    16. }
    17. //位全为1结果才是1,初始时候flags是0,所以&CONN_FLAG_WRITE_BARRIER后也是0
    18. //只有后续设置flags=CONN_FLAG_WRITE_BARRIER后,再&结果才是1
    19. int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    20. int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    21. int call_read = (mask & AE_READABLE) && conn->read_handler;
    22. //执行对应的回调函数
    23. /* Handle normal I/O flows */
    24. if (!invert && call_read) {
    25. if (!callHandler(conn, conn->read_handler)) return;
    26. }
    27. /* Fire the writable event. */
    28. if (call_write) {
    29. if (!callHandler(conn, conn->write_handler)) return;
    30. }
    31. /* If we have to invert the call, fire the readable event now
    32. * after the writable one. */
    33. if (invert && call_read) {
    34. if (!callHandler(conn, conn->read_handler)) return;
    35. }
    36. }
    37. static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
    38. connIncrRefs(conn); //增加refs值以保护连接
    39. if (handler) handler(conn); //这里就是执行回调函数,即是执行readQueryFromClient等
    40. connDecrRefs(conn); //回调函数执行后,refs--
    41. if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
    42. if (!connHasRefs(conn)) connClose(conn); //如果refs==0,执行延迟关闭
    43. return 0;
    44. }
    45. return 1;
    46. }
    47. static inline void connIncrRefs(connection *conn) {
    48. conn->refs++;
    49. }

    到这里终于知道设置conn->type->ae_handler作为回调函数的原因了。

    前面的设置读写回调时候,把readQueryFromClient绑定给conn->read_handler把sendReplyToClient绑定给conn->write_handler

    所以,connSocketEventHandler函数中既有读事件回调函数,也有写事件回调函数。所以,我们可以这样认为,connSocketEventHandler是connection的处理中心。在主框架的epoll中并不会直接调用客户端的读写回调函数,而是统一调用connSocketEventHandler这样一来相当于是框架与connection解耦了。

    3.4. Redis5版本的设置回调函数

    我回头查看了Redis5.0.10版本的,发现,其是直接设置readQueryFromClient作为回调函数这个版本也是没有结构体connection,其是直接使用client的。

    1. //Reids5.0.10版本
    2. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    3. int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    4. while(max--) {
    5. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    6. ...........
    7. acceptCommonHandler(cfd,0,cip);
    8. }
    9. }
    10. static void acceptCommonHandler(int fd, int flags, char *ip) {
    11. client *c = createClient(fd)
    12. .................
    13. }
    14. client *createClient(int fd) {
    15. client *c = zmalloc(sizeof(client));
    16. if (fd != -1) {
    17. aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)
    18. ......................
    19. }
    20. }

    Redis6后,开始支持SSL / TLS,添加了conneciton,这个connection就是有两种类型。所以才这样弄吧。

    3.5. 个人的一些想法,修改源码

    当事件就绪时,那都是执行connSocketEventHandler,而其都有读/写事件的回调函数。

    我认为,那结构体aeFileEvent可以只拥有一个aeFileProc即可。

    1. typedef struct aeFileEvent {
    2. int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    3. aeFileProc *rfileProc;
    4. aeFileProc *wfileProc;
    5. void *clientData;
    6. } aeFileEvent;
    7. //可以改写成如下
    8. typedef struct aeFileEvent {
    9. int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    10. aeFileProc *fileProc; //只用一个回调函数就行
    11. void *clientData;
    12. } aeFileEvent;

    而在函数aeProcessEvents中不再需要判别是读事件还是写事件了。可以改写成如下:

    1. //展示主体,主要是修改了for循环内部,不管是哪种类型,统一是使用fe->fileProc(....)
    2. int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    3. {
    4. int processed = 0, numevents;
    5. ...............................
    6. if (eventLoop->maxfd != -1 ||
    7. ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    8. struct timeval tv, *tvp;
    9. if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
    10. eventLoop->beforesleep(eventLoop);
    11. /* Call the multiplexing API, will return only on timeout or when
    12. * some event fires. */
    13. numevents = aeApiPoll(eventLoop, tvp);
    14. for (int j = 0; j < numevents; j++) {
    15. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    16. int mask = eventLoop->fired[j].mask;
    17. int fd = eventLoop->fired[j].fd;
    18. if (fe->mask) //表示该fd是有关注的事件类型的,就可以执行对应的读或写
    19. //该函数就是调用connSocketEventHandler
    20. fe->fileProc(eventLoop,fd,fe->clientData,mask);
    21. }
    22. }
    23. }

    可以这样写的原因,是因为connSocketEventHandler中有写回调和读回调,只要传事件类型进去就知道是使用读回调还是写回调。这种写法就更加统一了。 

    那为什么Redis作者不这样做呢?是为了兼容之前版本的,或者是我漏了什么细节是不能这样操作的呢?若有见解,欢迎在评论区讨论指出。

    4. 总结设置客户端回调函数的流程

    读事件回调函数的设置

    创建连接时候,需要设置客户端的读回调。

    createClient--->connSetReadHandler--->(conn->type->set_read_handler)--->(connSocketReadHandler,其内部把读回调函数readQueryFromClient赋值给read_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给rfileProc)。

    写事件回调函数的设置流程

    单线程的情况:

    aeProcessEvents--->beforesleep--->handleClientsWithPendingWritesUsingThreads--->handleClientsWithPendingWrites--->connSetWriteHandlerWithBarrier--->(connSocketSetWriteHandler,把sendReplyToClient赋值给write_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给wfileProc)。

    5. 回调函数的调用流程

    客户端

    当事件就绪(假设是读事件),就会执行fe->rfileProc函数,那该函数就是执行connSocketEventHandler,接着其内部会调用callHandler函数,callHandler就会调用conn->read_handler。

    其实不管是读事件还是写事件,都是执行connSocketEventHandler

    即是aeProcessEvents--->rfileProc--->connSocketEventHandler--->callHandler--->(conn->read_handler,即是readQueryFromClient)。

    服务器端 

    aeProcessEvents--->rfileProc--->acceptTcpHandler。

    对比:

    服务器端的是直接调用回调函数acceptTcpHandler。

    而客户端的是调用connSocketEventHandler再在connSocketEventHandler内部判断若是读事件,才执行回调函数readQueryFromClient

  • 相关阅读:
    中石油勘探院张弢:从业务到架构全面探讨中国石油的数字化转型之路
    JAVAEE初阶相关内容第十三弹--文件操作 IO
    亚马逊创建eks
    powershell批量修改后缀名
    STM32WB55的FUS更新及协议栈固件烧写方法
    软件安全测试怎么做?如何确保软件授权安全
    Inductive Representation Learning on Large Graphs 论文/GraphSAGE学习笔记
    java计算机毕业设计springboot+vue气象观测数据样本构建与分析系统-天气预报网站
    springboot+vue+elementUI大学生体质测试管理系统#毕业设计
    如何重新训练模型?
  • 原文地址:https://blog.csdn.net/m0_57408211/article/details/138181940