很多时候,除了响应事件之外,应用还希望做一定的数据缓冲。比如说,写入数据的时候 ,通常的运行模式是:
libevent为此IO机制提供了一种通用机制,即bufferevent。
bufferevent由一个底层的传输端口(如套接字 ),一个读取缓冲区和一个写入缓冲区组成。
与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent 在读取或者写入了足够量的数据之后调用用户提供的回调。
有多种共享公用接口的bufferevent类型,编写本文时已存在以下类型:
关于evbuffer
每个 bufferevent 都有一个输入缓冲区和一个输出缓冲区 ,它们的类型都是“struct evbuffer”。 数据要写入到 bufferevent 时,添加数据到输出缓冲区 ;bufferevent 中有数据供读取的时候,从输入缓冲区抽取(drain)数据。
evbuffer支持部分接口可以对缓冲区进行操作。下面的内容会对其接口进行介绍
每个 bufferevent 有两个数据相关的回调:一个读取回调和一个写入回调。默认情况下,从底层传输端口读取了任意量的数据之后会调用读取回调 ;输出缓冲区中足够量的数据被清空到底层传输端口后写入回调会被调用。通过调整 bufferevent 的读取和写入 “水位 (watermarks )”可以覆盖这些函数的默认行为。
每个bufferevent有下面四个水位
创建 bufferevent 时可以使用一个或者多个标志修改其行为。可识别的标志有:
创建套接字
struct bufferevent * bufferevent_socket_new(
struct event_base *base,
evutil_socket_t fd,
enum bufferevent_options options);
参数:
int bufferevent_socket_connect(struct bufferevent *bev,struct sockaddr *address, int addrlen);
address 和 addrlen 参数跟标准调用 connect()的参数相同。如果还没有为 bufferevent 设置套接字,调用函数将为其分配一个新的流套接字,并且设置为非阻塞的。
如果已经为 bufferevent 设置套接字,调用bufferevent_socket_connect() 将告知 libevent 套接字还未连接,直到连接成功之前不应该对其进行读取或者写入操作。
连接完成之前可以向输出缓冲区添加数据。
实例
#include
#include
#include
#include
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
/* connect to 127.0.0.1:8080. */
} else if (events & BEV_EVENT_ERROR) {//发生错误
}
}
int main_loop(void)
{
struct event_base *base;
struct bufferevent *bev;
struct sockaddr_in sin;
base = event_base_new();
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sin.sin_port = htons(8080); /* Port 8080 */
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
bufferevent_free(bev);
return -1;
}
event_base_dispatch(base); //x
return 0;
}
void bufferevent_free(struct bufferevent *bev);
这个函数释放 bufferevent。bufferevent 内部具有引用计数,所以,如果释放 时还有未决的延迟回调,则在回调完成之前 bufferevent 不会被删除。
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,short events, void *ctx);
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
void bufferevent_getcb(struct bufferevent *bufev,
bufferevent_data_cb *readcb_ptr,
bufferevent_data_cb *writecb_ptr,
bufferevent_event_cb *eventcb_ptr,
void **cbarg_ptr);
struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
这两个函数提供了非常强大的基础 :它们分别返回输入和输出缓冲区 。关于可以对 evbuffer 类型进行的所有操作的完整信息。
如果写入操作因为数据量太少而停止(或者读取操作因为太多数据而停止 ),则向输出缓冲区添加数据(或者从输入缓冲区移除数据)将自动重启操作。
int bufferevent_write(struct bufferevent *bufev,
const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
size_t bufferevent_read(struct bufferevent *bufev,
void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
实例
void read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
char tmp[128];
size_t n;
int i;
while (1) {
n = bufferevent_read(bev, tmp, sizeof(tmp));
if (n <= 0)
break; /* No more data. */
for (i=0; i<n; ++i)
tmp[i] = toupper(tmp[i]);
bufferevent_write(bev, tmp, n);
}
}
struct proxy_info {
struct bufferevent *other_bev;
};
void read_callback_proxy(struct bufferevent *bev, void *ctx)
{
struct proxy_info *inf = ctx;
bufferevent_read_buffer(bev,
bufferevent_get_output(inf->other_bev));
}
struct count {
unsigned long last_fib[2];
};
void write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
struct count *c = ctx;
struct evbuffer *tmp = evbuffer_new();
while (evbuffer_get_length(tmp) < 1024) {
unsigned long next = c->last_fib[0] + c->last_fib[1];
c->last_fib[0] = c->last_fib[1];
c->last_fib[1] = next;
evbuffer_add_printf(tmp, "%lu", next);
}
bufferevent_write_buffer(bev, tmp);
evbuffer_free(tmp);
}
int bufferevent_flush(struct bufferevent *bufev,short iotype, enum bufferevent_flush_mode state);
libevent 的 evbuffer 实现了为向后面添加数据和从前面移除数据而优化的字节队列。
evbuffer 用于处理缓冲网络 IO 的“缓冲”部分。它不提供调度 IO 或者当 IO 就绪时触发IO 的功能:这是 bufferevent 的工作。
struct evbuffer *evbuffer_new(void);
void evbuffer_free(struct evbuffer *buf);
int evbuffer_enable_locking(struct evbuffer *buf, void *lock);
void evbuffer_lock(struct evbuffer *buf);
void evbuffer_unlock(struct evbuffer *buf);
size_t evbuffer_get_length(const struct evbuffer *buf);
int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
int evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...)
int evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap);
int evbuffer_expand(struct evbuffer *buf, size_t datlen);
实例
evbuffer_add(buf, "Hello world 2.0.1", 17);
evbuffer_add_printf(buf, "Hello %s %d.%d.%d", "world", 2, 0, 1);
为提高效率,libevent 具有将数据从一个evbuffer移动到另一个的优化函数。
int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);
int evbuffer_remove_buffer(struct evbuffer *src,
struct evbuffer *dst,
size_t datlen);
evconnlistener 机制提供了监听和接受 TCP 连接的方法。
struct evconnlistener *
evconnlistener_new(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
evutil_socket_t fd);
struct evconnlistener *
evconnlistener_new_bind(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
const struct sockaddr *sa, int socklen);
释放监听器
void evconnlistener_free(struct evconnlistener *lev);
参数说明
注意:两个函数的不同在于如何建立监听套接字
flags参数
typedef void (*evconnlistener_cb)(struct evconnlistener *listener,
evutil_socket_t sock, struct sockaddr *addr, int len, void *ptr);
int evconnlistener_disable(struct evconnlistener *lev);
int evconnlistener_enable(struct evconnlistener *lev);
void evconnlistener_set_cb(struct evconnlistener *lev,
evconnlistener_cb cb, void *arg);
调整evconnlistener的回调函数和回调函数的参数
typedef void (*evconnlistener_errorcb)(struct evconnlistener *lis, void *ptr);
void evconnlistener_set_error_cb(struct evconnlistener *lev,
evconnlistener_errorcb errorcb);
libevent 可以记录内部错误和警告。如果编译进日志支持,还会记录调试信息。默认配置下
这些信息被写到stderr。通过提供定制的日志函数可以覆盖默认行为。
#define EVENT_LOG_DEBUG 0
#define EVENT_LOG_MSG 1
#define EVENT_LOG_WARN 2
#define EVENT_LOG_ERR 3
/* Deprecated; see note at the end of this section */
#define _EVENT_LOG_DEBUG EVENT_LOG_DEBUG
#define _EVENT_LOG_MSG EVENT_LOG_MSG
#define _EVENT_LOG_WARN EVENT_LOG_WARN
#define _EVENT_LOG_ERR EVENT_LOG_ERR
typedef void (*event_log_cb)(int severity, const char *msg);
void event_set_log_callback(event_log_cb cb);
配置实例
#include
#include
static void discard_cb(int severity, const char *msg)
{
/* do nothing */
}
static void write_to_file_cb(int severity, const char *msg)
{
const char *s;
if (!logfile)
return;
switch (severity) {
case _EVENT_LOG_DEBUG: s = "debug"; break;
case _EVENT_LOG_MSG: s = "msg"; break;
case _EVENT_LOG_WARN: s = "warn"; break;
case _EVENT_LOG_ERR: s = "error"; break;
default: s = "?"; break;
}
fprintf(logfile, "[%s] %s\n", s, msg);
}
void suppress_logging(void)
{
event_set_log_callback(discard_cb);
}
static FILE *logfile = NULL;
void set_logfile(FILE *f)
{
logfile = f;
event_set_log_callback(write_to_file_cb);
}
在用户提供的event_log_cb 回调函数中调用libevent 函数是不安全的。
默认情况下,libevent 使用C 库的内存管理函数在堆上分配内存。
通过提供malloc、realloc和free 的替代函数,可以让libevent 使用其他的内存管理器。
如果需要自定义内存管理函数,需要进行下面的操作
void event_set_mem_functions(void *(*malloc_fn)(size_t sz),
void *(*realloc_fn)(void *ptr, size_t sz),
void (*free_fn)(void *ptr));
实例
union alignment {
size_t sz;
void *ptr;
double dbl;
};
#define ALIGNMENT sizeof(union alignment)
#define OUTPTR(ptr) (((char*)ptr)+ALIGNMENT)
#define INPTR(ptr) (((char*)ptr)-ALIGNMENT)
static size_t total_allocated = 0;
static void *replacement_malloc(size_t sz)
{
void *chunk = malloc(sz + ALIGNMENT);
if (!chunk) return chunk;
total_allocated += sz;
*(size_t*)chunk = sz;
return OUTPTR(chunk);
}
static void *replacement_realloc(void *ptr, size_t sz)
{
size_t old_size = 0;
if (ptr) {
ptr = INPTR(ptr);
old_size = *(size_t*)ptr;
}
ptr = realloc(ptr, sz + ALIGNMENT);
if (!ptr)
return NULL;
*(size_t*)ptr = sz;
total_allocated = total_allocated - old_size + sz;
return OUTPTR(ptr);
}
static void replacement_free(void *ptr)
{
ptr = INPTR(ptr);
total_allocated -= *(size_t*)ptr;
free(ptr);
}
void start_counting_bytes(void)
{
event_set_mem_functions(replacement_malloc,
replacement_realloc,
replacement_free);
}
注意