目录
2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent
6. 通过Redis网络部分,学到如何实现reactor模式
上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。
epoll有三个函数调用:
- typedef struct aeApiState {
- int epfd;
- struct epoll_event *events;
- } aeApiState;
主要流程:
- //现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
- static int aeApiCreate(aeEventLoop *eventLoop) {
- aeApiState *state = zmalloc(sizeof(aeApiState));
-
- if (!state) return -1;
- state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
- if (!state->events) {
- zfree(state);
- return -1;
- }
- state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
- if (state->epfd == -1) {
- zfree(state->events);
- zfree(state);
- return -1;
- }
- anetCloexec(state->epfd);
- eventLoop->apidata = state;
- return 0;
- }
-
- //重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
- static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
- aeApiState *state = eventLoop->apidata;
-
- state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
- return 0;
- }
-
- //释放结构体aeApiState
- static void aeApiFree(aeEventLoop *eventLoop) {
- aeApiState *state = eventLoop->apidata;
-
- close(state->epfd);
- zfree(state->events);
- zfree(state);
- }
主要是两个:添加和删除。
- static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
- aeApiState *state = eventLoop->apidata;
- struct epoll_event ee = {0}; /* avoid valgrind warning */
- /* If the fd was already monitored for some event, we need a MOD
- * operation. Otherwise we need an ADD operation. */
- int op = eventLoop->events[fd].mask == AE_NONE ?
- EPOLL_CTL_ADD : EPOLL_CTL_MOD;
-
- ee.events = 0;
- //得到要关注的事件类型mask
- mask |= eventLoop->events[fd].mask; /* Merge old events */
- if (mask & AE_READABLE) ee.events |= EPOLLIN;
- if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
- ee.data.fd = fd;
- if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
- return 0;
- }
-
- static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
- aeApiState *state = eventLoop->apidata;
- struct epoll_event ee = {0}; /* avoid valgrind warning */
- int mask = eventLoop->events[fd].mask & (~delmask);
-
- ee.events = 0;
- if (mask & AE_READABLE) ee.events |= EPOLLIN;
- if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
- ee.data.fd = fd;
- if (mask != AE_NONE) {
- epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
- } else {
- /* Note, Kernel < 2.6.9 requires a non null event pointer even for
- * EPOLL_CTL_DEL. */
- epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
- }
- }
即是调用epoll_wait(...)。
- static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
- aeApiState *state = eventLoop->apidata;
- int retval, numevents = 0;
-
- //eventLoop->setsize就是state->evnets数组的元素个数
- retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
- tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
- if (retval > 0) {
- int j;
-
- numevents = retval;
- for (j = 0; j < numevents; j++) { //遍历就绪的fd
- int mask = 0; //mask就是该fd就绪的事件
- struct epoll_event *e = state->events+j;
-
- if (e->events & EPOLLIN) mask |= AE_READABLE;
- if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
- if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
- if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
- //把就绪的fd存储到 eventLoop->fired[j]中
- //可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
- eventLoop->fired[j].fd = e->data.fd;
- eventLoop->fired[j].mask = mask;
- }
- }
- return numevents;
- }
一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。
所以,可以把fd关注的事件和fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。
- /* File event structure */
- typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
- aeFileProc *rfileProc;
- aeFileProc *wfileProc;
- void *clientData;
- } aeFileEvent;
-
- //回调函数的类型
- typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
-
- //前面的形式可能不好看出,用c++11写的话,如下
- //using aeFileProc=std::function
;
所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数。
其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。
所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)
看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢?
那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)
如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。
epoll_wait返回的一个就绪fd就对应一个aeFireEvent。
aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。
- /* A fired event */
- typedef struct aeFiredEvent {
- int fd;
- int mask;
- } aeFiredEvent;
-
-
- //把epoll_wait返回的就绪fd和事件赋值给fired[j]
- static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
- aeApiState *state = eventLoop->apidata;
- int retval, numevents = 0;
-
- //eventLoop->setsize就是state->evnets数组的元素个数
- retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
- tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
- if (retval > 0) {
- numevents = retval;
- for (int j = 0; j < numevents; j++) { //遍历就绪的fd
- int mask = 0; //mask就是该fd就绪的事件
- struct epoll_event *e = state->events+j;
-
- if (e->events & EPOLLIN) mask |= AE_READABLE;
- if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
- if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
- if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
-
- //把就绪的fd存储到 eventLoop->fired[j]中
- //可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
- eventLoop->fired[j].fd = e->data.fd;
- eventLoop->fired[j].mask = mask;
- }
- }
- return numevents;
- }
其是时间事件,可以认为是个定时器,定时会执行给定的函数。
从代码可以看到其是个双向链表。
- //时间事件结构
- typedef struct aeTimeEvent {
- long long id; //时间事件的唯一标识符,自增
-
- //事件的到达时间(即是执行时间)
- long when_sec; /* seconds */
- long when_ms; /* milliseconds */
-
- //事件处理函数 (到期执行的回调函数)
- aeTimeProc *timeProc;
-
- //事件释放函数 (回调函数)
- aeEventFinalizerProc *finalizerProc;
- void *clientData; //多路复用库的私有数据
- //双向链表
- struct aeTimeEvent *prev;
- struct aeTimeEvent *next;
- int refcount; //以防止计时器事件在递归时间事件调用中释放
- } aeTimeEvent;
-
- //时间事件回调函数类型
- typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
-
- //删除定时事件的回调函数类型
- typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。
aeEventLoop是Reactor模型的具体抽象,把网络读写事件和时间事件(定时器任务)可以统一处理。
- //ae.h
- // 事件循环
- typedef struct aeEventLoop {
- int maxfd; //已经注册的文件描述符的最大值
- int setsize; //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)
- long long timeEventNextId; //下一个要注册的时间事件id
-
- time_t lastTime; //最后一次执行时间事件的时间
- aeFileEvent *events; //是数组,已注册的文件事件 (就是IO event)
- aeFiredEvent *fired; //数组,已就绪的文件事件
-
- aeTimeEvent *timeEventHead; //定时器链表的头结点(头结点不一定是最早超时的任务)
-
- int stop; //eventLoop的开关
-
- void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。
- aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)
- aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)
- int flags; //设置的标识位
- } aeEventLoop;
-
- //进入循环等待之前的回调函数类型
- typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired。
- aeEventLoop *aeCreateEventLoop(int setsize) {
- aeEventLoop *eventLoop;
- int i;
- //分配内存给eventLoop
- eventLoop = zmalloc(sizeof(*eventLoop))
- //分配内存给eventLoop中的events 和 fired 数组
- eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
- eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
-
- //设置其一些成员变量
- eventLoop->setsize = setsize;
- eventLoop->timeEventHead = NULL;
- eventLoop->timeEventNextId = 0;
- eventLoop->stop = 0;
- eventLoop->maxfd = -1;
- eventLoop->beforesleep = NULL;
- eventLoop->aftersleep = NULL;
- eventLoop->flags = 0;
- //创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidata
- aeApiCreate(eventLoop)
-
- //当前每个aeFileEvent都不关注任何事件
- for (i = 0; i < setsize; i++)
- eventLoop->events[i].mask = AE_NONE;
- return eventLoop;
- //省略了些内存申请失败的处理................
- }
内部会调用aeApiAddEvent。
- int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
- aeFileProc *proc, void *clientData)
- {
- if (fd >= eventLoop->setsize) { //判断客户端fd是否大于events数组元素个数
- errno = ERANGE;
- return AE_ERR; //若是大于,返回错误
- }
- //从events数组中获取一个元素
- aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fd
-
- if (aeApiAddEvent(eventLoop, fd, mask) == -1) //添加fd到epoll上,并关注事件mask
- return AE_ERR;
- fe->mask |= mask; //设置结构体aeFileEvent关注的事件
- //设置回调函数
- if (mask & AE_READABLE) fe->rfileProc = proc;
- if (mask & AE_WRITABLE) fe->wfileProc = proc;
- fe->clientData = clientData; //设置回调函数的参数
-
- //更新已监听的fd的最大值
- if (fd > eventLoop->maxfd)
- eventLoop->maxfd = fd;
- return AE_OK;
- }
-
- //创建时间事件,即是定时器
- long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
- aeTimeProc *proc, void *clientData,
- aeEventFinalizerProc *finalizerProc)
- {
- long long id = eventLoop->timeEventNextId++;
- aeTimeEvent *te;
-
- te = zmalloc(sizeof(*te));
- if (te == NULL) return AE_ERR;
- te->id = id;
- te->when = getMonotonicUs() + milliseconds * 1000;
- //设置定时器的回调函数
- te->timeProc = proc;
- te->finalizerProc = finalizerProc;
- te->clientData = clientData; //设置回调函数的参数
- te->prev = NULL;
- te->next = eventLoop->timeEventHead; //从这句代码可以看出其是往头部插入的
- te->refcount = 0;
- if (te->next)
- te->next->prev = te;
- eventLoop->timeEventHead = te; //更新链表头部为新插入的节点
- return id;
- }
通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的
- void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
- {
- if (fd >= eventLoop->setsize) return;
- //通过传入的fd找到events数组中的元素
- aeFileEvent *fe = &eventLoop->events[fd];
- if (fe->mask == AE_NONE) return;
-
- /* We want to always remove AE_BARRIER if set when AE_WRITABLE
- * is removed. */
- if (mask & AE_WRITABLE) mask |= AE_BARRIER;
-
- aeApiDelEvent(eventLoop, fd, mask); //进行删除
- fe->mask = fe->mask & (~mask); //取消监听mask事件
- if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
- /* Update the max fd */
- int j;
- for (j = eventLoop->maxfd-1; j >= 0; j--)
- if (eventLoop->events[j].mask != AE_NONE) break;
- eventLoop->maxfd = j; //更新已注册的fd的最大值
- }
- }
aeMain就是一个while()循环,循环内部是aeProcessEvents函数。
- void aeMain(aeEventLoop *eventLoop) {
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- aeProcessEvents(eventLoop, AE_ALL_EVENTS|
- AE_CALL_BEFORE_SLEEP|
- AE_CALL_AFTER_SLEEP);
- }
- }
aeProcessEvents的主要步骤:
回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。
- //为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
- int aeProcessEvents(aeEventLoop *eventLoop, int flags)
- {
- int processed = 0, numevents;
- if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){
- struct timeval tv, *tvp;
- //省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解
- ...............................
-
- //执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注
- if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
- eventLoop->beforesleep(eventLoop);
-
- //即是调用epoll_wait, 等待就绪的fd
- numevents = aeApiPoll(eventLoop, tvp);
- ...................
- //遍历执行已就绪的fd的回调函数
- for (int j = 0; j < numevents; j++) {
- //在aeApiPoll中把就绪的fd和事件赋值给了fired[j]
- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
- //得到就绪的fd和事件类型
- int mask = eventLoop->fired[j].mask;
- int fd = eventLoop->fired[j].fd;
- int fired = 0; /* Number of events fired for current fd. */
-
- //下面就执行对应的回调函数
- if (!invert && fe->mask & mask & AE_READABLE) { //读就绪,执行读回调函数
- fe->rfileProc(eventLoop,fd,fe->clientData,mask);
- fired++;
- fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
- }
-
- if (fe->mask & mask & AE_WRITABLE) { //写就绪,执行写回调函数
- if (!fired || fe->wfileProc != fe->rfileProc) {
- fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- fired++;
- }
- }
- ......................
- processed++;
- }
- }
-
- if (flags & AE_TIME_EVENTS) //flags有需要,就执行时间事件,即是执行定时器任务
- processed += processTimeEvents(eventLoop);
- return processed; /* return the number of processed file/time events */
- }
看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。
因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。
从main()入手。
- //server.c
- int main(int argc, char **argv) {
- .......................
- initServer();
- }
-
- //initServer函数中就有绑定回调函数的
- void initServer(void) {
- //省略了很多其他部分的内容
- //socket(...),bind(...),listen(...),创建服务端的流程....................
- ............................................
- aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
-
- /* Create an event handler for accepting new connections in TCP sockets. */
- createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)
-
- /* Register before and after sleep handlers (note this needs to be done
- * before loading persistence since it is used by processEventsWhileBlocked. */
- aeSetBeforeSleepProc(server.el,beforeSleep);
- aeSetAfterSleepProc(server.el,afterSleep);
- ................................
- }
- void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
- eventLoop->beforesleep = beforesleep;
- }
-
- void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
- eventLoop->aftersleep = aftersleep;
- }
在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleep和aftersleep。
就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。
从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc。而这时rfileProc就是accpet_handler,即是会调用accpet_handler。
- //sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
- //就把这个函数当做就是使用aeCreateFileEvent即可。
- int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
- for (int j = 0; j < sfd->count; j++) {
- aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)
- }
- //错误处理没有展示
- return C_OK;
- }
-
- int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
- aeFileProc *proc, void *clientData)
- {
- ................
- fe->mask |= mask;
- if (mask & AE_READABLE) fe->rfileProc = proc;
- if (mask & AE_WRITABLE) fe->wfileProc = proc;
- fe->clientData = clientData;
- .....................
- }
回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。
继续跟着函数acceptTcpHandler。
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
-
- while(max--) {
- //调用accept
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- ............................
- acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
- }
- }
-
- static void acceptCommonHandler(connection *conn, int flags, char *ip) {
- .................
- /* Create connection and client */
- client *c = createClient(conn))
-
- /* Last chance to keep flags */
- c->flags |= flags;
- }
-
- client *createClient(connection *conn) {
- client *c = zmalloc(sizeof(client));
-
- if (conn) {
- connSetReadHandler(conn, readQueryFromClient); //设置客户端的读回调函数
- connSetPrivateData(conn, c);
- }
- ...................
- if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
- }
-
- static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_read_handler(conn, func);
- }
set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。
这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)。
- //当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
- static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- if (func == conn->read_handler) return C_OK;
-
- conn->read_handler = func;
- if (!conn->read_handler)
- aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
- else
- if (aeCreateFileEvent(server.el,conn->fd,
- AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
- }
这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient。
这个是在绑定的beforeSleep函数中。
- void beforeSleep(struct aeEventLoop *eventLoop) {
- ..........................
- /* Handle writes with pending output buffers. */
- handleClientsWithPendingWritesUsingThreads();
- .............................
- }
-
- int handleClientsWithPendingWritesUsingThreads(void) {
- ...................
- listRewind(server.clients_pending_write,&li);
- while((ln = listNext(&li))) {
- client *c = listNodeValue(ln);
-
- //如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClient
- if (clientHasPendingReplies(c) &&
- connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
- {
- }
- }
- ......................
- }
-
- static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_write_handler(conn, func, 0);
- }
和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。
- static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
- if (func == conn->write_handler) return C_OK;
-
- conn->write_handler = func;
- if (barrier)
- conn->flags |= CONN_FLAG_WRITE_BARRIER;
- else
- conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
- if (!conn->write_handler)
- aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
- else
- if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
- conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
- }
这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。
使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。
- long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
- aeTimeProc *proc, void *clientData,
- aeEventFinalizerProc *finalizerProc)
- {
- long long id = eventLoop->timeEventNextId++;
- aeTimeEvent *te;
-
- te = zmalloc(sizeof(*te));
- if (te == NULL) return AE_ERR;
- te->id = id;
- te->when = getMonotonicUs() + milliseconds * 1000;
- //设置事件的回调函数
- te->timeProc = proc;
- te->finalizerProc = finalizerProc;
- te->clientData = clientData;
- te->prev = NULL;
- te->next = eventLoop->timeEventHead;
- te->refcount = 0;
- if (te->next)
- te->next->prev = te;
- eventLoop->timeEventHead = te;
- return id;
- }
首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。
以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动的核心思想。
比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。
这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。
所以应该是,IO多路复用 + 回调机制 构成了 reactor模式。
还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。
IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)

网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。
还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。
所以,我觉得一个很大的优势是:
即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。
假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。
假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。
假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?
我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。
Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。
若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路
该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。