• ZMQ之异步管家模式


            上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧的时间浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。

            理论归理论,还是需要由实践来检验。我们用一个简单的测试程序来看看逐个处理循环是否真的耗时。这个测试程序会发送一组消息,第一次它发一条收一条,第二次则一起发送再一起接收。两次结果应该是一样的,但速度截然不同。

            tripping: Round-trip demonstrator in C

    1. //
    2. // Round-trip 模拟
    3. //
    4. // 本示例程序使用多线程的方式启动client、worker、以及代理,
    5. // 当client处理完毕时会发送信号给主程序。
    6. //
    7. #include "czmq.h"
    8. static void
    9. client_task (void *args, zctx_t *ctx, void *pipe)
    10. {
    11. void *client = zsocket_new (ctx, ZMQ_DEALER);
    12. zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
    13. zsocket_connect (client, "tcp://localhost:5555");
    14. printf ("开始测试...\n");
    15. zclock_sleep (100);
    16. int requests;
    17. int64_t start;
    18. printf ("同步 round-trip 测试...\n");
    19. start = zclock_time ();
    20. for (requests = 0; requests < 10000; requests++) {
    21. zstr_send (client, "hello");
    22. char *reply = zstr_recv (client);
    23. free (reply);
    24. }
    25. printf (" %d 次/秒\n",
    26. (1000 * 10000) / (int) (zclock_time () - start));
    27. printf ("异步 round-trip 测试...\n");
    28. start = zclock_time ();
    29. for (requests = 0; requests < 100000; requests++)
    30. zstr_send (client, "hello");
    31. for (requests = 0; requests < 100000; requests++) {
    32. char *reply = zstr_recv (client);
    33. free (reply);
    34. }
    35. printf (" %d 次/秒\n",
    36. (1000 * 100000) / (int) (zclock_time () - start));
    37. zstr_send (pipe, "完成");
    38. }
    39. static void *
    40. worker_task (void *args)
    41. {
    42. zctx_t *ctx = zctx_new ();
    43. void *worker = zsocket_new (ctx, ZMQ_DEALER);
    44. zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
    45. zsocket_connect (worker, "tcp://localhost:5556");
    46. while (1) {
    47. zmsg_t *msg = zmsg_recv (worker);
    48. zmsg_send (&msg, worker);
    49. }
    50. zctx_destroy (&ctx);
    51. return NULL;
    52. }
    53. static void *
    54. broker_task (void *args)
    55. {
    56. // 准备上下文和套接字
    57. zctx_t *ctx = zctx_new ();
    58. void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    59. void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    60. zsocket_bind (frontend, "tcp://*:5555");
    61. zsocket_bind (backend, "tcp://*:5556");
    62. // 初始化轮询对象
    63. zmq_pollitem_t items [] = {
    64. { frontend, 0, ZMQ_POLLIN, 0 },
    65. { backend, 0, ZMQ_POLLIN, 0 }
    66. };
    67. while (1) {
    68. int rc = zmq_poll (items, 2, -1);
    69. if (rc == -1)
    70. break; // 中断
    71. if (items [0].revents & ZMQ_POLLIN) {
    72. zmsg_t *msg = zmsg_recv (frontend);
    73. zframe_t *address = zmsg_pop (msg);
    74. zframe_destroy (&address);
    75. zmsg_pushstr (msg, "W");
    76. zmsg_send (&msg, backend);
    77. }
    78. if (items [1].revents & ZMQ_POLLIN) {
    79. zmsg_t *msg = zmsg_recv (backend);
    80. zframe_t *address = zmsg_pop (msg);
    81. zframe_destroy (&address);
    82. zmsg_pushstr (msg, "C");
    83. zmsg_send (&msg, frontend);
    84. }
    85. }
    86. zctx_destroy (&ctx);
    87. return NULL;
    88. }
    89. int main (void)
    90. {
    91. // 创建线程
    92. zctx_t *ctx = zctx_new ();
    93. void *client = zthread_fork (ctx, client_task, NULL);
    94. zthread_new (ctx, worker_task, NULL);
    95. zthread_new (ctx, broker_task, NULL);
    96. // 等待client端管道的信号
    97. char *signal = zstr_recv (client);
    98. free (signal);
    99. zctx_destroy (&ctx);
    100. return 0;
    101. }

            在我的开发环境中运行结果如下:

    1. Setting up test...
    2. Synchronous round-trip test...
    3. 9057 calls/second
    4. Asynchronous round-trip test...
    5. 173010 calls/second

            需要注意的是client在运行开始会暂停一段时间,这是因为在向ROUTER套接字发送消息时,若指定标识的套接字没有连接,那么ROUTER会直接丢弃该消息。这个示例中我们没有使用LRU算法,所以当worker连接速度稍慢时就有可能丢失数据,影响测试结果。

            我们可以看到,逐个处理循环比异步处理要慢将近20倍,让我们把它应用到管家模式中去。

            首先,让我们修改client的API,添加独立的发送和接收方法:

    1. mdcli_t *mdcli_new (char *broker);
    2. void mdcli_destroy (mdcli_t **self_p);
    3. int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
    4. zmsg_t *mdcli_recv (mdcli_t *self);

            然后花很短的时间就能将同步的client API改造成异步的API:

            mdcliapi2: Majordomo asynchronous client API in C

    1. /* =====================================================================
    2. mdcliapi2.c
    3. Majordomo Protocol Client API (async version)
    4. Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    5. ---------------------------------------------------------------------
    6. Copyright (c) 1991-2011 iMatix Corporation
    7. Copyright other contributors as noted in the AUTHORS file.
    8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    9. This is free software; you can redistribute it and/or modify it under
    10. the terms of the GNU Lesser General Public License as published by
    11. the Free Software Foundation; either version 3 of the License, or (at
    12. your option) any later version.
    13. This software is distributed in the hope that it will be useful, but
    14. WITHOUT ANY WARRANTY; without even the implied warranty of
    15. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    16. Lesser General Public License for more details.
    17. You should have received a copy of the GNU Lesser General Public
    18. License along with this program. If not, see
    19. .
    20. =====================================================================
    21. */
    22. #include "mdcliapi2.h"
    23. // 类结构
    24. // 使用成员函数访问属性
    25. struct _mdcli_t {
    26. zctx_t *ctx; // 上下文
    27. char *broker;
    28. void *client; // 连接至代理的套接字
    29. int verbose; // 在标准输出打印运行状态
    30. int timeout; // 请求超时时间
    31. };
    32. // ---------------------------------------------------------------------
    33. // 连接或重连代理
    34. void s_mdcli_connect_to_broker (mdcli_t *self)
    35. {
    36. if (self->client)
    37. zsocket_destroy (self->ctx, self->client);
    38. self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    39. zmq_connect (self->client, self->broker);
    40. if (self->verbose)
    41. zclock_log ("I: 正在连接代理 %s...", self->broker);
    42. }
    43. // ---------------------------------------------------------------------
    44. // 构造函数
    45. mdcli_t *
    46. mdcli_new (char *broker, int verbose)
    47. {
    48. assert (broker);
    49. mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    50. self->ctx = zctx_new ();
    51. self->broker = strdup (broker);
    52. self->verbose = verbose;
    53. self->timeout = 2500; // 毫秒
    54. s_mdcli_connect_to_broker (self);
    55. return self;
    56. }
    57. // ---------------------------------------------------------------------
    58. // 析构函数
    59. void
    60. mdcli_destroy (mdcli_t **self_p)
    61. {
    62. assert (self_p);
    63. if (*self_p) {
    64. mdcli_t *self = *self_p;
    65. zctx_destroy (&self->ctx);
    66. free (self->broker);
    67. free (self);
    68. *self_p = NULL;
    69. }
    70. }
    71. // ---------------------------------------------------------------------
    72. // 设置请求超时时间
    73. void
    74. mdcli_set_timeout (mdcli_t *self, int timeout)
    75. {
    76. assert (self);
    77. self->timeout = timeout;
    78. }
    79. // ---------------------------------------------------------------------
    80. // 发送请求给代理
    81. // 取得请求消息的所有权,发送后销毁
    82. int
    83. mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
    84. {
    85. assert (self);
    86. assert (request_p);
    87. zmsg_t *request = *request_p;
    88. // 在消息顶部加入协议规定的帧
    89. // Frame 0: empty (模拟REQ套接字的行为)
    90. // Frame 1: "MDPCxy" (6个字节, MDP/Client x.y)
    91. // Frame 2: Service name (看打印字符串)
    92. zmsg_pushstr (request, service);
    93. zmsg_pushstr (request, MDPC_CLIENT);
    94. zmsg_pushstr (request, "");
    95. if (self->verbose) {
    96. zclock_log ("I: 发送请求给 '%s' 服务:", service);
    97. zmsg_dump (request);
    98. }
    99. zmsg_send (&request, self->client);
    100. return 0;
    101. }
    102. // ---------------------------------------------------------------------
    103. // 获取应答消息,若无则返回NULL;
    104. // 该函数不会尝试从代理的崩溃中恢复,
    105. // 因为我们没有记录那些未收到应答的请求,所以也无法重发。
    106. zmsg_t *
    107. mdcli_recv (mdcli_t *self)
    108. {
    109. assert (self);
    110. // 轮询套接字以获取应答
    111. zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
    112. int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    113. if (rc == -1)
    114. return NULL; // 中断
    115. // 收到应答后进行处理
    116. if (items [0].revents & ZMQ_POLLIN) {
    117. zmsg_t *msg = zmsg_recv (self->client);
    118. if (self->verbose) {
    119. zclock_log ("I: received reply:");
    120. zmsg_dump (msg);
    121. }
    122. // 不要处理错误,直接报出
    123. assert (zmsg_size (msg) >= 4);
    124. zframe_t *empty = zmsg_pop (msg);
    125. assert (zframe_streq (empty, ""));
    126. zframe_destroy (&empty);
    127. zframe_t *header = zmsg_pop (msg);
    128. assert (zframe_streq (header, MDPC_CLIENT));
    129. zframe_destroy (&header);
    130. zframe_t *service = zmsg_pop (msg);
    131. zframe_destroy (&service);
    132. return msg; // Success
    133. }
    134. if (zctx_interrupted)
    135. printf ("W: 收到中断消息,正在中止client...\n");
    136. else
    137. if (self->verbose)
    138. zclock_log ("W: 严重错误,放弃请求");
    139. return NULL;
    140. }

            下面是对应的测试代码:

            mdclient2: Majordomo client application in C

    1. //
    2. // 异步管家模式 - client示例程序
    3. // 使用mdcli API隐藏MDP协议的具体实现
    4. //
    5. // 直接编译源码,而不创建类库
    6. #include "mdcliapi2.c"
    7. int main (int argc, char *argv [])
    8. {
    9. int verbose = (argc > 1 && streq (argv [1], "-v"));
    10. mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
    11. int count;
    12. for (count = 0; count < 100000; count++) {
    13. zmsg_t *request = zmsg_new ();
    14. zmsg_pushstr (request, "Hello world");
    15. mdcli_send (session, "echo", &request);
    16. }
    17. for (count = 0; count < 100000; count++) {
    18. zmsg_t *reply = mdcli_recv (session);
    19. if (reply)
    20. zmsg_destroy (&reply);
    21. else
    22. break; // 使用Ctrl-C中断
    23. }
    24. printf ("收到 %d 个应答\n", count);
    25. mdcli_destroy (&session);
    26. return 0;
    27. }

            代理和worker的代码没有变,因为我们并没有改变MDP协议。经过对client的改造,我们可以明显看到速度的提升。如以下是同步状况下处理10万条请求的时间:

    1. $ time mdclient
    2. 100000 requests/replies processed
    3. real 0m14.088s
    4. user 0m1.310s
    5. sys 0m2.670s

            以下是异步请求的情况:

    1. $ time mdclient2
    2. 100000 replies received
    3. real 0m8.730s
    4. user 0m0.920s
    5. sys 0m1.550s

            让我们建立10个worker,看看效果如何:

    1. $ time mdclient2
    2. 100000 replies received
    3. real 0m3.863s
    4. user 0m0.730s
    5. sys 0m0.470s

            由于worker获得消息需要通过LRU队列机制,所以并不能做到完全的异步。但是,worker越多其效果也会越好。在我的测试机上,当worker的数量达到8个时,速度就不再提升了——四核处理器只能做这么多。但是,我们仍然获得了近四倍的速度提升,而改造过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制消息,而没有实现零拷贝。不过,我们已经做到每秒处理2.5万次请求-应答,已经很不错了。

            当然,异步的管家模式也并不完美,有一个显著的缺点:它无法从代理的崩溃中恢复。可以看到mdcliapi2的代码中并没有恢复连接的代码,重新连接需要有以下几点作为前提:

                    1、每个请求都做了编号,每次应答也含有相应的编号,这就需要修改协议,明确定义。

                    2、client的API需要保留并跟踪所有已发送、但仍未收到应答的请求。

                    3、如果代理发生崩溃,client会重发所有消息。

            可以看到,高可靠性往往和复杂度成正比,值得在管家模式中应用这一机制吗?这就要看应用场景了。如果是一个名称查询服务,每次会话会调用一次,那不需要应用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就需要了。

  • 相关阅读:
    革命性创新:RFID技术引领汽车零部件加工新时代
    c++模板小例子
    行高的继承和消除内外边距
    opencv基础: 视频,摄像头读取与保存的常用方法
    Pytorch CUDA CPP简易教程,在Windows上操作
    ▶《强化学习的数学原理》(2024春)_西湖大学赵世钰 Ch0 一张图讲完强化学习原理
    低成本简易信号幅值调节/信号叠加电路
    一篇文章掌握C++操作Access数据库
    JavaScript 63 JavaScript 对象 63.6 JavaScript 对象构造器
    图片划过缩放
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/128117191