①事件驱动(event-driven),高性能;
②轻量级,专注于网络,不如 ACE 那么臃肿庞大
③跨平台,支持 Windows、Linux、*BSD 和 Mac Os
④支持多种 I/O 多路复用技术, epoll、poll、dev/poll、select 和 kqueue 等;
⑤支持 I/O,定时器和信号等事件;
⑥注册事件优先级;
1)响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/
进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
1)首先初始化 libevent 库,并保存返回的指针(其实就是创建epoll的红黑树和双向链表)
2)初始化事件 event,设置回调函数和关注的事件
3)设置 event 从属的 event_base(事件,也就是节点装载到红黑树上)
4)是正式的添加事件的时候了(其实就是在红黑树上加入节点,这个节点绑定特定的读
还是写事件)
1)首先初始化 libevent 库,并保存返回的指针(其实就是创建epoll的红黑树和双向链表)
struct event_base * base = event_init();
实际上这一步相当于初始化一个 Reactor 实例;在初始化 libevent 后,就可以注册事件了。
2)初始化事件 event,设置回调函数和关注的事件(其实就是在红黑树上加入节点,这个节点绑定特定的读还是写事件)
evtimer_set(&ev, timer_cb, NULL);
事实上这等价于调用
event_set(&ev, -1, 0, timer_cb, NULL);
event_set 的函数原型是:
void event_set(struct event *ev, int fd, short event, void (*cb)(int,
short, void *), void *arg)
ev:执行要初始化的 event 对象;
fd:该 event 绑定的“句柄”,对于信号事件,它就是关注的信号
event:在该 fd 上关注的事件类型,它可以是 EV_READ, EV_WRITE, EV_SIGNAL;
cb:这是一个函数指针,
当 fd 上的事件 event 发生时,调用该函数执行处理,它有三个参数,(调用时由 event_base 负责传入,按顺序,)
1)fd:于定时事件不需要 fd,并且定时事件是根据添加时(event_add)的超时值设定的,因此
这里 event 也不需要设置
2)event:事件
3)arg:传递给 cb 函数指针的参数
3)设置 event 从属的 event_base(事件,也就是节点装载到红黑树上)
event_base_set(base, &ev);
这一步相当于指明 event 要注册到哪个 event_base 实例上;
4)是正式的添加事件的时候了
event_add(&ev, timeout);
基本信息都已设置完成,只要简单的调用 event_add()函数即可完成,其中 timeout 是定时值;这一步相当于调用 Reactor::register_handler()函数注册事件。
5)程序进入无限循环,等待就绪事件并执行事件处理
event_base_dispatch(base);
上面例子的程序代码如下所示
struct event ev;
struct timeval tv;
void time_cb(int fd, short event, void *argc)
{
printf("timer wakeup\n");
event_add(&ev, &tv); // reschedule timer
}
int main()
{
struct event_base *base = event_init();
tv.tv_sec = 10; // 10s period
tv.tv_usec = 0;
evtimer_set(&ev, time_cb, NULL);
event_add(&ev, &tv);
event_base_dispatch(base);
}
1) 首先应用程序准备并初始化 event,设置好事件类型和回调函数;这对应于前面第步骤
2 和 3;
2) 向 libevent 添加该事件 event。对于定时事件,libevent 使用一个小根堆管理,key 为超
时时间;对于 Signal 和 I/O 事件,libevent 将其放入到等待链表(wait list)中,这是一
个双向链表结构
3) 程序调用 event_base_dispatch()系列函数进入无限循环,等待事件,以 select()函数为例;
每次循环前 libevent 会检查定时事件的最小超时时间 tv,根据 tv 设置 select()的最大等
待时间,以便于后面及时处理超时事件;当 select()返回后,首先检查超时事件,然后检查 I/O 事件;Libevent 将所有的就绪事件,放入到激活链表中;然后对激活链表中的事件,调用事件的回调函数执行事件处理;

