• ZMQ之面向服务的可靠队列(管家模式)


            管家模式协议(MDP)在扩展PPP协议时引入了一个有趣的特性:client发送的每一个请求都有一个“服务名称”,而worker在向队列装置注册时需要告知自己的服务类型。MDP的优势在于它来源于现实编程,协议简单,且易于改进。

            引入“服务名称”的机制,是对偏执海盗队列的一个简单补充,而结果是让其成为一个面向服务的代理。

            在实施管家模式之前,我们需要为client和worker编写一个框架。如果程序员可以通过简单的API来实现这种模式,那就没有必要让他们去了解管家模式的协议内容和实现方法了。
            所以,我们第一个协议(即管家模式协议)定义了分布式架构中节点是如何互相交互的,第二个协议则要定义应用程序应该如何通过框架来使用这一协议。
            管家模式分为两端:client端和worker端。因为我们要为client和worker都编写框架,所以需要提供两套API。以下是用简单的面向对象风格设计的client端API雏形,使用的是C语言的CZMQ库:

    1. mdcli_t *mdcli_new (char *broker);
    2. void mdcli_destroy (mdcli_t **self_p);
    3. zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);

            就这么简单。我们创建一个与代理通信的会话,发送一个请求并接收应答,最后关闭连接。以下是worker端API的雏形。

    1. mdwrk_t *mdwrk_new (char *broker,char *service);
    2. void mdwrk_destroy (mdwrk_t **self_p);
    3. zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);

            上面两段代码看起来差不多,但是worker端API略有不同:worker第一次调用recv()时传入一个空的应答,之后每次调用都传入对上一个请求的应答,并获得新的请求。

            两段的API都很容易开发,只需在偏执海盗模式代码的基础上修改即可。以下是client API:

            mdcliapi: Majordomo client API in C

    1. /* =====================================================================
    2. mdcliapi.c
    3. Majordomo Protocol Client API
    4. Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    5. ---------------------------------------------------------------------
    6. Copyright (c) 1991-2011 iMatix Corporation
    7. Copyright other contributors as noted in the AUTHORS file.
    8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    9. This is free software; you can redistribute it and/or modify it under
    10. the terms of the GNU Lesser General Public License as published by
    11. the Free Software Foundation; either version 3 of the License, or (at
    12. your option) any later version.
    13. This software is distributed in the hope that it will be useful, but
    14. WITHOUT ANY WARRANTY; without even the implied warranty of
    15. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    16. Lesser General Public License for more details.
    17. You should have received a copy of the GNU Lesser General Public
    18. License along with this program. If not, see
    19. <http://www.gnu.org/licenses/>.
    20. =====================================================================
    21. */
    22. #include "mdcliapi.h"
    23. // 类结构
    24. // 我们会通过成员方法来访问这些属性
    25. struct _mdcli_t {
    26. zctx_t *ctx; // 上下文
    27. char *broker;
    28. void *client; // 连接至代理的套接字
    29. int verbose; // 使用标准输出打印当前活动
    30. int timeout; // 请求超时时间
    31. int retries; // 请求重试次数
    32. };
    33. // ---------------------------------------------------------------------
    34. // 连接或重连代理
    35. void s_mdcli_connect_to_broker (mdcli_t *self)
    36. {
    37. if (self->client)
    38. zsocket_destroy (self->ctx, self->client);
    39. self->client = zsocket_new (self->ctx, ZMQ_REQ);
    40. zmq_connect (self->client, self->broker);
    41. if (self->verbose)
    42. zclock_log ("I: 正在连接至代理 %s...", self->broker);
    43. }
    44. // ---------------------------------------------------------------------
    45. // 构造函数
    46. mdcli_t *
    47. mdcli_new (char *broker, int verbose)
    48. {
    49. assert (broker);
    50. mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    51. self->ctx = zctx_new ();
    52. self->broker = strdup (broker);
    53. self->verbose = verbose;
    54. self->timeout = 2500; // 毫秒
    55. self->retries = 3; // 尝试次数
    56. s_mdcli_connect_to_broker (self);
    57. return self;
    58. }
    59. // ---------------------------------------------------------------------
    60. // 析构函数
    61. void
    62. mdcli_destroy (mdcli_t **self_p)
    63. {
    64. assert (self_p);
    65. if (*self_p) {
    66. mdcli_t *self = *self_p;
    67. zctx_destroy (&self->ctx);
    68. free (self->broker);
    69. free (self);
    70. *self_p = NULL;
    71. }
    72. }
    73. // ---------------------------------------------------------------------
    74. // 设定请求超时时间
    75. void
    76. mdcli_set_timeout (mdcli_t *self, int timeout)
    77. {
    78. assert (self);
    79. self->timeout = timeout;
    80. }
    81. // ---------------------------------------------------------------------
    82. // 设定请求重试次数
    83. void
    84. mdcli_set_retries (mdcli_t *self, int retries)
    85. {
    86. assert (self);
    87. self->retries = retries;
    88. }
    89. // ---------------------------------------------------------------------
    90. // 向代理发送请求,并尝试获取应答;
    91. // 对消息保持所有权,发送后销毁;
    92. // 返回应答消息,或NULL。
    93. zmsg_t *
    94. mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
    95. {
    96. assert (self);
    97. assert (request_p);
    98. zmsg_t *request = *request_p;
    99. // 用协议前缀包装消息
    100. // Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    101. // Frame 2: 服务名称 (可打印字符串)
    102. zmsg_pushstr (request, service);
    103. zmsg_pushstr (request, MDPC_CLIENT);
    104. if (self->verbose) {
    105. zclock_log ("I: 发送请求给 '%s' 服务:", service);
    106. zmsg_dump (request);
    107. }
    108. int retries_left = self->retries;
    109. while (retries_left && !zctx_interrupted) {
    110. zmsg_t *msg = zmsg_dup (request);
    111. zmsg_send (&msg, self->client);
    112. while (TRUE) {
    113. // 轮询套接字以接收应答,有超时时间
    114. zmq_pollitem_t items [] = {
    115. { self->client, 0, ZMQ_POLLIN, 0 } };
    116. int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    117. if (rc == -1)
    118. break; // 中断
    119. // 收到应答后进行处理
    120. if (items [0].revents & ZMQ_POLLIN) {
    121. zmsg_t *msg = zmsg_recv (self->client);
    122. if (self->verbose) {
    123. zclock_log ("I: received reply:");
    124. zmsg_dump (msg);
    125. }
    126. // 不要尝试处理错误,直接报错即可
    127. assert (zmsg_size (msg) >= 3);
    128. zframe_t *header = zmsg_pop (msg);
    129. assert (zframe_streq (header, MDPC_CLIENT));
    130. zframe_destroy (&header);
    131. zframe_t *reply_service = zmsg_pop (msg);
    132. assert (zframe_streq (reply_service, service));
    133. zframe_destroy (&reply_service);
    134. zmsg_destroy (&request);
    135. return msg; // 成功
    136. }
    137. else
    138. if (--retries_left) {
    139. if (self->verbose)
    140. zclock_log ("W: no reply, reconnecting...");
    141. // 重连并重发消息
    142. s_mdcli_connect_to_broker (self);
    143. zmsg_t *msg = zmsg_dup (request);
    144. zmsg_send (&msg, self->client);
    145. }
    146. else {
    147. if (self->verbose)
    148. zclock_log ("W: 发生严重错误,放弃重试。");
    149. break; // 放弃
    150. }
    151. }
    152. }
    153. if (zctx_interrupted)
    154. printf ("W: 收到中断消息,结束client进程...\n");
    155. zmsg_destroy (&request);
    156. return NULL;
    157. }

            以下测试程序会执行10万次请求应答:

            mdclient: Majordomo client application in C

    1. //
    2. // 管家模式协议 - 客户端示例
    3. // 使用mdcli API隐藏管家模式协议的内部实现
    4. //
    5. // 让我们直接编译这段代码,不生成类库
    6. #include "mdcliapi.c"
    7. int main (int argc, char *argv [])
    8. {
    9. int verbose = (argc > 1 && streq (argv [1], "-v"));
    10. mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
    11. int count;
    12. for (count = 0; count < 100000; count++) {
    13. zmsg_t *request = zmsg_new ();
    14. zmsg_pushstr (request, "Hello world");
    15. zmsg_t *reply = mdcli_send (session, "echo", &request);
    16. if (reply)
    17. zmsg_destroy (&reply);
    18. else
    19. break; // 中断或停止
    20. }
    21. printf ("已处理 %d 次请求-应答\n", count);
    22. mdcli_destroy (&session);
    23. return 0;
    24. }

            下面是worker的API:

            mdwrkapi: Majordomo worker API in C

    1. /* =====================================================================
    2. mdwrkapi.c
    3. Majordomo Protocol Worker API
    4. Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    5. ---------------------------------------------------------------------
    6. Copyright (c) 1991-2011 iMatix Corporation
    7. Copyright other contributors as noted in the AUTHORS file.
    8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    9. This is free software; you can redistribute it and/or modify it under
    10. the terms of the GNU Lesser General Public License as published by
    11. the Free Software Foundation; either version 3 of the License, or (at
    12. your option) any later version.
    13. This software is distributed in the hope that it will be useful, but
    14. WITHOUT ANY WARRANTY; without even the implied warranty of
    15. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    16. Lesser General Public License for more details.
    17. You should have received a copy of the GNU Lesser General Public
    18. License along with this program. If not, see
    19. <http://www.gnu.org/licenses/>.
    20. =====================================================================
    21. */
    22. #include "mdwrkapi.h"
    23. // 可靠性参数
    24. #define HEARTBEAT_LIVENESS 3 // 合理值:3-5
    25. // 类结构
    26. // 使用成员函数访问属性
    27. struct _mdwrk_t {
    28. zctx_t *ctx; // 上下文
    29. char *broker;
    30. char *service;
    31. void *worker; // 连接至代理的套接字
    32. int verbose; // 使用标准输出打印活动
    33. // 心跳设置
    34. uint64_t heartbeat_at; // 发送心跳的时间
    35. size_t liveness; // 尝试次数
    36. int heartbeat; // 心跳延时,单位:毫秒
    37. int reconnect; // 重连延时,单位:毫秒
    38. // 内部状态
    39. int expect_reply; // 初始值为0
    40. // 应答地址,如果存在的话
    41. zframe_t *reply_to;
    42. };
    43. // ---------------------------------------------------------------------
    44. // 发送消息给代理
    45. // 如果没有提供消息,则内部创建一个
    46. static void
    47. s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
    48. zmsg_t *msg)
    49. {
    50. msg = msg? zmsg_dup (msg): zmsg_new ();
    51. // 将协议信封压入消息顶部
    52. if (option)
    53. zmsg_pushstr (msg, option);
    54. zmsg_pushstr (msg, command);
    55. zmsg_pushstr (msg, MDPW_WORKER);
    56. zmsg_pushstr (msg, "");
    57. if (self->verbose) {
    58. zclock_log ("I: sending %s to broker",
    59. mdps_commands [(int) *command]);
    60. zmsg_dump (msg);
    61. }
    62. zmsg_send (&msg, self->worker);
    63. }
    64. // ---------------------------------------------------------------------
    65. // 连接或重连代理
    66. void s_mdwrk_connect_to_broker (mdwrk_t *self)
    67. {
    68. if (self->worker)
    69. zsocket_destroy (self->ctx, self->worker);
    70. self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    71. zmq_connect (self->worker, self->broker);
    72. if (self->verbose)
    73. zclock_log ("I: 正在连接代理 %s...", self->broker);
    74. // 向代理注册服务类型
    75. s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
    76. // 当心跳健康度为零,表示代理已断开连接
    77. self->liveness = HEARTBEAT_LIVENESS;
    78. self->heartbeat_at = zclock_time () + self->heartbeat;
    79. }
    80. // ---------------------------------------------------------------------
    81. // 构造函数
    82. mdwrk_t *
    83. mdwrk_new (char *broker,char *service, int verbose)
    84. {
    85. assert (broker);
    86. assert (service);
    87. mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    88. self->ctx = zctx_new ();
    89. self->broker = strdup (broker);
    90. self->service = strdup (service);
    91. self->verbose = verbose;
    92. self->heartbeat = 2500; // 毫秒
    93. self->reconnect = 2500; // 毫秒
    94. s_mdwrk_connect_to_broker (self);
    95. return self;
    96. }
    97. // ---------------------------------------------------------------------
    98. // 析构函数
    99. void
    100. mdwrk_destroy (mdwrk_t **self_p)
    101. {
    102. assert (self_p);
    103. if (*self_p) {
    104. mdwrk_t *self = *self_p;
    105. zctx_destroy (&self->ctx);
    106. free (self->broker);
    107. free (self->service);
    108. free (self);
    109. *self_p = NULL;
    110. }
    111. }
    112. // ---------------------------------------------------------------------
    113. // 设置心跳延迟
    114. void
    115. mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
    116. {
    117. self->heartbeat = heartbeat;
    118. }
    119. // ---------------------------------------------------------------------
    120. // 设置重连延迟
    121. void
    122. mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
    123. {
    124. self->reconnect = reconnect;
    125. }
    126. // ---------------------------------------------------------------------
    127. // 若有应答则发送给代理,并等待新的请求
    128. zmsg_t *
    129. mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
    130. {
    131. // 格式化并发送请求传入的应答
    132. assert (reply_p);
    133. zmsg_t *reply = *reply_p;
    134. assert (reply || !self->expect_reply);
    135. if (reply) {
    136. assert (self->reply_to);
    137. zmsg_wrap (reply, self->reply_to);
    138. s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
    139. zmsg_destroy (reply_p);
    140. }
    141. self->expect_reply = 1;
    142. while (TRUE) {
    143. zmq_pollitem_t items [] = {
    144. { self->worker, 0, ZMQ_POLLIN, 0 } };
    145. int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
    146. if (rc == -1)
    147. break; // 中断
    148. if (items [0].revents & ZMQ_POLLIN) {
    149. zmsg_t *msg = zmsg_recv (self->worker);
    150. if (!msg)
    151. break; // 中断
    152. if (self->verbose) {
    153. zclock_log ("I: 从代理处获得消息:");
    154. zmsg_dump (msg);
    155. }
    156. self->liveness = HEARTBEAT_LIVENESS;
    157. // 不要处理错误,直接报错即可
    158. assert (zmsg_size (msg) >= 3);
    159. zframe_t *empty = zmsg_pop (msg);
    160. assert (zframe_streq (empty, ""));
    161. zframe_destroy (&empty);
    162. zframe_t *header = zmsg_pop (msg);
    163. assert (zframe_streq (header, MDPW_WORKER));
    164. zframe_destroy (&header);
    165. zframe_t *command = zmsg_pop (msg);
    166. if (zframe_streq (command, MDPW_REQUEST)) {
    167. // 这里需要将消息中空帧之前的所有地址都保存起来,
    168. // 但在这里我们暂时只保存一个
    169. self->reply_to = zmsg_unwrap (msg);
    170. zframe_destroy (&command);
    171. return msg; // 处理请求
    172. }
    173. else
    174. if (zframe_streq (command, MDPW_HEARTBEAT))
    175. ; // 不对心跳做任何处理
    176. else
    177. if (zframe_streq (command, MDPW_DISCONNECT))
    178. s_mdwrk_connect_to_broker (self);
    179. else {
    180. zclock_log ("E: 消息不合法");
    181. zmsg_dump (msg);
    182. }
    183. zframe_destroy (&command);
    184. zmsg_destroy (&msg);
    185. }
    186. else
    187. if (--self->liveness == 0) {
    188. if (self->verbose)
    189. zclock_log ("W: 失去与代理的连接 - 正在重试...");
    190. zclock_sleep (self->reconnect);
    191. s_mdwrk_connect_to_broker (self);
    192. }
    193. // 适时地发送消息
    194. if (zclock_time () > self->heartbeat_at) {
    195. s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
    196. self->heartbeat_at = zclock_time () + self->heartbeat;
    197. }
    198. }
    199. if (zctx_interrupted)
    200. printf ("W: 收到中断消息,中止worker...\n");
    201. return NULL;
    202. }

            以下测试程序实现了名为echo的服务:

            mdworker: Majordomo worker application in C

    1. //
    2. // 管家模式协议 - worker示例
    3. // 使用mdwrk API隐藏MDP协议的内部实现
    4. //
    5. // 让我们直接编译代码,而不创建类库
    6. #include "mdwrkapi.c"
    7. int main (int argc, char *argv [])
    8. {
    9. int verbose = (argc > 1 && streq (argv [1], "-v"));
    10. mdwrk_t *session = mdwrk_new (
    11. "tcp://localhost:5555", "echo", verbose);
    12. zmsg_t *reply = NULL;
    13. while (1) {
    14. zmsg_t *request = mdwrk_recv (session, &reply);
    15. if (request == NULL)
    16. break; // worker被中止
    17. reply = request; // echo服务……其实很复杂:)
    18. }
    19. mdwrk_destroy (&session);
    20. return 0;
    21. }

            几点说明:

                    1、API是单线程的,所以worker不会在后台发送心跳,而这也是我们所期望的:如果worker应用程序卡住了,心跳就会跟着中止,代理便会停止向该worker发送新的请求。

                    2、worker API没有实现指数回退(exponential back-off)算法,因为在这里不值得引入这一复杂的机制。

                    3、API没有提供任何报错机制,如果出现问题,它会直接报断言(或异常,依语言而定)。这一做法对实验性的编程是有用的,这样可以立刻看到执行结果。但在真实编程环境中,API应该足够健壮,合适地处理非法消息。

            也许你会问,worker API为什么要关闭它的套接字并新开一个呢?特别是ZMQ是有重连机制的,能够在节点归来后进行重连。我们可以回顾一下简单海盗模式中的worker,以及偏执海盗模式中的worker来加以理解。ZMQ确实会进行自动重连,但如果代理崩溃后又重新启动,worker并不会自动向它重新注册。这个问题有两种解决方案:一是我们这里用到的较为简便的方案,即当worker判断代理已经死亡时,关闭它的套接字并从头来过;另一个方案是当代理收到未知worker的心跳时要求该worker对其提供的服务类型进行注册,这样一来就需要在协议中说明这一规则。

            下面让我们设计管家模式的代理,它的核心代码是一组队列,每种服务对应一个队列。我们会在worker出现时创建相应的队列(worker消失时应该销毁对应的队列,不过我们这里暂时不考虑)。此外,我们还会为每种服务维护一个worker队列。

            为了让C语言代码更易读易写,我使用了CZMQ项目提供的哈希表和链表容器,即zhash和zlist。如果使用现代语言编写,自然可以使用其内置的容器。

    1. //
    2. // 管家模式协议 - 代理
    3. // 协议 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现
    4. //
    5. #include "czmq.h"
    6. #include "mdp.h"
    // Tunables; a production broker would normally read these from config
    #define HEARTBEAT_LIVENESS  3       // 3-5 is reasonable
    #define HEARTBEAT_INTERVAL  2500    // msecs
    // Parenthesized so the macro expands safely inside any expression
    #define HEARTBEAT_EXPIRY    (HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)
    11. // 定义一个代理
    12. typedef struct {
    13. zctx_t *ctx; // 上下文
    14. void *socket; // 用于连接client和worker的套接字
    15. int verbose; // 使用标准输出打印活动信息
    16. char *endpoint; // 代理绑定到的端点
    17. zhash_t *services; // 已知服务的哈希表
    18. zhash_t *workers; // 已知worker的哈希表
    19. zlist_t *waiting; // 正在等待的worker队列
    20. uint64_t heartbeat_at; // 发送心跳的时间
    21. } broker_t;
    22. // 定义一个服务
    23. typedef struct {
    24. char *name; // 服务名称
    25. zlist_t *requests; // 客户端请求队列
    26. zlist_t *waiting; // 正在等待的worker队列
    27. size_t workers; // 可用worker数
    28. } service_t;
    29. // 定义一个worker,状态为空闲或占用
    30. typedef struct {
    31. char *identity; // worker的标识
    32. zframe_t *address; // 地址帧
    33. service_t *service; // 所属服务
    34. int64_t expiry; // 过期时间,从未收到心跳起计时
    35. } worker_t;
    36. // ---------------------------------------------------------------------
    37. // 代理使用的函数
    38. static broker_t *
    39. s_broker_new (int verbose);
    40. static void
    41. s_broker_destroy (broker_t **self_p);
    42. static void
    43. s_broker_bind (broker_t *self, char *endpoint);
    44. static void
    45. s_broker_purge_workers (broker_t *self);
    46. // 服务使用的函数
    47. static service_t *
    48. s_service_require (broker_t *self, zframe_t *service_frame);
    49. static void
    50. s_service_destroy (void *argument);
    51. static void
    52. s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
    53. static void
    54. s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
    55. // worker使用的函数
    56. static worker_t *
    57. s_worker_require (broker_t *self, zframe_t *address);
    58. static void
    59. s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
    60. static void
    61. s_worker_destroy (void *argument);
    62. static void
    63. s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
    64. static void
    65. s_worker_send (broker_t *self, worker_t *worker, char *command,
    66. char *option, zmsg_t *msg);
    67. static void
    68. s_worker_waiting (broker_t *self, worker_t *worker);
    69. // 客户端使用的函数
    70. static void
    71. s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
    72. // ---------------------------------------------------------------------
    73. // 主程序
    74. int main (int argc, char *argv [])
    75. {
    76. int verbose = (argc > 1 && streq (argv [1], "-v"));
    77. broker_t *self = s_broker_new (verbose);
    78. s_broker_bind (self, "tcp://*:5555");
    79. // 接受并处理消息,直至程序被中止
    80. while (TRUE) {
    81. zmq_pollitem_t items [] = {
    82. { self->socket, 0, ZMQ_POLLIN, 0 } };
    83. int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    84. if (rc == -1)
    85. break; // 中断
    86. // Process next input message, if any
    87. if (items [0].revents & ZMQ_POLLIN) {
    88. zmsg_t *msg = zmsg_recv (self->socket);
    89. if (!msg)
    90. break; // 中断
    91. if (self->verbose) {
    92. zclock_log ("I: 收到消息:");
    93. zmsg_dump (msg);
    94. }
    95. zframe_t *sender = zmsg_pop (msg);
    96. zframe_t *empty = zmsg_pop (msg);
    97. zframe_t *header = zmsg_pop (msg);
    98. if (zframe_streq (header, MDPC_CLIENT))
    99. s_client_process (self, sender, msg);
    100. else
    101. if (zframe_streq (header, MDPW_WORKER))
    102. s_worker_process (self, sender, msg);
    103. else {
    104. zclock_log ("E: 非法消息:");
    105. zmsg_dump (msg);
    106. zmsg_destroy (&msg);
    107. }
    108. zframe_destroy (&sender);
    109. zframe_destroy (&empty);
    110. zframe_destroy (&header);
    111. }
    112. // 断开并删除过期的worker
    113. // 适时地发送心跳给worker
    114. if (zclock_time () > self->heartbeat_at) {
    115. s_broker_purge_workers (self);
    116. worker_t *worker = (worker_t *) zlist_first (self->waiting);
    117. while (worker) {
    118. s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
    119. worker = (worker_t *) zlist_next (self->waiting);
    120. }
    121. self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    122. }
    123. }
    124. if (zctx_interrupted)
    125. printf ("W: 收到中断消息,关闭中...\n");
    126. s_broker_destroy (&self);
    127. return 0;
    128. }
    129. // ---------------------------------------------------------------------
    130. // 代理对象的构造函数
    131. static broker_t *
    132. s_broker_new (int verbose)
    133. {
    134. broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    135. // 初始化代理状态
    136. self->ctx = zctx_new ();
    137. self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    138. self->verbose = verbose;
    139. self->services = zhash_new ();
    140. self->workers = zhash_new ();
    141. self->waiting = zlist_new ();
    142. self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    143. return self;
    144. }
    145. // ---------------------------------------------------------------------
    146. // 代理对象的析构函数
    147. static void
    148. s_broker_destroy (broker_t **self_p)
    149. {
    150. assert (self_p);
    151. if (*self_p) {
    152. broker_t *self = *self_p;
    153. zctx_destroy (&self->ctx);
    154. zhash_destroy (&self->services);
    155. zhash_destroy (&self->workers);
    156. zlist_destroy (&self->waiting);
    157. free (self);
    158. *self_p = NULL;
    159. }
    160. }
    161. // ---------------------------------------------------------------------
    162. // 将代理套接字绑定至端点,可以重复调用该函数
    163. // 我们使用一个套接字来同时处理client和worker
    164. void
    165. s_broker_bind (broker_t *self, char *endpoint)
    166. {
    167. zsocket_bind (self->socket, endpoint);
    168. zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
    169. }
    170. // ---------------------------------------------------------------------
    171. // 删除空闲状态中过期的worker
    172. static void
    173. s_broker_purge_workers (broker_t *self)
    174. {
    175. worker_t *worker = (worker_t *) zlist_first (self->waiting);
    176. while (worker) {
    177. if (zclock_time () < worker->expiry)
    178. continue; // 该worker未过期,停止搜索
    179. if (self->verbose)
    180. zclock_log ("I: 正在删除过期的worker: %s",
    181. worker->identity);
    182. s_worker_delete (self, worker, 0);
    183. worker = (worker_t *) zlist_first (self->waiting);
    184. }
    185. }
    186. // ---------------------------------------------------------------------
    187. // 定位或创建新的服务项
    188. static service_t *
    189. s_service_require (broker_t *self, zframe_t *service_frame)
    190. {
    191. assert (service_frame);
    192. char *name = zframe_strdup (service_frame);
    193. service_t *service =
    194. (service_t *) zhash_lookup (self->services, name);
    195. if (service == NULL) {
    196. service = (service_t *) zmalloc (sizeof (service_t));
    197. service->name = name;
    198. service->requests = zlist_new ();
    199. service->waiting = zlist_new ();
    200. zhash_insert (self->services, name, service);
    201. zhash_freefn (self->services, name, s_service_destroy);
    202. if (self->verbose)
    203. zclock_log ("I: 收到消息:");
    204. }
    205. else
    206. free (name);
    207. return service;
    208. }
    209. // ---------------------------------------------------------------------
    210. // 当服务从broker->services中移除时销毁该服务对象
    211. static void
    212. s_service_destroy (void *argument)
    213. {
    214. service_t *service = (service_t *) argument;
    215. // 销毁请求队列中的所有项目
    216. while (zlist_size (service->requests)) {
    217. zmsg_t *msg = zlist_pop (service->requests);
    218. zmsg_destroy (&msg);
    219. }
    220. zlist_destroy (&service->requests);
    221. zlist_destroy (&service->waiting);
    222. free (service->name);
    223. free (service);
    224. }
    225. // ---------------------------------------------------------------------
    226. // 可能时,分发请求给等待中的worker
    227. static void
    228. s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
    229. {
    230. assert (service);
    231. if (msg) // 将消息加入队列
    232. zlist_append (service->requests, msg);
    233. s_broker_purge_workers (self);
    234. while (zlist_size (service->waiting)
    235. && zlist_size (service->requests))
    236. {
    237. worker_t *worker = zlist_pop (service->waiting);
    238. zlist_remove (self->waiting, worker);
    239. zmsg_t *msg = zlist_pop (service->requests);
    240. s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
    241. zmsg_destroy (&msg);
    242. }
    243. }
    244. // ---------------------------------------------------------------------
    245. // 使用8/MMI协定处理内部服务
    246. static void
    247. s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
    248. {
    249. char *return_code;
    250. if (zframe_streq (service_frame, "mmi.service")) {
    251. char *name = zframe_strdup (zmsg_last (msg));
    252. service_t *service =
    253. (service_t *) zhash_lookup (self->services, name);
    254. return_code = service && service->workers? "200": "404";
    255. free (name);
    256. }
    257. else
    258. return_code = "501";
    259. zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
    260. // 移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
    261. zframe_t *client = zmsg_unwrap (msg);
    262. zmsg_push (msg, zframe_dup (service_frame));
    263. zmsg_pushstr (msg, MDPC_CLIENT);
    264. zmsg_wrap (msg, client);
    265. zmsg_send (&msg, self->socket);
    266. }
    267. // ---------------------------------------------------------------------
    268. // 按需创建worker
    269. static worker_t *
    270. s_worker_require (broker_t *self, zframe_t *address)
    271. {
    272. assert (address);
    273. // self->workers使用wroker的标识为键
    274. char *identity = zframe_strhex (address);
    275. worker_t *worker =
    276. (worker_t *) zhash_lookup (self->workers, identity);
    277. if (worker == NULL) {
    278. worker = (worker_t *) zmalloc (sizeof (worker_t));
    279. worker->identity = identity;
    280. worker->address = zframe_dup (address);
    281. zhash_insert (self->workers, identity, worker);
    282. zhash_freefn (self->workers, identity, s_worker_destroy);
    283. if (self->verbose)
    284. zclock_log ("I: 正在注册新的worker: %s", identity);
    285. }
    286. else
    287. free (identity);
    288. return worker;
    289. }
    290. // ---------------------------------------------------------------------
    291. // 从所有数据结构中删除wroker,并销毁worker对象
    292. static void
    293. s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
    294. {
    295. assert (worker);
    296. if (disconnect)
    297. s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);
    298. if (worker->service) {
    299. zlist_remove (worker->service->waiting, worker);
    300. worker->service->workers--;
    301. }
    302. zlist_remove (self->waiting, worker);
    303. // 以下方法间接调用了s_worker_destroy()方法
    304. zhash_delete (self->workers, worker->identity);
    305. }
    306. // ---------------------------------------------------------------------
    307. // 当worker从broker->workers中移除时,销毁worker对象
    308. static void
    309. s_worker_destroy (void *argument)
    310. {
    311. worker_t *worker = (worker_t *) argument;
    312. zframe_destroy (&worker->address);
    313. free (worker->identity);
    314. free (worker);
    315. }
    316. // ---------------------------------------------------------------------
    317. // 处理worker发送来的消息
    318. static void
    319. s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
    320. {
    321. assert (zmsg_size (msg) >= 1); // 消息中至少包含命令帧
    322. zframe_t *command = zmsg_pop (msg);
    323. char *identity = zframe_strhex (sender);
    324. int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    325. free (identity);
    326. worker_t *worker = s_worker_require (self, sender);
    327. if (zframe_streq (command, MDPW_READY)) {
    328. // 若worker队列中已有该worker,但仍收到了它的“已就绪”消息,则删除这个worker。
    329. if (worker_ready)
    330. s_worker_delete (self, worker, 1);
    331. else
    332. if (zframe_size (sender) >= 4 // 服务名称为保留的服务
    333. && memcmp (zframe_data (sender), "mmi.", 4) == 0)
    334. s_worker_delete (self, worker, 1);
    335. else {
    336. // 将worker对应到服务,并置为空闲状态
    337. zframe_t *service_frame = zmsg_pop (msg);
    338. worker->service = s_service_require (self, service_frame);
    339. worker->service->workers++;
    340. s_worker_waiting (self, worker);
    341. zframe_destroy (&service_frame);
    342. }
    343. }
    344. else
    345. if (zframe_streq (command, MDPW_REPLY)) {
    346. if (worker_ready) {
    347. // 移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
    348. zframe_t *client = zmsg_unwrap (msg);
    349. zmsg_pushstr (msg, worker->service->name);
    350. zmsg_pushstr (msg, MDPC_CLIENT);
    351. zmsg_wrap (msg, client);
    352. zmsg_send (&msg, self->socket);
    353. s_worker_waiting (self, worker);
    354. }
    355. else
    356. s_worker_delete (self, worker, 1);
    357. }
    358. else
    359. if (zframe_streq (command, MDPW_HEARTBEAT)) {
    360. if (worker_ready)
    361. worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    362. else
    363. s_worker_delete (self, worker, 1);
    364. }
    365. else
    366. if (zframe_streq (command, MDPW_DISCONNECT))
    367. s_worker_delete (self, worker, 0);
    368. else {
    369. zclock_log ("E: 非法消息");
    370. zmsg_dump (msg);
    371. }
    372. free (command);
    373. zmsg_destroy (&msg);
    374. }
    375. // ---------------------------------------------------------------------
    376. // 发送消息给worker
    377. // 如果指针指向了一条消息,发送它,但不销毁它,因为这是调用者的工作
    378. static void
    379. s_worker_send (broker_t *self, worker_t *worker, char *command,
    380. char *option, zmsg_t *msg)
    381. {
    382. msg = msg? zmsg_dup (msg): zmsg_new ();
    383. // 将协议信封压入消息顶部
    384. if (option)
    385. zmsg_pushstr (msg, option);
    386. zmsg_pushstr (msg, command);
    387. zmsg_pushstr (msg, MDPW_WORKER);
    388. // 在消息顶部插入路由帧
    389. zmsg_wrap (msg, zframe_dup (worker->address));
    390. if (self->verbose) {
    391. zclock_log ("I: 正在发送消息给worker %s",
    392. mdps_commands [(int) *command]);
    393. zmsg_dump (msg);
    394. }
    395. zmsg_send (&msg, self->socket);
    396. }
    397. // ---------------------------------------------------------------------
    398. // 正在等待的worker
    399. static void
    400. s_worker_waiting (broker_t *self, worker_t *worker)
    401. {
    402. // 将worker加入代理和服务的等待队列
    403. zlist_append (self->waiting, worker);
    404. zlist_append (worker->service->waiting, worker);
    405. worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    406. s_service_dispatch (self, worker->service, NULL);
    407. }
    408. // ---------------------------------------------------------------------
    409. // 处理client发来的请求
    410. static void
    411. s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
    412. {
    413. assert (zmsg_size (msg) >= 2); // 服务名称 + 请求内容
    414. zframe_t *service_frame = zmsg_pop (msg);
    415. service_t *service = s_service_require (self, service_frame);
    416. // 为应答内容设置请求方的地址
    417. zmsg_wrap (msg, zframe_dup (sender));
    418. if (zframe_size (service_frame) >= 4
    419. && memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
    420. s_service_internal (self, service_frame, msg);
    421. else
    422. s_service_dispatch (self, service, msg);
    423. zframe_destroy (&service_frame);
    424. }

            这个例子应该是我们见过最复杂的一个示例了,大约有500行代码。编写这段代码并让其变得健壮,大约花费了两天的时间。但是,这也仅仅是一个完整的面向服务代理的一部分。

            几点说明:

                    1、管家模式协议要求我们在一个套接字中同时处理client和worker,这一点对部署和管理代理很有益处:它只会在一个ZMQ端点上收发请求,而不是两个。

                    2、据我所知,代理正确地实现了MDP/0.1协议规范中的内容,包括在worker发送非法命令时将其断开、心跳机制等。

                    3、可以将这段代码扩充为多线程,每个线程管理一个套接字、一组client和worker。这种做法在大型架构的拆分中显得很有趣。C语言代码已经围绕代理结构体(broker_t)来组织,因此很容易实现。

                    4、还可以将这段代码扩充为主备模式、双在线模式,进一步提高可靠性。因为从本质上来说,代理是无状态的,只是保存了服务的存在与否,因此client和worker可以自行选择除此之外的代理来进行通信。

                    5、示例代码中心跳的间隔为2.5秒(HEARTBEAT_INTERVAL为2500毫秒),主要是为了减少调试时的输出。现实中的值应该设得低一些;但是,重试的间隔应该设置得稍长一些,让服务有足够的时间重新启动,如10秒钟。

  • 相关阅读:
    【SpringBoot】静态资源导入探究
    企业为什么要数字化转型?数字化转型成功的案例有哪些?
    golang list 遍历
    QT软件开发: 获取CPU序列号、硬盘序列号、主板序列号 (采用wmic命令)
    3BHE003855R0001 UNS2882A 用于嵌入式/工业用途的人工智能盒
    Nginx:尚硅谷2022版Nginx笔记
    Spring源码分析(八)依赖注入源码解析1:autowire自动注入 和 @Autowired注入
    基于Java的厨艺交流平台设计与实现(源码+lw+部署文档+讲解等)
    [oeasy]python0024_ 输出时间_time_模块_module_函数_function
    Util 应用框架 UI 全新升级
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/128117020