上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧的时间浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。
tripping: Round-trip demonstrator in C
- //
- // Round-trip 模拟
- //
- // 本示例程序使用多线程的方式启动client、worker、以及代理,
- // 当client处理完毕时会发送信号给主程序。
- //
- #include "czmq.h"
- static void
- client_task (void *args, zctx_t *ctx, void *pipe)
- {
- void *client = zsocket_new (ctx, ZMQ_DEALER);
- zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
- zsocket_connect (client, "tcp://localhost:5555");
- printf ("开始测试...\n");
- zclock_sleep (100);
- int requests;
- int64_t start;
- printf ("同步 round-trip 测试...\n");
- start = zclock_time ();
- for (requests = 0; requests < 10000; requests++) {
- zstr_send (client, "hello");
- char *reply = zstr_recv (client);
- free (reply);
- }
- printf (" %d 次/秒\n",
- (1000 * 10000) / (int) (zclock_time () - start));
- printf ("异步 round-trip 测试...\n");
- start = zclock_time ();
- for (requests = 0; requests < 100000; requests++)
- zstr_send (client, "hello");
- for (requests = 0; requests < 100000; requests++) {
- char *reply = zstr_recv (client);
- free (reply);
- }
- printf (" %d 次/秒\n",
- (1000 * 100000) / (int) (zclock_time () - start));
- zstr_send (pipe, "完成");
- }
- static void *
- worker_task (void *args)
- {
- zctx_t *ctx = zctx_new ();
- void *worker = zsocket_new (ctx, ZMQ_DEALER);
- zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
- zsocket_connect (worker, "tcp://localhost:5556");
- while (1) {
- zmsg_t *msg = zmsg_recv (worker);
- zmsg_send (&msg, worker);
- }
- zctx_destroy (&ctx);
- return NULL;
- }
- static void *
- broker_task (void *args)
- {
- // 准备上下文和套接字
- zctx_t *ctx = zctx_new ();
- void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
- void *backend = zsocket_new (ctx, ZMQ_ROUTER);
- zsocket_bind (frontend, "tcp://*:5555");
- zsocket_bind (backend, "tcp://*:5556");
- // 初始化轮询对象
- zmq_pollitem_t items [] = {
- { frontend, 0, ZMQ_POLLIN, 0 },
- { backend, 0, ZMQ_POLLIN, 0 }
- };
- while (1) {
- int rc = zmq_poll (items, 2, -1);
- if (rc == -1)
- break; // 中断
- if (items [0].revents & ZMQ_POLLIN) {
- zmsg_t *msg = zmsg_recv (frontend);
- zframe_t *address = zmsg_pop (msg);
- zframe_destroy (&address);
- zmsg_pushstr (msg, "W");
- zmsg_send (&msg, backend);
- }
- if (items [1].revents & ZMQ_POLLIN) {
- zmsg_t *msg = zmsg_recv (backend);
- zframe_t *address = zmsg_pop (msg);
- zframe_destroy (&address);
- zmsg_pushstr (msg, "C");
- zmsg_send (&msg, frontend);
- }
- }
- zctx_destroy (&ctx);
- return NULL;
- }
- int main (void)
- {
- // 创建线程
- zctx_t *ctx = zctx_new ();
- void *client = zthread_fork (ctx, client_task, NULL);
- zthread_new (ctx, worker_task, NULL);
- zthread_new (ctx, broker_task, NULL);
- // 等待client端管道的信号
- char *signal = zstr_recv (client);
- free (signal);
- zctx_destroy (&ctx);
- return 0;
- }
- Setting up test...
- Synchronous round-trip test...
- 9057 calls/second
- Asynchronous round-trip test...
- 173010 calls/second
- mdcli_t *mdcli_new (char *broker);
- void mdcli_destroy (mdcli_t **self_p);
- int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
- zmsg_t *mdcli_recv (mdcli_t *self);
然后花很短的时间就能将同步的client API改造成异步的API:
mdcliapi2: Majordomo asynchronous client API in C
- /* =====================================================================
- mdcliapi2.c
- Majordomo Protocol Client API (async version)
- Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
- ---------------------------------------------------------------------
- Copyright (c) 1991-2011 iMatix Corporation
- Copyright other contributors as noted in the AUTHORS file.
- This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
- This is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or (at
- your option) any later version.
- This software is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this program. If not, see
. - =====================================================================
- */
- #include "mdcliapi2.h"
- // 类结构
- // 使用成员函数访问属性
- struct _mdcli_t {
- zctx_t *ctx; // 上下文
- char *broker;
- void *client; // 连接至代理的套接字
- int verbose; // 在标准输出打印运行状态
- int timeout; // 请求超时时间
- };
- // ---------------------------------------------------------------------
- // 连接或重连代理
- void s_mdcli_connect_to_broker (mdcli_t *self)
- {
- if (self->client)
- zsocket_destroy (self->ctx, self->client);
- self->client = zsocket_new (self->ctx, ZMQ_DEALER);
- zmq_connect (self->client, self->broker);
- if (self->verbose)
- zclock_log ("I: 正在连接代理 %s...", self->broker);
- }
- // ---------------------------------------------------------------------
- // 构造函数
- mdcli_t *
- mdcli_new (char *broker, int verbose)
- {
- assert (broker);
- mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
- self->ctx = zctx_new ();
- self->broker = strdup (broker);
- self->verbose = verbose;
- self->timeout = 2500; // 毫秒
- s_mdcli_connect_to_broker (self);
- return self;
- }
- // ---------------------------------------------------------------------
- // 析构函数
- void
- mdcli_destroy (mdcli_t **self_p)
- {
- assert (self_p);
- if (*self_p) {
- mdcli_t *self = *self_p;
- zctx_destroy (&self->ctx);
- free (self->broker);
- free (self);
- *self_p = NULL;
- }
- }
- // ---------------------------------------------------------------------
- // 设置请求超时时间
- void
- mdcli_set_timeout (mdcli_t *self, int timeout)
- {
- assert (self);
- self->timeout = timeout;
- }
- // ---------------------------------------------------------------------
- // 发送请求给代理
- // 取得请求消息的所有权,发送后销毁
- int
- mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
- {
- assert (self);
- assert (request_p);
- zmsg_t *request = *request_p;
- // 在消息顶部加入协议规定的帧
- // Frame 0: empty (模拟REQ套接字的行为)
- // Frame 1: "MDPCxy" (6个字节, MDP/Client x.y)
- // Frame 2: Service name (看打印字符串)
- zmsg_pushstr (request, service);
- zmsg_pushstr (request, MDPC_CLIENT);
- zmsg_pushstr (request, "");
- if (self->verbose) {
- zclock_log ("I: 发送请求给 '%s' 服务:", service);
- zmsg_dump (request);
- }
- zmsg_send (&request, self->client);
- return 0;
- }
- // ---------------------------------------------------------------------
- // 获取应答消息,若无则返回NULL;
- // 该函数不会尝试从代理的崩溃中恢复,
- // 因为我们没有记录那些未收到应答的请求,所以也无法重发。
- zmsg_t *
- mdcli_recv (mdcli_t *self)
- {
- assert (self);
- // 轮询套接字以获取应答
- zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
- int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
- if (rc == -1)
- return NULL; // 中断
- // 收到应答后进行处理
- if (items [0].revents & ZMQ_POLLIN) {
- zmsg_t *msg = zmsg_recv (self->client);
- if (self->verbose) {
- zclock_log ("I: received reply:");
- zmsg_dump (msg);
- }
- // 不要处理错误,直接报出
- assert (zmsg_size (msg) >= 4);
- zframe_t *empty = zmsg_pop (msg);
- assert (zframe_streq (empty, ""));
- zframe_destroy (&empty);
- zframe_t *header = zmsg_pop (msg);
- assert (zframe_streq (header, MDPC_CLIENT));
- zframe_destroy (&header);
- zframe_t *service = zmsg_pop (msg);
- zframe_destroy (&service);
- return msg; // Success
- }
- if (zctx_interrupted)
- printf ("W: 收到中断消息,正在中止client...\n");
- else
- if (self->verbose)
- zclock_log ("W: 严重错误,放弃请求");
- return NULL;
- }
mdclient2: Majordomo client application in C
- //
- // 异步管家模式 - client示例程序
- // 使用mdcli API隐藏MDP协议的具体实现
- //
- // 直接编译源码,而不创建类库
- #include "mdcliapi2.c"
- int main (int argc, char *argv [])
- {
- int verbose = (argc > 1 && streq (argv [1], "-v"));
- mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
- int count;
- for (count = 0; count < 100000; count++) {
- zmsg_t *request = zmsg_new ();
- zmsg_pushstr (request, "Hello world");
- mdcli_send (session, "echo", &request);
- }
- for (count = 0; count < 100000; count++) {
- zmsg_t *reply = mdcli_recv (session);
- if (reply)
- zmsg_destroy (&reply);
- else
- break; // 使用Ctrl-C中断
- }
- printf ("收到 %d 个应答\n", count);
- mdcli_destroy (&session);
- return 0;
- }
- $ time mdclient
- 100000 requests/replies processed
- real 0m14.088s
- user 0m1.310s
- sys 0m2.670s
- $ time mdclient2
- 100000 replies received
- real 0m8.730s
- user 0m0.920s
- sys 0m1.550s
- $ time mdclient2
- 100000 replies received
- real 0m3.863s
- user 0m0.730s
- sys 0m0.470s