1)头文件
2)内部头文件
3)libevent框架
4)对系统I/O多路复用机制的封装
5)定时事件管理
6)信号管理
7)辅助功能函数
8)日志
9)缓冲区管理
10)基本数据结构
11)实用网络库
1)头文件
主要就是 event.h:事件宏定义、接口函数声明,主要结构体 event 的声明;
2)内部头文件
xxx-internal.h:内部数据结构和函数,对外不可见,以达到信息隐藏的目的;
3)libevent框架
event.c:event 整体框架的代码实现;
4)对系统I/O多路复用机制的封装
epoll.c:对 epoll 的封装;
select.c:对 select 的封装;
devpoll.c:对 dev/poll 的封装;
kqueue.c:对 kqueue 的封装;
5)定时事件管理
min-heap.h:其实就是一个以时间作为 key 的小根堆结构;
6)信号管理
signal.c:对信号事件的处理;
7)辅助功能函数
evutil.h 和 evutil.c:一些辅助功能函数,包括创建 socket pair 和一些时间操作函数:加、减和比较等。
8)日志
log.h 和 log.c:log 日志函数
9)缓冲区管理
evbuffer.c 和 buffer.c:libevent 对缓冲区的封装;
10)基本数据结构
compat\sys 下的两个源文件:queue.h 是 libevent 基本数据结构的实现,包括链表,双向链表,队列等;_libevent_time.h:一些用于时间操作的结构体定义、函数和宏定义;
11)实用网络库
http 和 evdns:是基于 libevent 实现的 http 服务器和异步 dns 查询库;
Libevent 是基于事件驱动(event-driven)的,从名字也可以看到 event 是整个库的核心。
event 就是 Reactor 框架中的事件处理程序组件;它提供了函数接口,供 Reactor 在事件发生时调用,以执行相应的事件处理,通常它会绑定一个有效的句柄。首先给出 event 结构体的声明,它位于 event.h 文件中:
struct event {
TAILQ_ENTRY (event) ev_next;
TAILQ_ENTRY (event) ev_active_next;
TAILQ_ENTRY (event) ev_signal_next;
unsigned int min_heap_idx; /* for managing timeouts */
struct event_base *ev_base;
int ev_fd;
short ev_events;
short ev_ncalls;
short *ev_pncalls; /* Allows deletes in callback */
struct timeval ev_timeout;
int ev_pri; /* smaller numbers are higher priority */
void (*ev_callback)(int, short, void *arg);
void *ev_arg;
int ev_res; /* result passed to event callback */
int ev_flags;
};
1)ev_events:event关注的事件类型,它可以是以下3种类型
2)ev_next,ev_active_next 和 ev_signal_next 都是双向链表节点指针;它们是 libevent 对不同事件类型和在不同的时期,对事件的管理时使用到的字段。
3)min_heap_idx 和 ev_timeout
4)ev_base 该事件所属的反应堆实例,这是一个 event_base 结构体,下一节将会详细讲解;
5)ev_fd,对于 I/O 事件,是绑定的文件描述符;对于 signal 事件,是绑定的信号;
6)ev_callback,event 的回调函数,被 ev_base 调用,执行事件处理程序,
这是一个函数指针
7)ev_arg:void*,表明可以是任意类型的数据,在设置 event 时指定;
8)eb_flags:libevent 用于标记 event 信息的字段,表明其当前的状态,
9)ev_ncalls:事件就绪执行时,调用 ev_callback 的次数,通常为 1;
10)ev_pncalls:指针,通常指向 ev_ncalls 或者为 NULL;
11)ev_res:记录了当前激活事件的类型;
1)ev_events:event关注的事件类型,它可以是以下3种类型
I/O事件: EV_WRITE和EV_READ
定时事件:EV_TIMEOUT
信号: EV_SIGNAL
辅助选项:EV_PERSIST,表明是一个永久事件
#define EV_TIMEOUT 0x01
#define EV_READ 0x02
#define EV_WRITE 0x04
#define EV_SIGNAL 0x08
#define EV_PERSIST 0x10 /* Persistant event */
信号和I/O事件不能同时2)ev_next,ev_active_next 和 ev_signal_next 都是双向链表节点指针;它们是 libevent 对不同事件类型和在不同的时期,对事件的管理时使用到的字段。
1)ev_next 就是该 I/O 事件在链表中的位置;称此链表为“已注册事件链表”
2)ev_signal_next 就是 signal 事件在 signal 事件链表中的位置
3)ev_active_next:libevent 将所有的激活事件放入到链表 active list 中,然后遍历 active list 执
行调度,ev_active_next 就指明了 event 在 active list 中的位置
3)min_heap_idx 和 ev_timeout
如果是 timeout 事件,它们是 event 在小根堆中的索引和超
时值,libevent 使用小根堆来管理定时事件,这将在后面定时事件处理时专门讲解
4)ev_base 该事件所属的反应堆实例,这是一个 event_base 结构体,下一节将会详细讲解;
5)ev_fd,对于 I/O 事件,是绑定的文件描述符;对于 signal 事件,是绑定的信号;
6)ev_callback,event 的回调函数,被 ev_base 调用,执行事件处理程序,
这是一个函数指针,原型为:
void (*ev_callback)(int fd, short events, void *arg)
其中参数 fd 对应于 ev_fd;events 对应于 ev_events;arg 对应于 ev_arg;
7)ev_arg:void*,表明可以是任意类型的数据,在设置 event 时指定;
8)eb_flags:libevent 用于标记 event 信息的字段,表明其当前的状态,可能的值有:
#define EVLIST_TIMEOUT 0x01 // event在time堆中
#define EVLIST_INSERTED 0x02 // event在已注册事件链表中
#define EVLIST_SIGNAL 0x04 // 未见使用
#define EVLIST_ACTIVE 0x08 // event在激活链表中
#define EVLIST_INTERNAL 0x10 // 内部使用标记
#define EVLIST_INIT 0x80 // event 已被初始化
9)ev_ncalls:事件就绪执行时,调用 ev_callback 的次数,通常为 1;
10)ev_pncalls:指针,通常指向 ev_ncalls 或者为 NULL;
11)ev_res:记录了当前激活事件的类型;
从 event 结构体中的 3 个链表节点指针和一个堆索引出发,大体上也能窥出 libevent 对
event 的管理方法了,可以参见下面的示意图。
前提思路
每次当有事件 event 转变为就绪状态时,libevent 就会把它移入到 active event list[priority]
中,其中 priority 是 event 的优先级;
执行思路
接着 libevent 会根据自己的调度策略选择就绪事件,调用其 cb_callback()函数执行事件
处理;并根据就绪的句柄和事件类型填充 cb_callback 函数的参数。

