• ZMQ请求应答模式之无中间件的可靠性--自由者模式


    一、引言

            我们讲了那么多关于中间件的示例,好像有些违背“ZMQ是无中间件”的说法。但要知道在现实生活中,中间件一直是让人又爱又恨的东西。实践中的很多消息架构能都在使用中间件进行分布式架构的搭建,所以说最终的决定还是需要你自己去权衡的。这也是为什么虽然我能驾车10分钟到一个大型商场里购买五箱音量,但我还是会选择走10分钟到楼下的便利店里去买。这种出于经济方面的考虑(时间、精力、成本等)不仅在日常生活中很常见,在软件架构中也很重要。

            这就是为什么ZMQ不会强制使用带有中间件的架构,但仍提供了像内置装置这样的中间件供编程人员自由选用。

            这一节我们会打破以往使用中间件进行可靠性设计的架构,转而使用点对点架构,即自由者模式,来进行可靠的消息传输。我们的示例程序会是一个名称解析服务。ZMQ中的一个常见问题是:我们如何得知需要连接的端点?在代码中直接写入TCP/IP地址肯定是不合适的;使用配置文件会造成管理上的不便。试想一下,你要在上百台计算机中进行配置,只是为了让它们知道google.com的IP地址是74.125.230.82。

            一个ZMQ的名称解析服务需要实现的功能有:

                    1、将逻辑名称解析为一个或多个端点地址,包括绑定端和连接端。实际使用时,名称服务会提供一组端点。

                    2、允许我们在不同的环境下,即开发环境和生产环境,进行解析。

                    3、该服务必须是可靠的,否则应用程序将无法连接到网络。

            为管家模式提供名称解析服务会很有用,虽然将代理程序的端点对外暴露也很简单,但是如果用好名称解析服务,那它将成为唯一一个对外暴露的接口,将更便于管理。

            我们需要处理的故障类型有:服务崩溃或重启、服务过载、网络因素等。为获取可靠性,我们必须建立一个服务群,当某个服务端崩溃后,客户端可以连接其他的服务端。实践中,两个服务端就已经足够了,但事实上服务端的数量可以是任意个。

            在这个架构中,大量客户端和少量服务端进行通信,服务端将套接字绑定至单独的端口,这和管家模式中的代理有很大不同。对于客户端来说,它有这样几种选择:

                    1、客户端可以使用REQ套接字和懒惰海盗模式,但需要有一个机制防止客户端不断地请求已停止的服务端。

                    2、客户端可以使用DEALER套接字,向所有的服务端发送请求。很简单,但并不太妙;

                    3、客户端使用ROUTER套接字,连接特定的服务端。但客户端如何得知服务端的套接字标识呢?一种方式是让服务端主动连接客户端(很复杂),或者将服务端标识写入代码进行固化(很混乱)。

    二、模型一:简单重试

            让我们先尝试简单的方案,重写懒惰海盗模式,让其能够和多个服务端进行通信。启动服务端时用命令行参数指定端口。然后启动多个服务端。

            flserver1: Freelance server, Model One in C

    1. //
    2. // 自由者模式 - 服务端 - 模型1
    3. // 提供echo服务
    4. //
    5. #include "czmq.h"
    6. int main (int argc, char *argv [])
    7. {
    8. if (argc < 2) {
    9. printf ("I: syntax: %s \n", argv [0]);
    10. exit (EXIT_SUCCESS);
    11. }
    12. zctx_t *ctx = zctx_new ();
    13. void *server = zsocket_new (ctx, ZMQ_REP);
    14. zsocket_bind (server, argv [1]);
    15. printf ("I: echo服务端点: %s\n", argv [1]);
    16. while (TRUE) {
    17. zmsg_t *msg = zmsg_recv (server);
    18. if (!msg)
    19. break; // 中断
    20. zmsg_send (&msg, server);
    21. }
    22. if (zctx_interrupted)
    23. printf ("W: 中断\n");
    24. zctx_destroy (&ctx);
    25. return 0;
    26. }

            启动客户端,指定一个或多个端点:

            flclient1: Freelance client, Model One in C

    1. //
    2. // 自由者模式 - 客户端 - 模型1
    3. // 使用REQ套接字请求一个或多个服务端
    4. //
    5. #include "czmq.h"
    6. #define REQUEST_TIMEOUT 1000
    7. #define MAX_RETRIES 3 // 尝试次数
    8. static zmsg_t *
    9. s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
    10. {
    11. printf ("I: 在端点 %s 上尝试请求echo服务...\n", endpoint);
    12. void *client = zsocket_new (ctx, ZMQ_REQ);
    13. zsocket_connect (client, endpoint);
    14. // 发送请求,并等待应答
    15. zmsg_t *msg = zmsg_dup (request);
    16. zmsg_send (&msg, client);
    17. zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    18. zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
    19. zmsg_t *reply = NULL;
    20. if (items [0].revents & ZMQ_POLLIN)
    21. reply = zmsg_recv (client);
    22. // 关闭套接字
    23. zsocket_destroy (ctx, client);
    24. return reply;
    25. }
    26. int main (int argc, char *argv [])
    27. {
    28. zctx_t *ctx = zctx_new ();
    29. zmsg_t *request = zmsg_new ();
    30. zmsg_addstr (request, "Hello world");
    31. zmsg_t *reply = NULL;
    32. int endpoints = argc - 1;
    33. if (endpoints == 0)
    34. printf ("I: syntax: %s ...\n", argv [0]);
    35. else
    36. if (endpoints == 1) {
    37. // 若只有一个端点,则尝试N次
    38. int retries;
    39. for (retries = 0; retries < MAX_RETRIES; retries++) {
    40. char *endpoint = argv [1];
    41. reply = s_try_request (ctx, endpoint, request);
    42. if (reply)
    43. break; // 成功
    44. printf ("W: 没有收到 %s 的应答, 准备重试...\n", endpoint);
    45. }
    46. }
    47. else {
    48. // 若有多个端点,则每个尝试一次
    49. int endpoint_nbr;
    50. for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
    51. char *endpoint = argv [endpoint_nbr + 1];
    52. reply = s_try_request (ctx, endpoint, request);
    53. if (reply)
    54. break; // Successful
    55. printf ("W: 没有收到 %s 的应答\n", endpoint);
    56. }
    57. }
    58. if (reply)
    59. printf ("服务运作正常\n");
    60. zmsg_destroy (&request);
    61. zmsg_destroy (&reply);
    62. zctx_destroy (&ctx);
    63. return 0;
    64. }

            可用如下命令运行:

    1. flserver1 tcp://*:5555 &
    2. flserver1 tcp://*:5556 &
    3. flclient1 tcp://localhost:5555 tcp://localhost:5556

            客户端的核心机制是懒惰海盗模式,即获得一次成功的应答后就结束。会有两种情况:

                    1、如果只有一个服务端,客户端会再尝试N次后停止,这和懒惰海盗模式的逻辑一致。

                    2、如果有多个服务端,客户端会每个尝试一次,收到应答后停止。

            这种机制补充了海盗模式,使其能够克服只有一个服务端的情况。

            但是,这种设计无法在现实程序中使用:当有很多客户端连接了服务端,而主服务端崩溃了,那所有客户端都需要在超时后才能继续执行。

    三、模型二:批量发送

            下面让我们使用DEALER套接字。我们的目标是能再最短的时间里收到一个应答,不能受主服务端崩溃的影响。可以采取以下措施:

                    1、连接所有的服务端。

                    2、当有请求时,一次性发送给所有的服务端。

                    3、等待第一个应答。

                    4、忽略其他应答。

            这样设计客户端时,当发送请求后,所有的服务端都会收到这个请求,并返回应答。如果某个服务端断开连接了,ZMQ可能会将请求发给其他服务端,导致某些服务端会收到两次请求。

            更麻烦的是客户端无法得知应答的数量,容易发生混乱。

            我们可以为请求进行编号,忽略不匹配的应答。我们要对服务端进行改造,返回的消息中需要包含请求编号:
            flserver2: Freelance server, Model Two in C

    1. //
    2. // 自由者模式 - 服务端 - 模型2
    3. // 返回带有请求编号的OK信息
    4. //
    5. #include "czmq.h"
    6. int main (int argc, char *argv [])
    7. {
    8. if (argc < 2) {
    9. printf ("I: syntax: %s \n", argv [0]);
    10. exit (EXIT_SUCCESS);
    11. }
    12. zctx_t *ctx = zctx_new ();
    13. void *server = zsocket_new (ctx, ZMQ_REP);
    14. zsocket_bind (server, argv [1]);
    15. printf ("I: 服务已就绪 %s\n", argv [1]);
    16. while (TRUE) {
    17. zmsg_t *request = zmsg_recv (server);
    18. if (!request)
    19. break; // 中断
    20. // 判断请求内容是否正确
    21. assert (zmsg_size (request) == 2);
    22. zframe_t *address = zmsg_pop (request);
    23. zmsg_destroy (&request);
    24. zmsg_t *reply = zmsg_new ();
    25. zmsg_add (reply, address);
    26. zmsg_addstr (reply, "OK");
    27. zmsg_send (&reply, server);
    28. }
    29. if (zctx_interrupted)
    30. printf ("W: interrupted\n");
    31. zctx_destroy (&ctx);
    32. return 0;
    33. }

            客户端代码:

            flclient2: Freelance client, Model Two in C

    1. //
    2. // 自由者模式 - 客户端 - 模型2
    3. // 使用DEALER套接字发送批量消息
    4. //
    5. #include "czmq.h"
    6. // 超时时间
    7. #define GLOBAL_TIMEOUT 2500
    8. // 将客户端API封装成一个类
    9. #ifdef __cplusplus
    10. extern "C" {
    11. #endif
    12. // 声明类结构
    13. typedef struct _flclient_t flclient_t;
    14. flclient_t *
    15. flclient_new (void);
    16. void
    17. flclient_destroy (flclient_t **self_p);
    18. void
    19. flclient_connect (flclient_t *self, char *endpoint);
    20. zmsg_t *
    21. flclient_request (flclient_t *self, zmsg_t **request_p);
    22. #ifdef __cplusplus
    23. }
    24. #endif
    25. int main (int argc, char *argv [])
    26. {
    27. if (argc == 1) {
    28. printf ("I: syntax: %s ...\n", argv [0]);
    29. exit (EXIT_SUCCESS);
    30. }
    31. // 创建自由者模式客户端
    32. flclient_t *client = flclient_new ();
    33. // 连接至各个端点
    34. int argn;
    35. for (argn = 1; argn < argc; argn++)
    36. flclient_connect (client, argv [argn]);
    37. // 发送一组请求,并记录时间
    38. int requests = 10000;
    39. uint64_t start = zclock_time ();
    40. while (requests--) {
    41. zmsg_t *request = zmsg_new ();
    42. zmsg_addstr (request, "random name");
    43. zmsg_t *reply = flclient_request (client, &request);
    44. if (!reply) {
    45. printf ("E: 名称解析服务不可用,正在退出\n");
    46. break;
    47. }
    48. zmsg_destroy (&reply);
    49. }
    50. printf ("平均请求时间: %d 微秒\n",
    51. (int) (zclock_time () - start) / 10);
    52. flclient_destroy (&client);
    53. return 0;
    54. }
    55. // --------------------------------------------------------------------
    56. // 类结构
    57. struct _flclient_t {
    58. zctx_t *ctx; // 上下文
    59. void *socket; // 用于和服务端通信的DEALER套接字
    60. size_t servers; // 以连接的服务端数量
    61. uint sequence; // 已发送的请求数
    62. };
    63. // --------------------------------------------------------------------
    64. // Constructor
    65. flclient_t *
    66. flclient_new (void)
    67. {
    68. flclient_t
    69. *self;
    70. self = (flclient_t *) zmalloc (sizeof (flclient_t));
    71. self->ctx = zctx_new ();
    72. self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
    73. return self;
    74. }
    75. // --------------------------------------------------------------------
    76. // 析构函数
    77. void
    78. flclient_destroy (flclient_t **self_p)
    79. {
    80. assert (self_p);
    81. if (*self_p) {
    82. flclient_t *self = *self_p;
    83. zctx_destroy (&self->ctx);
    84. free (self);
    85. *self_p = NULL;
    86. }
    87. }
    88. // --------------------------------------------------------------------
    89. // 连接至新的服务端端点
    90. void
    91. flclient_connect (flclient_t *self, char *endpoint)
    92. {
    93. assert (self);
    94. zsocket_connect (self->socket, endpoint);
    95. self->servers++;
    96. }
    97. // --------------------------------------------------------------------
    98. // 发送请求,接收应答
    99. // 发送后销毁请求
    100. zmsg_t *
    101. flclient_request (flclient_t *self, zmsg_t **request_p)
    102. {
    103. assert (self);
    104. assert (*request_p);
    105. zmsg_t *request = *request_p;
    106. // 向消息添加编号和空帧
    107. char sequence_text [10];
    108. sprintf (sequence_text, "%u", ++self->sequence);
    109. zmsg_pushstr (request, sequence_text);
    110. zmsg_pushstr (request, "");
    111. // 向所有已连接的服务端发送请求
    112. int server;
    113. for (server = 0; server < self->servers; server++) {
    114. zmsg_t *msg = zmsg_dup (request);
    115. zmsg_send (&msg, self->socket);
    116. }
    117. // 接收来自任何服务端的应答
    118. // 因为我们可能poll多次,所以每次都进行计算
    119. zmsg_t *reply = NULL;
    120. uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
    121. while (zclock_time () < endtime) {
    122. zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
    123. zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
    124. if (items [0].revents & ZMQ_POLLIN) {
    125. // 应答内容是 [empty][sequence][OK]
    126. reply = zmsg_recv (self->socket);
    127. assert (zmsg_size (reply) == 3);
    128. free (zmsg_popstr (reply));
    129. char *sequence = zmsg_popstr (reply);
    130. int sequence_nbr = atoi (sequence);
    131. free (sequence);
    132. if (sequence_nbr == self->sequence)
    133. break;
    134. }
    135. }
    136. zmsg_destroy (request_p);
    137. return reply;
    138. }

            几点说明:

                    1、客户端被封装成了一个API类,将复杂的代码都包装了起来。

                    2、户端会在几秒之后放弃寻找可用的服务端。

                    3、客户端需要创建一个合法的REP信封,所以需要添加一个空帧。

            程序中,客户端发出了1万次名称解析请求(虽然是假的),并计算平均耗费时间。在我的测试机上,有一个服务端时,耗时60微妙;三个时80微妙。

            该模型的优缺点是:

                    1、优点:简单,容易理解和编写。

                    2、优点:它工作迅速,有重试机制。

                    3、缺点:占用了额外的网络带宽。

                    4、缺点:我们不能为服务端设置优先级,如主服务、次服务等。

                    5、缺点:服务端不能同时处理多个请求。

    四、模型三:Complex and Nasty

            批量发送模型看起来不太真实,那就让我们来探索最后这个极度复杂的模型。很有可能在编写完之后我们又会转而使用批量发送,哈哈,这就是我的作风。

            我们可以将客户端使用的套接字更换为ROUTER,让我们能够向特定的服务端发送请求,停止向已死亡的服务端发送请求,从而做得尽可能地智能。我们还可以将服务端的套接字更换为ROUTER,从而突破单线程的瓶颈。

            但是,使用ROUTER-ROUTER套接字连接两个瞬时套接字是不可行的,节点只有在收到第一条消息时才会为对方生成套接字标识。唯一的方法是让其中一个节点使用持久化的套接字,比较好的方式是让客户端知道服务端的标识,即服务端作为持久化的套接字。

            为了避免产生新的配置项,我们直接使用服务端的端点作为套接字标识。

            回想一下ZMQ套接字标识是如何工作的。服务端的ROUTER套接字为自己设置一个标识(在绑定之前),当客户端连接时,通过一个握手的过程来交换双方的标识。客户端的ROUTER套接字会先发送一条空消息,服务端为客户端生成一个随机的UUID。然后,服务端会向客户端发送自己的标识。

            这样一来,客户端就可以将消息发送给特定的服务端了。不过还有一个问题:我们不知道服务端会在什么时候完成这个握手的过程。如果服务端是在线的,那可能几毫秒就能完成。如果不在线,那可能需要很久很久。

            这里有一个矛盾:我们需要知道服务端何时连接成功且能够开始工作。自由者模式不像中间件模式,它的服务端必须要先发送请求后才能的应答。所以在服务端发送消息给客户端之前,客户端必须要先请求服务端,这看似是不可能的。

            我有一个解决方法,那就是批量发送。这里发送的不是真正的请求,而是一个试探性的心跳(PING-PONG)。当收到应答时,就说明对方是在线的。

            下面让我们制定一个协议,来定义自由者模式是如何传递这种心跳的:10/FLP | ZeroMQ RFC

            实现这个协议的服务端很方便,下面就是经过改造的echo服务:

            flserver3: Freelance server, Model Three in C

    1. //
    2. // 自由者模式 - 服务端 - 模型3
    3. // 使用ROUTER-ROUTER套接字进行通信;单线程。
    4. //
    5. #include "czmq.h"
    6. int main (int argc, char *argv [])
    7. {
    8. int verbose = (argc > 1 && streq (argv [1], "-v"));
    9. zctx_t *ctx = zctx_new ();
    10. // 准备服务端套接字,其标识和端点名相同
    11. char *bind_endpoint = "tcp://*:5555";
    12. char *connect_endpoint = "tcp://localhost:5555";
    13. void *server = zsocket_new (ctx, ZMQ_ROUTER);
    14. zmq_setsockopt (server,
    15. ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
    16. zsocket_bind (server, bind_endpoint);
    17. printf ("I: 服务端已准备就绪 %s\n", bind_endpoint);
    18. while (!zctx_interrupted) {
    19. zmsg_t *request = zmsg_recv (server);
    20. if (verbose && request)
    21. zmsg_dump (request);
    22. if (!request)
    23. break; // 中断
    24. // Frame 0: 客户端标识
    25. // Frame 1: 心跳,或客户端控制信息帧
    26. // Frame 2: 请求内容
    27. zframe_t *address = zmsg_pop (request);
    28. zframe_t *control = zmsg_pop (request);
    29. zmsg_t *reply = zmsg_new ();
    30. if (zframe_streq (control, "PONG"))
    31. zmsg_addstr (reply, "PONG");
    32. else {
    33. zmsg_add (reply, control);
    34. zmsg_addstr (reply, "OK");
    35. }
    36. zmsg_destroy (&request);
    37. zmsg_push (reply, address);
    38. if (verbose && reply)
    39. zmsg_dump (reply);
    40. zmsg_send (&reply, server);
    41. }
    42. if (zctx_interrupted)
    43. printf ("W: 中断\n");
    44. zctx_destroy (&ctx);
    45. return 0;
    46. }

            但是,自由者模式的客户端会变得大一写。为了清晰期间,我们将其拆分为两个类来实现。首先是在上层使用的程序:

            flclient3: Freelance client, Model Three in C

    1. //
    2. // 自由者模式 - 客户端 - 模型3
    3. // 使用flcliapi类来封装自由者模式
    4. //
    5. // 直接编译,不建类库
    6. #include "flcliapi.c"
    7. int main (void)
    8. {
    9. // 创建自由者模式实例
    10. flcliapi_t *client = flcliapi_new ();
    11. // 链接至服务器端点
    12. flcliapi_connect (client, "tcp://localhost:5555");
    13. flcliapi_connect (client, "tcp://localhost:5556");
    14. flcliapi_connect (client, "tcp://localhost:5557");
    15. // 发送随机请求,计算时间
    16. int requests = 1000;
    17. uint64_t start = zclock_time ();
    18. while (requests--) {
    19. zmsg_t *request = zmsg_new ();
    20. zmsg_addstr (request, "random name");
    21. zmsg_t *reply = flcliapi_request (client, &request);
    22. if (!reply) {
    23. printf ("E: 名称解析服务不可用,正在退出\n");
    24. break;
    25. }
    26. zmsg_destroy (&reply);
    27. }
    28. printf ("平均执行时间: %d usec\n",
    29. (int) (zclock_time () - start) / 10);
    30. flcliapi_destroy (&client);
    31. return 0;
    32. }

            下面是该模式复杂的实现过程:

            flcliapi: Freelance client API in C

    1. /* =====================================================================
    2. flcliapi - Freelance Pattern agent class
    3. Model 3: uses ROUTER socket to address specific services
    4. ---------------------------------------------------------------------
    5. Copyright (c) 1991-2011 iMatix Corporation
    6. Copyright other contributors as noted in the AUTHORS file.
    7. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    8. This is free software; you can redistribute it and/or modify it under
    9. the terms of the GNU Lesser General Public License as published by
    10. the Free Software Foundation; either version 3 of the License, or (at
    11. your option) any later version.
    12. This software is distributed in the hope that it will be useful, but
    13. WITHOUT ANY WARRANTY; without even the implied warranty of
    14. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    15. Lesser General Public License for more details.
    16. You should have received a copy of the GNU Lesser General Public
    17. License along with this program. If not, see
    18. .
    19. =====================================================================
    20. */
    21. #include "flcliapi.h"
    22. // 请求超时时间
    23. #define GLOBAL_TIMEOUT 3000 // msecs
    24. // 心跳间隔
    25. #define PING_INTERVAL 2000 // msecs
    26. // 判定服务死亡的时间
    27. #define SERVER_TTL 6000 // msecs
    28. // =====================================================================
    29. // 同步部分,在应用程序层面运行
    30. // ---------------------------------------------------------------------
    31. // 类结构
    32. struct _flcliapi_t {
    33. zctx_t *ctx; // 上下文
    34. void *pipe; // 用于和主线程通信的套接字
    35. };
    36. // 这是运行后台代理程序的线程
    37. static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
    38. // ---------------------------------------------------------------------
    39. // 构造函数
    40. flcliapi_t *
    41. flcliapi_new (void)
    42. {
    43. flcliapi_t
    44. *self;
    45. self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
    46. self->ctx = zctx_new ();
    47. self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
    48. return self;
    49. }
    50. // ---------------------------------------------------------------------
    51. // 析构函数
    52. void
    53. flcliapi_destroy (flcliapi_t **self_p)
    54. {
    55. assert (self_p);
    56. if (*self_p) {
    57. flcliapi_t *self = *self_p;
    58. zctx_destroy (&self->ctx);
    59. free (self);
    60. *self_p = NULL;
    61. }
    62. }
    63. // ---------------------------------------------------------------------
    64. // 连接至新服务器端点
    65. // 消息内容:[CONNECT][endpoint]
    66. void
    67. flcliapi_connect (flcliapi_t *self, char *endpoint)
    68. {
    69. assert (self);
    70. assert (endpoint);
    71. zmsg_t *msg = zmsg_new ();
    72. zmsg_addstr (msg, "CONNECT");
    73. zmsg_addstr (msg, endpoint);
    74. zmsg_send (&msg, self->pipe);
    75. zclock_sleep (100); // 等待连接
    76. }
    77. // ---------------------------------------------------------------------
    78. // 发送并销毁请求,接收应答
    79. zmsg_t *
    80. flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
    81. {
    82. assert (self);
    83. assert (*request_p);
    84. zmsg_pushstr (*request_p, "REQUEST");
    85. zmsg_send (request_p, self->pipe);
    86. zmsg_t *reply = zmsg_recv (self->pipe);
    87. if (reply) {
    88. char *status = zmsg_popstr (reply);
    89. if (streq (status, "FAILED"))
    90. zmsg_destroy (&reply);
    91. free (status);
    92. }
    93. return reply;
    94. }
    95. // =====================================================================
    96. // 异步部分,在后台运行
    97. // ---------------------------------------------------------------------
    98. // 单个服务端信息
    99. typedef struct {
    100. char *endpoint; // 服务端端点/套接字标识
    101. uint alive; // 是否在线
    102. int64_t ping_at; // 下一次心跳时间
    103. int64_t expires; // 过期时间
    104. } server_t;
    105. server_t *
    106. server_new (char *endpoint)
    107. {
    108. server_t *self = (server_t *) zmalloc (sizeof (server_t));
    109. self->endpoint = strdup (endpoint);
    110. self->alive = 0;
    111. self->ping_at = zclock_time () + PING_INTERVAL;
    112. self->expires = zclock_time () + SERVER_TTL;
    113. return self;
    114. }
    115. void
    116. server_destroy (server_t **self_p)
    117. {
    118. assert (self_p);
    119. if (*self_p) {
    120. server_t *self = *self_p;
    121. free (self->endpoint);
    122. free (self);
    123. *self_p = NULL;
    124. }
    125. }
    126. int
    127. server_ping (char *key, void *server, void *socket)
    128. {
    129. server_t *self = (server_t *) server;
    130. if (zclock_time () >= self->ping_at) {
    131. zmsg_t *ping = zmsg_new ();
    132. zmsg_addstr (ping, self->endpoint);
    133. zmsg_addstr (ping, "PING");
    134. zmsg_send (&ping, socket);
    135. self->ping_at = zclock_time () + PING_INTERVAL;
    136. }
    137. return 0;
    138. }
    139. int
    140. server_tickless (char *key, void *server, void *arg)
    141. {
    142. server_t *self = (server_t *) server;
    143. uint64_t *tickless = (uint64_t *) arg;
    144. if (*tickless > self->ping_at)
    145. *tickless = self->ping_at;
    146. return 0;
    147. }
    148. // ---------------------------------------------------------------------
    149. // 后台处理程序信息
    150. typedef struct {
    151. zctx_t *ctx; // 上下文
    152. void *pipe; // 用于应用程序通信的套接字
    153. void *router; // 用于服务端通信的套接字
    154. zhash_t *servers; // 已连接的服务端
    155. zlist_t *actives; // 在线的服务端
    156. uint sequence; // 请求编号
    157. zmsg_t *request; // 当前请求
    158. zmsg_t *reply; // 当前应答
    159. int64_t expires; // 请求过期时间
    160. } agent_t;
    161. agent_t *
    162. agent_new (zctx_t *ctx, void *pipe)
    163. {
    164. agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
    165. self->ctx = ctx;
    166. self->pipe = pipe;
    167. self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
    168. self->servers = zhash_new ();
    169. self->actives = zlist_new ();
    170. return self;
    171. }
    172. void
    173. agent_destroy (agent_t **self_p)
    174. {
    175. assert (self_p);
    176. if (*self_p) {
    177. agent_t *self = *self_p;
    178. zhash_destroy (&self->servers);
    179. zlist_destroy (&self->actives);
    180. zmsg_destroy (&self->request);
    181. zmsg_destroy (&self->reply);
    182. free (self);
    183. *self_p = NULL;
    184. }
    185. }
    186. // 当服务端从列表中移除时,回调该函数。
    187. static void
    188. s_server_free (void *argument)
    189. {
    190. server_t *server = (server_t *) argument;
    191. server_destroy (&server);
    192. }
    193. void
    194. agent_control_message (agent_t *self)
    195. {
    196. zmsg_t *msg = zmsg_recv (self->pipe);
    197. char *command = zmsg_popstr (msg);
    198. if (streq (command, "CONNECT")) {
    199. char *endpoint = zmsg_popstr (msg);
    200. printf ("I: connecting to %s...\n", endpoint);
    201. int rc = zmq_connect (self->router, endpoint);
    202. assert (rc == 0);
    203. server_t *server = server_new (endpoint);
    204. zhash_insert (self->servers, endpoint, server);
    205. zhash_freefn (self->servers, endpoint, s_server_free);
    206. zlist_append (self->actives, server);
    207. server->ping_at = zclock_time () + PING_INTERVAL;
    208. server->expires = zclock_time () + SERVER_TTL;
    209. free (endpoint);
    210. }
    211. else
    212. if (streq (command, "REQUEST")) {
    213. assert (!self->request); // 遵循请求-应答循环
    214. // 将请求编号和空帧加入消息顶部
    215. char sequence_text [10];
    216. sprintf (sequence_text, "%u", ++self->sequence);
    217. zmsg_pushstr (msg, sequence_text);
    218. // 获取请求消息的所有权
    219. self->request = msg;
    220. msg = NULL;
    221. // 设置请求过期时间
    222. self->expires = zclock_time () + GLOBAL_TIMEOUT;
    223. }
    224. free (command);
    225. zmsg_destroy (&msg);
    226. }
    227. void
    228. agent_router_message (agent_t *self)
    229. {
    230. zmsg_t *reply = zmsg_recv (self->router);
    231. // 第一帧是应答的服务端标识
    232. char *endpoint = zmsg_popstr (reply);
    233. server_t *server =
    234. (server_t *) zhash_lookup (self->servers, endpoint);
    235. assert (server);
    236. free (endpoint);
    237. if (!server->alive) {
    238. zlist_append (self->actives, server);
    239. server->alive = 1;
    240. }
    241. server->ping_at = zclock_time () + PING_INTERVAL;
    242. server->expires = zclock_time () + SERVER_TTL;
    243. // 第二帧是应答的编号
    244. char *sequence = zmsg_popstr (reply);
    245. if (atoi (sequence) == self->sequence) {
    246. zmsg_pushstr (reply, "OK");
    247. zmsg_send (&reply, self->pipe);
    248. zmsg_destroy (&self->request);
    249. }
    250. else
    251. zmsg_destroy (&reply);
    252. }
    253. // ---------------------------------------------------------------------
    254. // 异步的后台代理会维护一个服务端池,处理请求和应答。
    255. static void
    256. flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
    257. {
    258. agent_t *self = agent_new (ctx, pipe);
    259. zmq_pollitem_t items [] = {
    260. { self->pipe, 0, ZMQ_POLLIN, 0 },
    261. { self->router, 0, ZMQ_POLLIN, 0 }
    262. };
    263. while (!zctx_interrupted) {
    264. // 计算超时时间
    265. uint64_t tickless = zclock_time () + 1000 * 3600;
    266. if (self->request
    267. && tickless > self->expires)
    268. tickless = self->expires;
    269. zhash_foreach (self->servers, server_tickless, &tickless);
    270. int rc = zmq_poll (items, 2,
    271. (tickless - zclock_time ()) * ZMQ_POLL_MSEC);
    272. if (rc == -1)
    273. break; // 上下文对象被关闭
    274. if (items [0].revents & ZMQ_POLLIN)
    275. agent_control_message (self);
    276. if (items [1].revents & ZMQ_POLLIN)
    277. agent_router_message (self);
    278. // 如果我们需要处理一项请求,将其发送给下一个可用的服务端
    279. if (self->request) {
    280. if (zclock_time () >= self->expires) {
    281. // 请求超时
    282. zstr_send (self->pipe, "FAILED");
    283. zmsg_destroy (&self->request);
    284. }
    285. else {
    286. // 寻找可用的服务端
    287. while (zlist_size (self->actives)) {
    288. server_t *server =
    289. (server_t *) zlist_first (self->actives);
    290. if (zclock_time () >= server->expires) {
    291. zlist_pop (self->actives);
    292. server->alive = 0;
    293. }
    294. else {
    295. zmsg_t *request = zmsg_dup (self->request);
    296. zmsg_pushstr (request, server->endpoint);
    297. zmsg_send (&request, self->router);
    298. break;
    299. }
    300. }
    301. }
    302. }
    303. // 断开并删除已过期的服务端
    304. // 发送心跳给空闲服务器
    305. zhash_foreach (self->servers, server_ping, self->router);
    306. }
    307. agent_destroy (&self);
    308. }

  • 相关阅读:
    路由vue-route的使用
    【微信开发第四章】SpringBoot实现微信H5支付
    git基本命令
    看万山红遍
    [附源码]计算机毕业设计springboot第三方游戏零售平台
    Listener(监听器)-ServletContextListener
    接口自动化之测试数据动态生成并替换
    JAVA代码 企业人力资源管理系统(详细带截图) 毕业设计
    Unity UI Toolkit学习笔记-EditorWindow
    想当领导,有哪些能力是需要培训的?
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/128119372