• ZMQ之脱机可靠性--巨人模式


            当你意识到管家模式是一种非常可靠的消息代理时,你可能会想要使用磁盘做一下消息中转,从而进一步提升可靠性。这种方式虽然在很多企业级消息系统中应用,但我还是有些反对的,原因有:

                    1、我们可以看到,懒惰海盗模式的client可以工作得非常好,能够在多种架构中运行。唯一的问题是它会假设worker是无状态的,且提供的服务是幂等的。但这个问题我们可以通过其他方式解决,而不是添加磁盘。

                    2、添加磁盘会带来新的问题,需要额外的管理和维护费用。海盗模式的最大优点就是简单明了,不会崩溃。如果你还是担心硬件会出问题,可以改用点对点的通信模式,这会在本章最后一节讲到。

            虽然有以上原因,但还是有一个合理的场景可以用到磁盘中转的——异步脱机网络。海盗模式有一个问题,那就是client发送请求后会一直等待应答。如果client和worker并不是长连接(可以拿电子邮箱做个类比),我们就无法在client和worker之间建立一个无状态的网络,因此需要将这种状态保存起来。

            于是我们就有了巨人模式,该模式下会将消息写到磁盘中,确保不会丢失。当我们进行服务查询时,会转向巨人这一层进行。巨人是建立在管家之上的,而不是改写了MDP协议。这样做的好处是我们可以在一个特定的worker中实现这种可靠性,而不用去增加代理的逻辑。

            实现更为简单:

                    1、代理用一种语言编写,worker使用另一种语言编写。

                    2、可以自由升级这种模式。

            唯一的缺点是,代理和磁盘之间会有一层额外的联系,不过这也是值得的。

            我们有很多方法来实现一种持久化的请求-应答架构,而目标当然是越简单越好。我能想到的最简单的方式是提供一种成为“巨人”的代理服务,它不会影响现有worker的工作,若client想要立即得到应答,它可以和代理进行通信;如果它不是那么着急,那就可以和巨人通信:“嗨,巨人,麻烦帮我处理下这个请求,我去买些菜。”

             这样一来,巨人就既是worker又是client。client和巨人之间的对话一般是:

                    1、Client: 请帮我处理这个请求。巨人:好的。

                    2、Client: 有要给我的应答吗?巨人:有的。(或者没有)

                    3、Client: OK,你可以释放那个请求了,工作已经完成。巨人:好的。

            巨人和代理之间的对话一般是:

                    1、巨人:嗨,代理程序,你这里有个叫echo的服务吗?代理:恩,好像有。

                    2、巨人:嗨,echo服务,请帮我处理一下这个请求。Echo: 好了,这是应答。

                    3、巨人:谢谢!

            你可以想象一些发生故障的情形,看看上述模式是否能解决?worker在处理请求的时候崩溃,巨人会不断地重新发送请求;应答在传输过程中丢失了,巨人也会重试;如果请求已经处理,但client没有得到应答,那它会再次询问巨人;如果巨人在处理请求或进行应答的时候崩溃了,客户端会进行重试;只要请求是被保存在磁盘上的,那它就不会丢失。

            这个机制中,握手的过程是比较漫长的,但client可以使用异步的管家模式,一次发送多个请求,并一起等待应答。

            我们需要一种方法,让client会去请求应答内容。不同的client会访问到相同的服务,且client是来去自由的,有着不同的标识。一个简单、合理、安全的解决方案是:

                    1、当巨人收到请求时,它会为每个请求生成唯一的编号(UUID),并将这个编号返回给client;

                    2、client在请求应答内容时需要提供这个编号。

            这样一来client就需要负责将UUID安全地保存起来,不过这就省去了验证的过程。有其他方案吗?我们可以使用持久化的套接字,即显式声明客户端的套接字标识。然而,这会造成管理上的麻烦,而且万一两个client的套接字标识相同,那会引来无穷的麻烦。

            在我们开始制定一个新的协议之前,我们先思考一下client如何和巨人通信。一种方案是提供一种服务,配合三个不同的命令;另一种方案则更为简单,提供三种独立的服务:

                    1、titanic.request - 保存一个请求,并返回UUID

                    2、titanic.reply - 根据UUID获取应答内容

                    3、titanic.close - 确认某个请求已被正确地处理

            我们需要创建一个多线程的worker,正如我们之前用ZMQ进行多线程编程一样,很简单。但是,在我们开始编写代码之前,先讲巨人模式的一些定义写下来:http://rfc.zeromq.org/spec:9 。我们称之为“巨人服务协议”,或TSP。

            使用TSP协议自然会让client多出额外的工作,下面是一个简单但足够健壮的client:

            ticlient: Titanic client example in C

    1. //
    2. // 巨人模式client示例
    3. // 实现 http://rfc.zeromq.org/spec:9 协议中的client端
    4. // 让我们直接编译,不创建类库
    5. #include "mdcliapi.c"
    6. // 请求TSP协议下的服务
    7. // 如果成功则返回应答(状态码:200),否则返回NULL
    8. //
    9. static zmsg_t *
    10. s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
    11. {
    12. zmsg_t *reply = mdcli_send (session, service, request_p);
    13. if (reply) {
    14. zframe_t *status = zmsg_pop (reply);
    15. if (zframe_streq (status, "200")) {
    16. zframe_destroy (&status);
    17. return reply;
    18. }
    19. else
    20. if (zframe_streq (status, "400")) {
    21. printf ("E: 客户端发生严重错误,取消请求\n");
    22. exit (EXIT_FAILURE);
    23. }
    24. else
    25. if (zframe_streq (status, "500")) {
    26. printf ("E: 服务端发生严重错误,取消请求\n");
    27. exit (EXIT_FAILURE);
    28. }
    29. }
    30. else
    31. exit (EXIT_SUCCESS); // 中断或发生错误
    32. zmsg_destroy (&reply);
    33. return NULL; // 请求不成功,但不返回失败原因
    34. }
    35. int main (int argc, char *argv [])
    36. {
    37. int verbose = (argc > 1 && streq (argv [1], "-v"));
    38. mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
    39. // 1. 发送echo服务的请求给巨人
    40. zmsg_t *request = zmsg_new ();
    41. zmsg_addstr (request, "echo");
    42. zmsg_addstr (request, "Hello world");
    43. zmsg_t *reply = s_service_call (
    44. session, "titanic.request", &request);
    45. zframe_t *uuid = NULL;
    46. if (reply) {
    47. uuid = zmsg_pop (reply);
    48. zmsg_destroy (&reply);
    49. zframe_print (uuid, "I: request UUID ");
    50. }
    51. // 2. 等待应答
    52. while (!zctx_interrupted) {
    53. zclock_sleep (100);
    54. request = zmsg_new ();
    55. zmsg_add (request, zframe_dup (uuid));
    56. zmsg_t *reply = s_service_call (
    57. session, "titanic.reply", &request);
    58. if (reply) {
    59. char *reply_string = zframe_strdup (zmsg_last (reply));
    60. printf ("Reply: %s\n", reply_string);
    61. free (reply_string);
    62. zmsg_destroy (&reply);
    63. // 3. 关闭请求
    64. request = zmsg_new ();
    65. zmsg_add (request, zframe_dup (uuid));
    66. reply = s_service_call (session, "titanic.close", &request);
    67. zmsg_destroy (&reply);
    68. break;
    69. }
    70. else {
    71. printf ("I: 尚未收到应答,准备稍后重试...\n");
    72. zclock_sleep (5000); // 5秒后重试
    73. }
    74. }
    75. zframe_destroy (&uuid);
    76. mdcli_destroy (&session);
    77. return 0;
    78. }

            当然,上面的代码可以整合到一个框架中,程序员不需要了解其中的细节。如果我有时间的话,我会尝试写一个这样的API的,让应用程序又变回短短的几行。这种理念和MDP中的一致:不要做重复的事。

            下面是巨人的实现。这个服务端会使用三个线程来处理三种服务。它使用最原始的持久化方法来保存请求:为每个请求创建一个磁盘文件。虽然简单,但也挺恐怖的。比较复杂的部分是,巨人会维护一个队列来保存这些请求,从而避免重复地扫描目录。

            titanic: Titanic broker example in C

    1. //
    2. // 巨人模式 - 服务
    3. //
    4. // 实现 http://rfc.zeromq.org/spec:9 协议的服务端
    5. // 让我们直接编译,不创建类库
    6. #include "mdwrkapi.c"
    7. #include "mdcliapi.c"
    8. #include "zfile.h"
    9. #include
    10. // 返回一个可打印的唯一编号(UUID)
    11. // 调用者负责释放UUID字符串的内存
    12. static char *
    13. s_generate_uuid (void)
    14. {
    15. char hex_char [] = "0123456789ABCDEF";
    16. char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
    17. uuid_t uuid;
    18. uuid_generate (uuid);
    19. int byte_nbr;
    20. for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {
    21. uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
    22. uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
    23. }
    24. return uuidstr;
    25. }
    26. // 根据UUID生成用于保存请求内容的文件名,并返回
    27. #define TITANIC_DIR ".titanic"
    28. static char *
    29. s_request_filename (char *uuid) {
    30. char *filename = malloc (256);
    31. snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid);
    32. return filename;
    33. }
    34. // 根据UUID生成用于保存应答内容的文件名,并返回
    35. static char *
    36. s_reply_filename (char *uuid) {
    37. char *filename = malloc (256);
    38. snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid);
    39. return filename;
    40. }
    41. // ---------------------------------------------------------------------
    42. // 巨人模式 - 请求服务
    43. static void
    44. titanic_request (void *args, zctx_t *ctx, void *pipe)
    45. {
    46. mdwrk_t *worker = mdwrk_new (
    47. "tcp://localhost:5555", "titanic.request", 0);
    48. zmsg_t *reply = NULL;
    49. while (TRUE) {
    50. // 若应答非空则发送,再从代理处获得新的请求
    51. zmsg_t *request = mdwrk_recv (worker, &reply);
    52. if (!request)
    53. break; // 中断并退出
    54. // 确保消息目录是存在的
    55. file_mkdir (TITANIC_DIR);
    56. // 生成UUID,并将消息保存至磁盘
    57. char *uuid = s_generate_uuid ();
    58. char *filename = s_request_filename (uuid);
    59. FILE *file = fopen (filename, "w");
    60. assert (file);
    61. zmsg_save (request, file);
    62. fclose (file);
    63. free (filename);
    64. zmsg_destroy (&request);
    65. // 将UUID加入队列
    66. reply = zmsg_new ();
    67. zmsg_addstr (reply, uuid);
    68. zmsg_send (&reply, pipe);
    69. // 将UUID返回给客户端
    70. // 将由循环顶部的mdwrk_recv()函数完成
    71. reply = zmsg_new ();
    72. zmsg_addstr (reply, "200");
    73. zmsg_addstr (reply, uuid);
    74. free (uuid);
    75. }
    76. mdwrk_destroy (&worker);
    77. }
    78. // ---------------------------------------------------------------------
    79. // 巨人模式 - 应答服务
    80. static void *
    81. titanic_reply (void *context)
    82. {
    83. mdwrk_t *worker = mdwrk_new (
    84. "tcp://localhost:5555", "titanic.reply", 0);
    85. zmsg_t *reply = NULL;
    86. while (TRUE) {
    87. zmsg_t *request = mdwrk_recv (worker, &reply);
    88. if (!request)
    89. break; // 中断并退出
    90. char *uuid = zmsg_popstr (request);
    91. char *req_filename = s_request_filename (uuid);
    92. char *rep_filename = s_reply_filename (uuid);
    93. if (file_exists (rep_filename)) {
    94. FILE *file = fopen (rep_filename, "r");
    95. assert (file);
    96. reply = zmsg_load (file);
    97. zmsg_pushstr (reply, "200");
    98. fclose (file);
    99. }
    100. else {
    101. reply = zmsg_new ();
    102. if (file_exists (req_filename))
    103. zmsg_pushstr (reply, "300"); //挂起
    104. else
    105. zmsg_pushstr (reply, "400"); //未知
    106. }
    107. zmsg_destroy (&request);
    108. free (uuid);
    109. free (req_filename);
    110. free (rep_filename);
    111. }
    112. mdwrk_destroy (&worker);
    113. return 0;
    114. }
    115. // ---------------------------------------------------------------------
    116. // 巨人模式 - 关闭请求
    117. static void *
    118. titanic_close (void *context)
    119. {
    120. mdwrk_t *worker = mdwrk_new (
    121. "tcp://localhost:5555", "titanic.close", 0);
    122. zmsg_t *reply = NULL;
    123. while (TRUE) {
    124. zmsg_t *request = mdwrk_recv (worker, &reply);
    125. if (!request)
    126. break; // 中断并退出
    127. char *uuid = zmsg_popstr (request);
    128. char *req_filename = s_request_filename (uuid);
    129. char *rep_filename = s_reply_filename (uuid);
    130. file_delete (req_filename);
    131. file_delete (rep_filename);
    132. free (uuid);
    133. free (req_filename);
    134. free (rep_filename);
    135. zmsg_destroy (&request);
    136. reply = zmsg_new ();
    137. zmsg_addstr (reply, "200");
    138. }
    139. mdwrk_destroy (&worker);
    140. return 0;
    141. }
    142. // 处理某个请求,成功则返回1
    143. static int
    144. s_service_success (mdcli_t *client, char *uuid)
    145. {
    146. // 读取请求内容,第一帧为服务名称
    147. char *filename = s_request_filename (uuid);
    148. FILE *file = fopen (filename, "r");
    149. free (filename);
    150. // 如果client已经关闭了该请求,则返回1
    151. if (!file)
    152. return 1;
    153. zmsg_t *request = zmsg_load (file);
    154. fclose (file);
    155. zframe_t *service = zmsg_pop (request);
    156. char *service_name = zframe_strdup (service);
    157. // 使用MMI协议检查服务是否可用
    158. zmsg_t *mmi_request = zmsg_new ();
    159. zmsg_add (mmi_request, service);
    160. zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request);
    161. int service_ok = (mmi_reply
    162. && zframe_streq (zmsg_first (mmi_reply), "200"));
    163. zmsg_destroy (&mmi_reply);
    164. if (service_ok) {
    165. zmsg_t *reply = mdcli_send (client, service_name, &request);
    166. if (reply) {
    167. filename = s_reply_filename (uuid);
    168. FILE *file = fopen (filename, "w");
    169. assert (file);
    170. zmsg_save (reply, file);
    171. fclose (file);
    172. free (filename);
    173. return 1;
    174. }
    175. zmsg_destroy (&reply);
    176. }
    177. else
    178. zmsg_destroy (&request);
    179. free (service_name);
    180. return 0;
    181. }
    182. int main (int argc, char *argv [])
    183. {
    184. int verbose = (argc > 1 && streq (argv [1], "-v"));
    185. zctx_t *ctx = zctx_new ();
    186. // 创建MDP客户端会话
    187. mdcli_t *client = mdcli_new ("tcp://localhost:5555", verbose);
    188. mdcli_set_timeout (client, 1000); // 1 秒
    189. mdcli_set_retries (client, 1); // 只尝试一次
    190. void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
    191. zthread_new (ctx, titanic_reply, NULL);
    192. zthread_new (ctx, titanic_close, NULL);
    193. // 主循环
    194. while (TRUE) {
    195. // 如果没有活动,我们将每秒循环一次
    196. zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } };
    197. int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
    198. if (rc == -1)
    199. break; // 中断
    200. if (items [0].revents & ZMQ_POLLIN) {
    201. // 确保消息目录是存在的
    202. file_mkdir (TITANIC_DIR);
    203. // 将UUID添加到队列中,使用“-”号标识等待中的请求
    204. zmsg_t *msg = zmsg_recv (request_pipe);
    205. if (!msg)
    206. break; // 中断
    207. FILE *file = fopen (TITANIC_DIR "/queue", "a");
    208. char *uuid = zmsg_popstr (msg);
    209. fprintf (file, "-%s\n", uuid);
    210. fclose (file);
    211. free (uuid);
    212. zmsg_destroy (&msg);
    213. }
    214. // 分派
    215. //
    216. char entry [] = "?.......:.......:.......:.......:";
    217. FILE *file = fopen (TITANIC_DIR "/queue", "r+");
    218. while (file && fread (entry, 33, 1, file) == 1) {
    219. // 处理UUID前缀为“-”的请求
    220. if (entry [0] == '-') {
    221. if (verbose)
    222. printf ("I: 开始处理请求 %s\n", entry + 1);
    223. if (s_service_success (client, entry + 1)) {
    224. // 标记为已处理
    225. fseek (file, -33, SEEK_CUR);
    226. fwrite ("+", 1, 1, file);
    227. fseek (file, 32, SEEK_CUR);
    228. }
    229. }
    230. // 跳过最后一行
    231. if (fgetc (file) == '\r')
    232. fgetc (file);
    233. if (zctx_interrupted)
    234. break;
    235. }
    236. if (file)
    237. fclose (file);
    238. }
    239. mdcli_destroy (&client);
    240. return 0;
    241. }

            测试时,打开mdbroker和titanic,再运行ticlient,然后开启任意个mdworker,就可以看到client获得了应答。

            几点说明:

                    1、我们使用MMI协议去向代理询问某项服务是否可用,这一点和MDP中的逻辑一致;

                    2、我们使用inproc(进程内)协议建立主循环和titanic.request服务间的联系,保存新的请求信息。这样可以避免主循环不断扫描磁盘目录,读取所有请求文件,并按照时间日期排序。

            这个示例程序不应关注它的性能(一定会非常糟糕,虽然我没有测试过),而是应该看到它是如何提供一种可靠的通信模式的。你可以测试一下,打开代理、巨人、worker和client,使用-v参数显示跟踪信息,然后随意地开关代理、巨人、或worker(client不能关闭),可以看到所有的请求都能获得应答。

            如果你想在真实环境中使用巨人模式,你肯定会问怎样才能让速度快起来。以下是我的做法:

                    1、使用一个磁盘文件保存所有数据。操作系统处理大文件的效率要比处理许多小文件来的高。

                    2、使用一种循环的机制来组织该磁盘文件的结构,这样新的请求可以被连续地写入这个文件。单个线程在全速写入磁盘时的效率是比较高的。

                    3、将索引保存在内存中,可以在启动程序时重建这个索引。这样做可以节省磁盘缓存,让索引安全地保存在磁盘上。你需要用到fsync的机制来保存每一条数据;或者可以等待几毫秒,如果不怕丢失上千条数据的话。

                    4、如果条件允许,应选择使用固态硬盘;

                    5、提前分配该磁盘文件的空间,或者将每次分配的空间调大一些,这样可以避免磁盘碎片的产生,并保证读写是连续的。

            另外,我不建议将消息保存在数据库中,甚至不建议交给那些所谓的高速键值缓存,它们比起一个磁盘文件要来得昂贵。

            如果你想让巨人模式变得更为可靠,你可以将请求复制到另一台服务器上,这样就不需要担心主程序遭到核武器袭击了。

            如果你想让巨人模式变得更为快速,但可以牺牲一些可靠性,那你可以将请求和应答都保存在内存中。这样做可以让该服务作为脱机网络运行,不过若巨人服务本身崩溃了,我也无能为力。

  • 相关阅读:
    互联网控制报文协议ICMP(计算机网络)
    序列模型之循环神经网络(二)
    用docker部署公司的一个web系统
    Tomcat优化
    Linux:进程间通信
    206 - 211.MySQL中的完整性约束
    allatori8.0文档翻译-第四步-单一jar打包
    docker应用部署---nginx部署的配置
    Redis集群-哨兵模式原理(Sentinel)
    小白也能通俗易懂的Mac环境变量配置教程
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/128118860