要向 libevent 添加一个事件,需要首先设置 event 对象,这通过调用 libevent 提供的函
数有:event_set(), event_base_set(), event_priority_set()来完成;下面分别进行讲解
1) event_set
void event_set(struct event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
1.fd:设置事件 ev 绑定的文件描述符或者信号,对于定时事件,设为-1 即可;
2.event:设置事件类型,比如 EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL 等;
3.callback and arg : 设置事件的回调函数以及参数 arg;
4.ev:初始化其它字段,比如缺省的 event_base 和优先级
2) event_base_set
设置 event ev 将要注册到的 event_base;
int event_base_set(struct event_base *base, struct event *ev)
libevent 有一个全局 event_base 指针 current_base,默认情况下事件 ev
将被注册到 current_base 上,使用该函数可以指定不同的 event_base;
如果一个进程中存在多个 libevent 实例,则必须要调用该函数为 event 设
置不同的 event_base;
3) event_priority_set
int event_priority_set(struct event *ev, int pri)
设置event ev的优先级,没什么可说的,注意的一点就是:当ev正处于就绪状态
时,不能设置,返回-1。
回想 Reactor 模式的几个基本组件,本节讲解的部分对应于 Reactor 框架组件。在 libevent
中,这就表现为 event_base 结构体,结构体声明如下,它位于 event-internal.h 文件中
struct event_base {
const struct eventop *evsel;
void *evbase;
int event_count; /* counts number of total events */
int event_count_active; /* counts number of active events */
int event_gotterm; /* Set to terminate loop */
int event_break; /* Set to terminate loop immediately */
/* active event management */
struct event_list **activequeues;
int nactivequeues;
/* signal handling info */
struct evsignal_info sig;
struct event_list eventqueue;
struct timeval event_tv;
struct min_heap timeheap;
struct timeval tv_cache;
};
下面详细解释一下结构体中各字段的含义。
1)evsel 和 evbase:
evsel 和 evbase 这两个字段的设置可能会让人有些迷惑,这里你可以把 evsel 和 evbase
看作是类和静态函数的关系,比如添加事件时的调用行为:evsel->add(evbase, ev),实际执行操作的是 evbase;这相当于 class::add(instance, ev),instance 就是 class 的一个对象实例。evsel指向了全局变量static const struct eventop *eventops[]中的一个;
前面也说过,libevent将系统提供的I/O demultiplex机制统一封装成了eventop结构;因此
eventops[]包含了select、poll、kequeue和epoll等等其中的若干个全局实例对象。
evbase实际上是一个eventop实例对象;
先来看看eventop结构体,它的成员是一系列的函数指针, 在event-internal.h文件中:
struct eventop {
const char *name;
void *(*init)(struct event_base *); // 初始化
int (*add)(void *, struct event *); // 注册事件
int (*del)(void *, struct event *); // 删除事件
int (*dispatch)(struct event_base *, void *, struct timeval *); //
事件分发
void (*dealloc)(struct event_base *, void *); // 注销,释放资源
/* set if we need to reinitialize the event base */
int need_reinit;
};
也就是说,在 libevent 中,每种 I/O demultiplex 机制的实现都必须提供这五个函数接口,
来完成自身的初始化、销毁释放;对事件的注册、注销和分发.比如对于 epoll,libevent 实现了 5 个对应的接口函数,并在初始化时并将 eventop 的 5
个函数指针指向这 5 个函数,那么程序就可以使用 epoll 作为 I/O demultiplex 机制了,这个
在后面会再次提到。
2)activequeues 是一个二级指针,前面讲过 libevent 支持事件优先级,因此你可以把它
看作是数组,其中的元素 activequeues[priority]是一个链表,链表的每个节点指向一个优先
级为 priority 的就绪事件 event。
3)eventqueue,链表,保存了所有的注册事件 event 的指针。
4)sig 是由来管理信号的结构体,将在后面信号处理时专门讲解;
5)timeheap 是管理定时事件的小根堆,将在后面定时事件处理时专门讲解;
6)event_tv 和 tv_cache 是 libevent 用于时间管理的变量,将在后面讲到;
1)创建一个 event_base 对象也既是创建了一个新的 libevent 实例,程序需要通过调用
event_init()(内部调用 event_base_new 函数执行具体操作)函数来创建,该函数同时还对
新生成的 libevent 实例进行了初始化。
2)该函数首先为 event_base 实例申请空间,然后初始化 timer mini-heap,选择并初始化合适的系统 I/O 的 demultiplexer 机制,初始化各事件链表;
3)函数还检测了系统的时间设置,为后面的时间管理打下基础。
前面提到 Reactor 框架的作用就是提供事件的注册、注销接口;根据系统提供的事件多
路分发机制执行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数来处理
事件。
int event_add(struct event *ev, const struct timeval *timeout);
int event_del(struct event *ev);
int event_base_loop(struct event_base *base, int loops);
void event_active(struct event *event, int res, short events);
void event_process_active(struct event_base *base);
int event_add(struct event *ev, const struct timeval *tv)
ev:指向要注册的事件;
tv:超时时间;
int event_add(struct event *ev, const struct timeval *tv)
{
struct event_base *base = ev->ev_base; // 要注册到的event_base
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase; // base使用的系统I/O策略
// 新的timer事件,调用timer heap接口在堆上预留一个位置
// 注:这样能保证该操作的原子性:
// 向系统I/O机制注册可能会失败,而当在堆上预留成功后,
// 定时事件的添加将肯定不会失败;
// 而预留位置的可能结果是堆扩充,但是内部元素并不会改变
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
// 如果事件ev不在已注册或者激活链表中,则调用evbase注册事件
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
res = evsel->add(evbase, ev);
if (res != -1) // 注册成功,插入event到已注册链表中
event_queue_insert(base, ev, EVLIST_INSERTED);
}
// 准备添加定时事件
if (res != -1 && tv != NULL) {
struct timeval now;
// EVLIST_TIMEOUT表明event已经在定时器堆中了,删除旧的
if (ev->ev_flags & EVLIST_TIMEOUT)
event_queue_remove(base, ev, EVLIST_TIMEOUT);
// 如果事件已经是就绪状态则从激活链表中删除
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
// 将ev_callback调用次数设置为0
if (ev->ev_ncalls && ev->ev_pncalls) {
*ev->ev_pncalls = 0;
}
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
// 计算时间,并插入到timer小根堆中
gettime(base, &now);
evutil_timeradd(&now, tv, &ev->ev_timeout);
event_queue_insert(base, ev, EVLIST_TIMEOUT);
}
return (res);
}
event_queue_insert()负责将事件插入到对应的链表中,下面是程序代码;
event_queue_remove()负责将事件从对应的链表中删除,这里就不再重复贴代码了
void event_queue_insert(struct event_base *base, struct event *ev,
int queue)
{
// ev可能已经在激活列表中了,避免重复插入
if (ev->ev_flags & queue) {
if (queue & EVLIST_ACTIVE)
return;
}
// ...
ev->ev_flags |= queue; // 记录queue标记
switch (queue) {
case EVLIST_INSERTED: // I/O或Signal事件,加入已注册事件链表
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
case EVLIST_ACTIVE: // 就绪事件,加入激活链表
base->event_count_active++;
TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri], ev,
ev_active_next);
break;
case EVLIST_TIMEOUT: // 定时事件,加入堆
min_heap_push(&base->timeheap, ev);
break;
}
}
int event_del(struct event *ev);
内部原理
该函数将删除事件 ev,对于 I/O 事件,从 I/O 的 demultiplexer 上将事件注销;对于 Signal
事件,将从 Signal 事件链表中删除;对于定时事件,将从堆上删除;
备注
同样删除事件的操作则不一定是原子的,比如删除时间事件之后,有可能从系统 I/O 机
制中注销会失败。
代码
int event_del(struct event *ev)
{
struct event_base *base;
const struct eventop *evsel;
void *evbase;
// ev_base为NULL,表明ev没有被注册
if (ev->ev_base == NULL)
return (-1);
// 取得ev注册的event_base和eventop指针
base = ev->ev_base;
evsel = base->evsel;
evbase = base->evbase;
// 将ev_callback调用次数设置为
if (ev->ev_ncalls && ev->ev_pncalls) {
*ev->ev_pncalls = 0;
}
// 从对应的链表中删除
if (ev->ev_flags & EVLIST_TIMEOUT)
event_queue_remove(base, ev, EVLIST_TIMEOUT);
if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove(base, ev, EVLIST_ACTIVE);
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove(base, ev, EVLIST_INSERTED);
// EVLIST_INSERTED表明是I/O或者Signal事件,
// 需要调用I/O demultiplexer注销事件
return (evsel->del(evbase, ev));
}
return (0);
}
现在我们已经初步了解了 libevent 的 Reactor 组件——event_base 和事件管理框架,接下
来就是 libevent 事件处理的中心部分——事件主循环,根据系统提供的事件多路分发机制执
行事件循环,对已注册的就绪事件,调用注册事件的回调函数来处理事件。
Libevent 将 I/O 事件、定时器和信号事件处理很好的结合到了一起,本节也会介绍
libevent 是如何做到这一点的。
Libevent 的事件主循环主要是通过 event_base_loop ()函数完成的,其主要操作如下面的流程图所示,event_base_loop 所作的就是持续执行下面的循环。

清楚了 event_base_loop 所作的主要操作,就可以对比源代码看个究竟了,代码结构还是相当清晰的
int event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
struct timeval *tv_p;
int res, done;
// 清空时间缓存
base->tv_cache.tv_sec = 0;
// evsignal_base是全局变量,在处理signal时,用于指名signal所属的
event_base实例
if (base->sig.ev_signal_added)
evsignal_base = base;
done = 0;
while (!done) { // 事件主循环
// 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置
event_gotterm标记
// 调用event_base_loopbreak()设置event_break标记
if (base->event_gotterm) {
base->event_gotterm = 0;
break;
}
if (base->event_break) {
base->event_break = 0;
break;
}
// 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向
后调整了系统时间
// 在timeout_correct函数里,比较last wait time和当前时间,如果
当前时间< last wait time
// 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时
间。
timeout_correct(base, &tv);
// 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer
的最大等待时间
tv_p = &tv;
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
// 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,
不必等待
// 下面会提到,在libevent中,低优先级的就绪事件可能不能立即
被处理
evutil_timerclear(&tv);
}
// 如果当前没有注册事件,就退出
if (!event_haveevents(base)) {
event_debug(("%s: no events registered.", __func__));
return (1);
}
// 更新last wait time,并清空time cache
gettime(base, &base->event_tv);
base->tv_cache.tv_sec = 0;
// 调用系统I/O demultiplexer等待就绪I/O events,可能是
epoll_wait,或者select等;
// 在evsel->dispatch()中,会把就绪signal event、I/O event插入到
激活链表中
res = evsel->dispatch(base, evbase, tv_p);
if (res == -1)
return (-1);
// 将time cache赋值为当前系统时间
gettime(base, &base->tv_cache);
// 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
timeout_process(base);
// 调用event_process_active()处理激活链表中的就绪event,调用其
回调函数执行事件处理
// 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件
链表,
// 然后处理链表中的所有就绪事件;
// 因此低优先级的就绪事件可能得不到及时处理;
if (base->event_count_active) {
event_process_active(base);
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
// 循环结束,清空时间缓存
base->tv_cache.tv_sec = 0;
event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}
信号触发介绍
Signal 是异步事件的经典事例,将 Signal 事件统一到系统的 I/O 多路复用中就不像 Timer
事件那么自然了,Signal 事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变
量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”
注意点
如果当 Signal 发生时,并不立即调用 event 的 callback 函数处理信号,而是设法通知系
统的 I/O 机制,让其返回,然后再统一和 I/O 事件以及 Timer 一起处理,不就可以了嘛。是
的,这也是 libevent 中使用的方法。
通知方法
问题的核心在于,当 Signal 发生时,如何通知系统的 I/O 多路复用机制,这里先买个小
关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用 pipe。
因为系统的 I/O 机制像 select()和 epoll_wait()都允许程序制定一个最大等待时间(也称
为最大超时时间)timeout,即使没有 I/O 事件发生,它们也保证能在 timeout 时间内返回。
那么根据所有 Timer 事件的最小超时时间来设置系统 I/O 的 timeout 时间;当系统 I/O
返回时,再激活所有就绪的 Timer 事件就可以了,这样就能将 Timer 事件完美的融合到系统的 I/O 机制中了。具体的代码在源文件 event.c 的 event_base_loop()中,现在就对比代码来看看这一处理方法:
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
// 根据Timer事件计算evsel->dispatch的最大等待时间
timeout_next(base, &tv_p);
} else {
// 如果还有活动事件,就不要等待,让evsel->dispatch立即返回
evutil_timerclear(&tv);
}
// ...
// 调用select() or epoll_wait() 等待就绪I/O事件
res = evsel->dispatch(base, evbase, tv_p);
// ...
// 处理超时事件,将超时事件插入到激活链表中
timeout_process(base);
.......
timeout_next()函数根据堆中具有最小超时值的事件和当前时间来计算等待时间,下面看
看代码:
static int timeout_next(struct event_base *base, struct timeval **tv_p)
{
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
// 堆的首元素具有最小的超时值
if ((ev = min_heap_top(&base->timeheap)) == NULL) {
// 如果没有定时事件,将等待时间设置为NULL,表示一直阻塞直到有
I/O事件发生
*tv_p = NULL;
return (0);
}
// 取得当前时间
gettime(base, &now);
// 如果超时时间<=当前值,不能等待,需要立即返回
if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
evutil_timerclear(tv);
return (0);
}
// 计算等待的时间=当前时间-最小的超时时间
evutil_timersub(&ev->ev_timeout, &now, tv);
return (0);
}
前提
Libevent 使用堆来管理 Timer 事件,其 key 值就是事件的超时时间,源代码位于文件
min_heap.h 中。
小根堆
所有的数据结构书中都有关于堆的详细介绍,向堆中插入、删除元素时间复杂度都是
O(lgN),N 为堆中元素的个数,而获取最小 key 值(小根堆)的复杂度为 O(1)。堆是一个完全二叉树,基本存储方式是一个数组
Libevent 实现的堆还是比较轻巧的,虽然我不喜欢这种编码方式(搞一些复杂的表达
式)。轻巧到什么地方呢,就以插入元素为例,来对比说明,下面伪代码中的 size 表示当前堆的元素个数,典型的代码逻辑如下:
Heap[size++] Å new; // 先放到数组末尾,元素个数+1
// 下面就是 shift_up()的代码逻辑,不断的将 new 向上调整
_child = size;
while(_child>0) // 循环
{
_parent Å (_child-1)/2; // 计算 parent
if(Heap[_parent].key < Heap[_child].key)
break; // 调整结束,跳出循环
swap(_parent, _child); // 交换 parent 和 child
}
而 libevent 的 heap 代码对这一过程做了优化,在插入新元素时,只是为新元素预留了
一个位置 hole(初始时 hole 位于数组尾部),但并不立刻将新元素插入到 hole 上,而是不断向上调整 hole 的值,将父节点向下调整,最后确认 hole 就是新元素的所在位置时,才会真正的将新元素插入到 hole 上,因此在调整过程中就比上面的代码少了一次赋值的操作,代码逻辑是:
// 下面就是 shift_up()的代码逻辑,不断的将 new 的“预留位置”向上调整
_hole = size; // _hole 就是为 new 预留的位置,但并不立刻将 new 放上
while(_hole>0) // 循环
{
_parent Å (_hole-1)/2; // 计算 parent
if(Heap[_parent].key < new.key)
break; // 调整结束,跳出循环
Heap[_hole] = Heap[_parent]; // 将 parent 向下调整
_hole = _parent; // 将_hole 调整到_parent
}
Heap[_hole] = new; // 调
由于每次调整都少做一次赋值操作,在调整路径比较长时,调整效率会比第一种有所
提高。libevent 中的 min_heap_shift_up_()函数就是上面逻辑的具体实现,对应的向下调整函数是 min_heap_shift_down_()。


通过设置系统 I/O 机制的 wait 时间,从而简捷的集成 Timer 事件;主要分析了 libevent
对堆调整操作的优化。
Libevent 的核心是事件驱动、同步非阻塞,为了达到这一目标,必须采用系统提供的 I/O
多路复用技术,而这些在 Windows、Linux、Unix 等不同平台上却各有不同,如何能提供优
雅而统一的支持方式,是首要关键的问题,这其实不难,本节就来分析一下。
Libevent支持多种I/O多路复用技术的关键就在于结构体eventop,这个结构体前面也曾
提到过,它的成员是一系列的函数指针, 定义在event-internal.h文件中:
struct eventop {
const char *name;
void *(*init)(struct event_base *); // 初始化
int (*add)(void *, struct event *); // 注册事件
int (*del)(void *, struct event *); // 删除事件
int (*dispatch)(struct event_base *, void *, struct timeval *); //事件分发
void (*dealloc)(struct event_base *, void *); // 注销,释放资源
/* set if we need to reinitialize the event base */
int need_reinit;
};
在 libevent 中,每种 I/O demultiplex 机制的实现都必须提供这五个函数接口,来完成自
身的初始化、销毁释放;对事件的注册、注销和分发.比如对于 epoll,libevent 实现了 5 个对应的接口函数,并在初始化时并将 eventop 的 5个函数指针指向这 5 个函数,那么程序就可以使用 epoll 作为 I/O demultiplex 机制了。
Libevent 把所有支持的 I/O demultiplex 机制存储在一个全局静态数组 eventops 中,并在
初始化时选择使用何种机制,数组内容根据优先级顺序声明如下:
/* In order of preference */
static const struct eventop *eventops[] = {
#ifdef HAVE_EVENT_PORTS
&evportops,
#endif
#ifdef HAVE_WORKING_KQUEUE
&kqops,
#endif
#ifdef HAVE_EPOLL
&epollops,
#endif
#ifdef HAVE_DEVPOLL
&devpollops,
#endif
#ifdef HAVE_POLL
&pollops,
#endif
#ifdef HAVE_SELECT
&selectops,
#endif
#ifdef WIN32
&win32ops,
#endif
NULL
};
然后 libevent 根据系统配置和编译选项决定使用哪一种 I/O demultiplex 机制,这段代码
在函数 event_base_new()中:
base->evbase = NULL;
for (i = 0; eventops[i] && !base->evbase; i++) {
base->evsel = eventops[i];
base->evbase = base->evsel->init(base);
}
可以看出,libevent 在编译阶段选择系统的 I/O demultiplex 机制,而不支持在运行阶段
根据配置再次选择。以 Linux 下面的 epoll 为例,实现在源文件 epoll.c 中,eventops 对象 epollops 定义如下:
const struct eventop epollops = {
"epoll",
epoll_init,
epoll_add,
epoll_del,
epoll_dispatch,
epoll_dealloc,
1 /* need reinit */
};
变量 epollops 中的函数指针具体声明如下,注意到其返回值和参数都和 eventop 中的定
义严格一致,这是函数指针的语法限制。
static void *epoll_init (struct event_base *);
static int epoll_add (void *, struct event *);
static int epoll_del (void *, struct event *);
static int epoll_dispatch(struct event_base *, void *, struct timeval *);
static void epoll_dealloc (struct event_base *, void *);
那么如果选择的是 epoll,那么调用结构体 eventop 的 init 和 dispatch 函数指针时,实际
调用的函数就是 epoll 的初始化函数 epoll_init()和事件分发函数 epoll_dispatch()了;
关于 epoll 的具体用法这里就不多说了,可以参见介绍 epoll 的文章
C++语言提供了虚函数来实现多态,在 C 语言中,这是通过函数指针实现的。对于各类
函数指针的详细说明可以参见文章:
函数指针讲解
同样的,上面 epollops 以及 epoll 的各种函数都直接定义在了 epoll.c 源文件中,对外都
是不可见的。对于 libevent 的使用者而言,完全不会知道它们的存在,对 epoll 的使用也是
通过 eventop 来完成的,达到了信息隐藏的目的。
Libevent 本身不是多线程安全的,在多核的时代,如何能充分利用 CPU 的能力呢,这
一节来说说如何在多线程环境中使用 libevent,跟源代码并没有太大的关系,纯粹是使用上
的技巧。
在多核的 CPU 上只使用一个线程始终是对不起 CPU 的处理能力啊,那好吧,那就多创
建几个线程,比如下面的简单服务器场景。
1 主线程创建工作线程 1;
2 接着主线程监听在端口上,等待新的连接;
3 在线程 1 中执行 event 事件循环,等待事件到来;
4 新连接到来,主线程调用 libevent 接口 event_add 将新连接注册到 libevent 上;
......
上面的逻辑看起来没什么错误,在很多服务器设计中都可能用到主线程和工作线程的模
式…. 可是就在线程 1 注册事件时,主线程很可能也在操作事件,比如删除,修改,通过 libevent的源代码也能看到,没有同步保护机制,问题麻烦了,看起来不能这样做啊,难道只能使用单线程不成!?
Libevent 并不是线程安全的,但这不代表 libevent 不支持多线程模式,其实方法在前面
已经将 signal 事件处理时就接触到了,那就是消息通知机制。
一句话,“你发消息通知我,然后再由我在合适的时间来处理”;
说到这就再多说几句,再打个比方,把你自己比作一个工作线程,而你的头是主线程,
你有一个消息信箱来接收别人发给你的消息,当时头有个新任务要指派给你。
那么第一节中使用的多线程方法相当下面的流程:
1 当时你正在做事,比如在写文档;
2 你的头找到了一个任务,要指派给你,比如帮他搞个 PPT,哈;
3 头命令你马上搞 PPT,你这是不得不停止手头的工作,把 PPT 搞定了再接着写文档;
。。。。。。
那么基于纯粹的消息通知机制的多线程方式就像下面这样:
1 当时你正在写文档;
2 你的头找到了一个任务,要指派给你,帮他搞个 PPT;
3 头发个消息到你信箱,有个 PPT 要帮他搞定,这时你并不鸟他;
4 你写好文档,接着检查消息发现头有个 PPT 要你搞定,你开始搞 PPT;
。。。。。。
…
第一种的好处是消息可以立即得到处理,但是很方法很粗暴,你必须立即处理这个消息,
所以你必须处理好切换问题,省得把文档上的内容不小心写到 PPT 里。在操作系统的进程
通信中,消息队列(消息信箱)都是操作系统维护的,你不必关心。
第二种的优点是通过消息通知,切换问题省心了,不过消息是不能立即处理的(基于消
息通知机制,这个总是难免的),而且所有的内容都通过消息发送,比如 PPT 的格式、内容等等信息,这无疑增加了通信开销。
有个折中机制可以减少消息通信的开销,就是提取一个同步层,还拿上面的例子来说,
你把工作安排都存放在一个工作队列中,而且你能够保证“任何人把新任务扔到这个队列”,
“自己取出当前第一个任务”等这些操作都能够保证不会把队列搞乱(其实就是个加锁的队
列容器)。
再来看看处理过程和上面有什么不同:
1 当时你正在写文档;
2 你的头找到了一个任务,要指派给你,帮他搞个 PPT;
2 头有个 PPT 要你搞定,他把任务 push 到你的工作队列中,包括了 PPT 的格式、内容
等信息;
3 头发个消息(一个字节)到你信箱,有个 PPT 要帮他搞定,这时你并不鸟他;
4 你写好文档,发现有新消息(这预示着有新任务来了),检查工作队列知道头有个 PPT
要你搞定,你开始搞 PPT;
…
工作队列其实就是一个加锁的容器(队列、链表等等),这个很容易实现实现;而消息
通知仅需要一个字节,具体的任务都 push 到了在工作队列中,因此想比 2.2 减少了不少通
信开销。多线程编程有很多陷阱,线程间资源的同步互斥不是一两句能说得清的,而且出现 bug很难跟踪调试;这也有很多的经验和教训,因此如果让我选择,在绝大多数情况下都会选择机制 3 作为实现多线程的方法。
Memcached 中的网络部分就是基于 libevent 完成的,其中的多线程模型就是典型的消息
通知+同步层机制。下面的图足够说明其多线程模型了,其中有详细的文字说明。

前提
基于event和event_base已经可以写一个CS模型了。但是对于服务器端来说,仍然需要用户自行调用socket、bind、listen、accept等步骤。这个过程有点繁琐,为此在2.0.2alpha版本的Libevent推出了一些对应的封装函数。用户只需初始化struct sockaddr_in结构体变量,然后把它作为参数传给函数evconnlistener_new_bind即可。该函数会完成上面说到的那4个过程。下面的代码是一个使用例子。
用evconnlistener的原因
对于服务器端来说,仍然需要用户自行调用socket、bind、listen、accept等步骤。这个过程有点繁琐
使用例子
#include
#include
#include
#include
#include
#include
#include
#include
#include
void listener_cb(evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg);
void socket_read_cb(bufferevent *bev, void *arg);
void socket_error_cb(bufferevent *bev, short events, void *arg);
int main()
{
evthread_use_pthreads();//enable threads
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(8989);
event_base *base = event_base_new();
evconnlistener *listener
= evconnlistener_new_bind(base, listener_cb, base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_
FREE | LEV_OPT_THREADSAFE,
10, (struct sockaddr*)&sin,
sizeof(struct sockaddr_in));
event_base_dispatch(base);
evconnlistener_free(listener);
event_base_free(base);
return 0;
}
//有新的客户端连接到服务器
//当此函数被调用时,libevent已经帮我们accept了这个客户端。该客户端的
//文件描述符为fd
void listener_cb(evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg)
{
event_base *base = (event_base*)arg;
//下面代码是为这个fd创建一个bufferevent
bufferevent *bev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, socket_read_cb, NULL, socket_error_cb, NULL);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}
void socket_read_cb(bufferevent *bev, void *arg)
{
char msg[4096];
size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );
msg[len] = '\0';
printf("server read the data %s\n", msg);
char reply[] = "I has read your data";
bufferevent_write(bev, reply, strlen(reply) );
}
void socket_error_cb(bufferevent *bev, short events, void *arg)
{
if (events & BEV_EVENT_EOF)
printf("connection closed\n");
else if (events & BEV_EVENT_ERROR)
printf("some other error\n");
//这将自动close套接字和free读写缓冲区
bufferevent_free(bev);
}
上面的代码是一个服务器端的例子,客户端代码可以使用《Libevent使用例子,从简单到复杂》博文中的客户端。这里就不贴客户端代码了。
从上面代码可以看到,当服务器端监听到一个客户端的连接请求后,就会调用listener_cb这个回调函数。这个回调函数是在evconnlistener_new_bind函数中设置的。现在来看一下这个函数的参数有哪些,下面是其函数原型。
//listener.h文件
typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_
t, struct sockaddr *, int socklen, void *);
struct evconnlistener *evconnlistener_new_bind(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
const struct sockaddr *sa, int socklen);
有一个参数是文件描述符fd。我们直接使用这个fd即可。真是方便。这个参数是可以为NULL的,此时用户并不能接收到客户端。当用户调用evconnlistener_set_cb函数设置回调函数后,就可以了。1)LEV_OPT_LEAVE_SOCKETS_BLOCKING:默认情况下,当连接监听器接收到新的客户端
socket连接后,会把该socket设置为非阻塞的。如果设置该选项,那么就把之客户端socket
保留为阻塞的
2)LEV_OPT_CLOSE_ON_FREE:当连接监听器释放时,会自动关闭底层的socket
3)LEV_OPT_CLOSE_ON_EXEC:为底层的socket设置closeonexec标志
4)LEV_OPT_REUSEABLE: 在某些平台,默认情况下当一个监听socket被关闭时,其他socket
不能马上绑定到同一个端口,要等一会儿才行。设置该标志后,Libevent会把该socket设置
成reuseable。这样,关闭该socket后,其他socket就能马上使用同一个端口
5)LEV_OPT_THREADSAFE:为连接监听器分配锁。这样可以确保线程安全