• Mercury: a high-performance network communication framework


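    Mercury is an RPC framework designed specifically for HPC systems. It supports asynchronous transfer of parameters and execution requests, and it directly supports large data arguments. The network implementation is abstracted, which allows easy porting to future systems and efficient use of existing native transport mechanisms. Mercury's interface is generic and allows any function call to be serialized. Mercury is a core component of the Mochi ecosystem of microservices.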
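    To make "serializing a function call" concrete, here is a minimal sketch in plain C that encodes the input arguments of an RPC into a flat byte buffer and decodes them on the other side. It does not use Mercury's own encoding routines. The struct `open_args_t` and the functions `encode_open_args` and `decode_open_args` are hypothetical names made up for this illustration, and the wire layout (three big-endian 32-bit fields followed by the path bytes) is an assumption of the sketch.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical input arguments of an "open" RPC (illustration only). */
typedef struct {
    uint32_t flags;
    uint32_t mode;
    char path[64];
} open_args_t;

/* Store a 32-bit value in big-endian order so both ends agree on layout. */
static void put_u32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
}

static uint32_t get_u32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Encode flags, mode, path length, then the path bytes.
 * Returns the number of bytes written, or 0 if the buffer is too small. */
size_t encode_open_args(
    const open_args_t *in, unsigned char *buf, size_t buf_size)
{
    const char *end = memchr(in->path, '\0', sizeof(in->path));
    size_t path_len = end ? (size_t) (end - in->path) : sizeof(in->path) - 1;
    size_t needed = 12 + path_len;

    if (buf_size < needed)
        return 0;
    put_u32(buf, in->flags);
    put_u32(buf + 4, in->mode);
    put_u32(buf + 8, (uint32_t) path_len);
    memcpy(buf + 12, in->path, path_len);
    return needed;
}

/* Decode the same layout. Returns 0 on success, -1 on malformed input. */
int decode_open_args(const unsigned char *buf, size_t len, open_args_t *out)
{
    uint32_t path_len;

    if (len < 12)
        return -1;
    out->flags = get_u32(buf);
    out->mode = get_u32(buf + 4);
    path_len = get_u32(buf + 8);
    if (path_len >= sizeof(out->path) || len < 12 + (size_t) path_len)
        return -1;
    memcpy(out->path, buf + 12, path_len);
    out->path[path_len] = '\0';
    return 0;
}
```

    The origin would call `encode_open_args` before sending and the target would call `decode_open_args` on receipt. Fixing the byte order in the encoder is what lets the two ends run on different architectures.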

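    I. Supported architectures

    Architectures supported by MPI implementations are generally supported by the network abstraction layer.

    The network abstraction (NA) layer is used internally by both the RPC layer and the bulk layer. It uses a plugin mechanism so that support for various network protocols can be added easily and selected at runtime.

    NA supports different backend implementations. The OFI/libfabric plugin is recommended for inter-node communication, and the SM (shared memory) plugin for intra-node communication.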
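    As an illustration of selecting a plugin at runtime, the sketch below parses an initialization string of the form `class+protocol://host` (the example listings later in this article use strings such as `bmi+tcp://localhost:port` and `tcp://127.0.0.1:22222`) and looks the class up in a table of registered backends. This is not Mercury's NA code: `na_plugin_t`, `plugin_table`, `parse_info_string` and `find_plugin` are hypothetical names, the protocol lists in the table are illustrative assumptions rather than a support matrix, and falling back to the first plugin that lists the protocol when no class is given is a simplification chosen for the sketch.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical plugin descriptor: a class name plus the protocols it claims. */
typedef struct {
    const char *name;
    const char *protocols[4]; /* NULL-terminated */
} na_plugin_t;

/* Illustrative table only; the protocol lists are assumptions. */
static const na_plugin_t plugin_table[] = {
    {"ofi", {"tcp", "verbs", NULL}},
    {"ucx", {"tcp", "verbs", NULL}},
    {"bmi", {"tcp", NULL}},
};

/* Split "class+protocol://host" into its parts; the class is optional.
 * Each output buffer must hold n bytes. Returns 0 on success, -1 if the
 * protocol is missing or a part does not fit. */
int parse_info_string(
    const char *info, char *cls, char *proto, char *host, size_t n)
{
    const char *sep = strstr(info, "://");
    size_t head_len = sep ? (size_t) (sep - info) : strlen(info);
    const char *plus = memchr(info, '+', head_len);
    size_t cls_len = plus ? (size_t) (plus - info) : 0;
    const char *proto_start = plus ? plus + 1 : info;
    size_t proto_len = head_len - (size_t) (proto_start - info);
    const char *host_start = sep ? sep + 3 : "";

    if (proto_len == 0 || cls_len >= n || proto_len >= n ||
        strlen(host_start) >= n)
        return -1;
    memcpy(cls, info, cls_len);
    cls[cls_len] = '\0';
    memcpy(proto, proto_start, proto_len);
    proto[proto_len] = '\0';
    strcpy(host, host_start);
    return 0;
}

/* Pick a plugin by class name if one was given, otherwise the first one
 * that lists the protocol. Returns NULL when nothing matches. */
const na_plugin_t *find_plugin(const char *cls, const char *proto)
{
    size_t i, j;

    for (i = 0; i < sizeof(plugin_table) / sizeof(plugin_table[0]); i++) {
        const na_plugin_t *p = &plugin_table[i];

        if (cls[0] != '\0' && strcmp(cls, p->name) != 0)
            continue;
        for (j = 0; p->protocols[j] != NULL; j++)
            if (strcmp(proto, p->protocols[j]) == 0)
                return p;
    }
    return NULL;
}
```

    Keeping the backends in a table means adding a transport is a matter of adding one entry, which is the property the plugin mechanism is meant to provide.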

     

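    On platforms where libfabric is unavailable or not recommended, the UCX plugin is also available as an alternative transport; the protocols currently supported are tcp and verbs.
    The MPI and BMI (tcp) plugins are still supported but are gradually being moved toward deprecation.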

     

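    II. Software requirements

    To use the OFI libfabric plugin, refer to the libfabric build instructions.

    To use the UCX plugin, refer to the UCX build instructions.

    To use the native NA SM (shared memory) plugin on Linux, the cross-memory attach (CMA) feature introduced in kernel v3.2 is required.

    To use the BMI plugin, the most convenient way is to install it through spack. Alternatively, build it from source:

    git clone https://github.com/radix-io/bmi.git && cd bmi
    ./prepare && ./configure --enable-shared --enable-bmi-only
    make && make install

    To use the MPI plugin, Mercury requires a well-configured MPI implementation (MPICH2 v1.4.1 or higher, or OpenMPI v1.6 or higher) with MPI_THREAD_MULTIPLE available on targets that accept remote connections. Processes that do not accept incoming connections do not need a multithreaded execution level.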

    III. Building

    bzip2 -dc mercury-X.tar.bz2 | tar xvf -
    cd mercury-X
    mkdir build
    cd build
    ccmake ..    (where ".." is the relative path to the mercury-X directory)

    Press 'c' multiple times and choose suitable options. Recommended options are:

    BUILD_SHARED_LIBS                ON (or OFF if the library you link
                                     against requires static libraries)
    BUILD_TESTING                    ON/OFF
    Boost_INCLUDE_DIR                /path/to/include/directory
    CMAKE_INSTALL_PREFIX             /path/to/install/directory
    MERCURY_ENABLE_DEBUG             ON/OFF
    MERCURY_TESTING_ENABLE_PARALLEL  ON/OFF
    MERCURY_USE_BOOST_PP             ON
    MERCURY_USE_CHECKSUMS            ON/OFF
    MERCURY_USE_SYSTEM_BOOST         ON/OFF
    MERCURY_USE_SYSTEM_MCHECKSUM     ON/OFF
    MERCURY_USE_XDR                  OFF
    NA_USE_BMI                       ON/OFF
    NA_USE_MPI                       ON/OFF
    NA_USE_OFI                       ON/OFF
    NA_USE_PSM                       ON/OFF
    NA_USE_PSM2                      ON/OFF
    NA_USE_SM                        ON/OFF
    NA_USE_UCX                       ON/OFF

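    Setting include directory and library paths may require you to toggle to advanced mode by typing 't'. Once you are done and do not see any errors, type 'g' to generate the makefiles. After you exit the CMake configuration screen and are ready to build the targets, run:

    make

    Verbose compile/build output is obtained by adding VERBOSE=1 to the make command: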

    make VERBOSE=1

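    IV. Installing

    Assuming CMAKE_INSTALL_PREFIX has been set (see the previous step) and you have write permission to the destination directory, run the following from the build directory: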

    make install

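    Tests can be run to check that basic RPC functionality (requests and bulk data transfers) works properly. CTest is used to run the tests; simply run the following from the build directory: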

    ctest .

    Verbose testing is done by adding -V to the ctest command:

    ctest -V .

    Even more detail can be displayed by adding -VV:

    ctest -VV .

    V. Example walkthrough

    Client

    #include <limits.h> /* PATH_MAX */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #include <mercury.h>
    #include <snappy-c.h>

    /* snappy_compress_in_t, snappy_compress_out_t, snappy_compress_register(),
     * print_buf(), NR_ITEMS, TEMP_DIRECTORY and CONFIG_FILE_NAME are defined
     * elsewhere in the example and are not shown in this excerpt. */

    /* Arguments kept alive until the asynchronous call completes
     * (reconstructed from how the fields are used below) */
    struct snappy_compress_rpc_args {
        int *input;
        size_t input_length;
        hg_bulk_t input_bulk_handle;
        void *compressed;
        hg_bulk_t compressed_bulk_handle;
    };

    static hg_id_t snappy_compress_id_g;
    static hg_bool_t snappy_compress_done_g = HG_FALSE;

    static int snappy_compress_rpc(
        hg_class_t *hg_class, hg_context_t *hg_context, hg_addr_t hg_target_addr);
    static hg_return_t snappy_compress_rpc_cb(
        const struct hg_cb_info *callback_info);

    int
    main(void)
    {
        const char *info_string = NULL;
        char target_addr_string[PATH_MAX], *p;
        FILE *na_config = NULL;
        hg_class_t *hg_class;
        hg_context_t *hg_context;
        hg_addr_t hg_target_addr;
        hg_return_t hg_ret;

        /* Get info string */
        info_string = getenv("HG_PORT_NAME");
        if (!info_string) {
            fprintf(stderr, "HG_PORT_NAME environment variable must be set\n");
            exit(EXIT_FAILURE);
        }
        printf("Using %s\n", info_string);

        HG_Set_log_level("warning");

        /* Initialize Mercury with the desired network abstraction class */
        hg_class = HG_Init(info_string, HG_FALSE);

        /* Create HG context */
        hg_context = HG_Context_create(hg_class);

        /* The connection string is generated by the server with
         * HG_Addr_self()/HG_Addr_to_string(); read that string here and
         * pass it to HG_Addr_lookup2() */
        na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "r");
        if (!na_config) {
            fprintf(stderr, "Could not open config file from: %s\n",
                TEMP_DIRECTORY CONFIG_FILE_NAME);
            exit(EXIT_FAILURE);
        }
        if (fgets(target_addr_string, PATH_MAX, na_config) == NULL) {
            fprintf(stderr, "Could not read target address from config file\n");
            exit(EXIT_FAILURE);
        }
        p = strrchr(target_addr_string, '\n');
        if (p != NULL)
            *p = '\0';
        printf("Target address is: %s\n", target_addr_string);
        fclose(na_config);

        /* Look up target address */
        HG_Addr_lookup2(hg_class, target_addr_string, &hg_target_addr);

        /* Register RPC */
        snappy_compress_id_g = snappy_compress_register(hg_class);

        /* Send RPC to target */
        snappy_compress_rpc(hg_class, hg_context, hg_target_addr);

        /* Poke progress engine and check for events */
        do {
            unsigned int actual_count = 0;
            do {
                hg_ret = HG_Trigger(
                    hg_context, 0 /* timeout */, 1 /* max count */, &actual_count);
            } while ((hg_ret == HG_SUCCESS) && actual_count);

            /* Do not try to make progress anymore if we're done */
            if (snappy_compress_done_g)
                break;

            hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);
        } while (hg_ret == HG_SUCCESS);

        /* Finalize */
        HG_Addr_free(hg_class, hg_target_addr);
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);

        return EXIT_SUCCESS;
    }

    static int
    snappy_compress_rpc(
        hg_class_t *hg_class, hg_context_t *hg_context, hg_addr_t hg_target_addr)
    {
        int *input;
        size_t source_length = NR_ITEMS * sizeof(int);
        hg_bulk_t input_bulk_handle;
        void *compressed;
        size_t max_compressed_length;
        hg_bulk_t compressed_bulk_handle;
        snappy_compress_in_t snappy_compress_input;
        struct snappy_compress_rpc_args *snappy_compress_rpc_args;
        hg_handle_t handle;
        int i;

        /**
         * We are going to take a buffer and send it to the server to be
         * compressed.
         */
        max_compressed_length = snappy_max_compressed_length(source_length);
        printf("Input buffer length is: %zu\n", source_length);
        printf("Max compressed length is: %zu\n", max_compressed_length);

        /* Generate input buffer */
        input = (int *) malloc(source_length);
        for (i = 0; i < NR_ITEMS; i++) {
            input[i] = rand() % 10;
        }
        print_buf(20, input);

        /* Allocate compressed buffer */
        compressed = malloc(max_compressed_length);
        memset(compressed, '\0', max_compressed_length);

        /* Create HG handle bound to target */
        HG_Create(hg_context, hg_target_addr, snappy_compress_id_g, &handle);

        /**
         * Associate each bulk handle with a region of memory. Mercury's bulk
         * transfer is going to get/put data from/to these regions.
         */
        HG_Bulk_create(hg_class, 1, (void **) &input, &source_length,
            HG_BULK_READ_ONLY, &input_bulk_handle);
        HG_Bulk_create(hg_class, 1, &compressed, &max_compressed_length,
            HG_BULK_READWRITE, &compressed_bulk_handle);

        /* Create struct to keep arguments, as the call is executed
         * asynchronously */
        snappy_compress_rpc_args = (struct snappy_compress_rpc_args *) malloc(
            sizeof(struct snappy_compress_rpc_args));
        snappy_compress_rpc_args->input = input;
        snappy_compress_rpc_args->input_length = source_length;
        snappy_compress_rpc_args->input_bulk_handle = input_bulk_handle;
        snappy_compress_rpc_args->compressed = compressed;
        snappy_compress_rpc_args->compressed_bulk_handle = compressed_bulk_handle;

        /* Set input arguments that will be passed to HG_Forward */
        snappy_compress_input.input_bulk_handle = input_bulk_handle;
        snappy_compress_input.compressed_bulk_handle = compressed_bulk_handle;

        /* Forward the call */
        printf("Sending input to target\n");
        HG_Forward(handle, snappy_compress_rpc_cb, snappy_compress_rpc_args,
            &snappy_compress_input);

        /* Handle will be destroyed when call completes (reference count) */
        HG_Destroy(handle);

        return 0;
    }

    /* This routine is executed from HG_Trigger once the RPC has completed */
    static hg_return_t
    snappy_compress_rpc_cb(const struct hg_cb_info *callback_info)
    {
        struct snappy_compress_rpc_args *snappy_compress_rpc_args =
            (struct snappy_compress_rpc_args *) callback_info->arg;
        hg_handle_t handle = callback_info->info.forward.handle;
        int *input;
        size_t source_length;
        void *compressed;
        size_t compressed_length;
        int *uncompressed;
        size_t uncompressed_length;
        snappy_compress_out_t snappy_compress_output;
        snappy_status ret;

        /* Get output */
        printf("Received output from target\n");
        HG_Get_output(handle, &snappy_compress_output);

        /* Get output parameters */
        ret = snappy_compress_output.ret;
        compressed_length = snappy_compress_output.compressed_length;
        compressed = snappy_compress_rpc_args->compressed;
        input = snappy_compress_rpc_args->input;
        source_length = snappy_compress_rpc_args->input_length;

        /* Check ret */
        if (ret != SNAPPY_OK) {
            fprintf(stderr, "Error: snappy_compressed failed with ret %d\n", ret);
        }

        /* The output data is now in the bulk buffer */
        printf("Compressed buffer length is: %zu\n", compressed_length);
        print_buf(5, (int *) compressed);
        if (snappy_validate_compressed_buffer(compressed, compressed_length) ==
            SNAPPY_OK) {
            printf("Compressed buffer validated: compressed successfully\n");
        }

        /* source_length is already a byte count, so it is large enough to
         * hold the uncompressed data */
        uncompressed_length = source_length;
        uncompressed = (int *) malloc(uncompressed_length);

        /* Uncompress data and check uncompressed_length */
        printf("Uncompressing buffer...\n");
        snappy_uncompress(compressed, compressed_length, (char *) uncompressed,
            &uncompressed_length);
        printf("Uncompressed buffer length is: %zu\n", uncompressed_length);
        print_buf(20, uncompressed);

        /* Free output and handles */
        HG_Free_output(handle, &snappy_compress_output);
        HG_Bulk_free(snappy_compress_rpc_args->input_bulk_handle);
        HG_Bulk_free(snappy_compress_rpc_args->compressed_bulk_handle);

        /* Free data */
        free(uncompressed);
        free(compressed);
        free(input);
        free(snappy_compress_rpc_args);

        /* We're done */
        snappy_compress_done_g = HG_TRUE;

        return HG_SUCCESS;
    }
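    The nested loop at the end of main() separates two steps: HG_Progress() advances communication and places finished operations on a completion queue, and HG_Trigger() runs the user callbacks for the queued completions. The single-threaded model below shows that split with plain arrays as the queues. It is a toy, not Mercury's implementation: `toy_context_t`, `toy_post`, `toy_progress`, `toy_trigger`, `count_done` and `toy_run_until` are hypothetical names, and "network progress" is simulated by completing one pending operation per call.

```c
#include <stddef.h>

#define TOY_MAX_OPS 8

typedef void (*toy_cb_t)(void *arg);

typedef struct {
    toy_cb_t cb;
    void *arg;
} toy_op_t;

typedef struct {
    toy_op_t pending[TOY_MAX_OPS];   /* posted, not yet complete */
    size_t pending_count;
    toy_op_t completed[TOY_MAX_OPS]; /* complete, callback not yet run */
    size_t completed_count;
} toy_context_t;

/* Post an operation; its callback runs later, from toy_trigger(). */
int toy_post(toy_context_t *ctx, toy_cb_t cb, void *arg)
{
    if (ctx->pending_count == TOY_MAX_OPS)
        return -1;
    ctx->pending[ctx->pending_count].cb = cb;
    ctx->pending[ctx->pending_count].arg = arg;
    ctx->pending_count++;
    return 0;
}

/* Simulated progress: move the oldest pending operation to the completion
 * queue. Returns 0 if something completed, -1 otherwise (a real progress
 * call would wait up to a timeout instead of returning immediately). */
int toy_progress(toy_context_t *ctx)
{
    size_t i;

    if (ctx->pending_count == 0 || ctx->completed_count == TOY_MAX_OPS)
        return -1;
    ctx->completed[ctx->completed_count++] = ctx->pending[0];
    for (i = 1; i < ctx->pending_count; i++)
        ctx->pending[i - 1] = ctx->pending[i];
    ctx->pending_count--;
    return 0;
}

/* Run callbacks for up to max_count completed operations, oldest first. */
void toy_trigger(
    toy_context_t *ctx, unsigned int max_count, unsigned int *actual_count)
{
    unsigned int n = 0;

    while (n < max_count && ctx->completed_count > 0) {
        toy_op_t op = ctx->completed[0];
        size_t i;

        for (i = 1; i < ctx->completed_count; i++)
            ctx->completed[i - 1] = ctx->completed[i];
        ctx->completed_count--;
        op.cb(op.arg);
        n++;
    }
    *actual_count = n;
}

/* Example callback: counts how many operations have finished. */
void count_done(void *arg)
{
    (*(int *) arg)++;
}

/* Same shape as the loop in main(): drain callbacks, check the done
 * condition, then make progress. */
void toy_run_until(toy_context_t *ctx, const int *done, int expected)
{
    int ret;

    do {
        unsigned int actual_count = 0;

        do {
            toy_trigger(ctx, 1, &actual_count);
        } while (actual_count);
        if (*done >= expected)
            break;
        ret = toy_progress(ctx);
    } while (ret == 0);
}
```

    Posting two operations with `count_done` and calling `toy_run_until(&ctx, &done, 2)` runs both callbacks and leaves both queues empty. Checking the done flag between the trigger and progress steps is what keeps the loop from waiting on the network after the last callback has already run.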

    Server

    #include <limits.h> /* PATH_MAX */
    #include <stdio.h>
    #include <stdlib.h>

    #include <mercury.h>
    #include <mercury_macros.h> /* MERCURY_REGISTER */
    #include <snappy-c.h>

    /* snappy_compress_in_t, snappy_compress_out_t, print_buf(), TEMP_DIRECTORY
     * and CONFIG_FILE_NAME are defined elsewhere in the example and are not
     * shown in this excerpt. */

    /* State carried across the pull, compress and push steps of one RPC
     * (reconstructed from how the fields are used below) */
    struct snappy_transfer_args {
        hg_handle_t handle;
        snappy_compress_in_t snappy_compress_input;
        hg_bulk_t local_input_bulk_handle;
        void *compressed;
        size_t compressed_length;
        hg_bulk_t local_compressed_bulk_handle;
        snappy_status ret;
    };

    static hg_bool_t snappy_compress_done_target_g = HG_FALSE;

    hg_id_t snappy_compress_register(hg_class_t *hg_class);
    static hg_return_t snappy_compress_cb(hg_handle_t handle);
    static hg_return_t snappy_pull_cb(const struct hg_cb_info *hg_cb_info);
    static hg_return_t snappy_push_cb(const struct hg_cb_info *hg_cb_info);
    static hg_return_t snappy_compress_done_cb(
        const struct hg_cb_info *callback_info);

    int
    main(void)
    {
        const char *info_string = NULL;
        char self_addr_string[PATH_MAX];
        hg_addr_t self_addr;
        FILE *na_config = NULL;
        hg_class_t *hg_class;
        hg_context_t *hg_context;
        unsigned major;
        unsigned minor;
        unsigned patch;
        hg_return_t hg_ret;
        hg_size_t self_addr_string_size = PATH_MAX;

        HG_Version_get(&major, &minor, &patch);
        printf("Server running mercury version %u.%u.%u\n", major, minor, patch);

        /* Get info string */
        /* bmi+tcp://localhost:port */
        info_string = getenv("HG_PORT_NAME");
        if (!info_string) {
            fprintf(stderr, "HG_PORT_NAME environment variable must be set, "
                            "e.g.:\nHG_PORT_NAME=\"tcp://127.0.0.1:22222\"\n");
            exit(EXIT_FAILURE);
        }

        HG_Set_log_level("warning");

        /* Initialize Mercury with the desired network abstraction class */
        hg_class = HG_Init(info_string, HG_TRUE);

        /* Get self addr to tell client about */
        HG_Addr_self(hg_class, &self_addr);
        HG_Addr_to_string(
            hg_class, self_addr_string, &self_addr_string_size, self_addr);
        HG_Addr_free(hg_class, self_addr);
        printf("Server address is: %s\n", self_addr_string);

        /* Write addr to a file */
        na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "w+");
        if (!na_config) {
            fprintf(stderr, "Could not open config file from: %s\n",
                TEMP_DIRECTORY CONFIG_FILE_NAME);
            exit(EXIT_FAILURE);
        }
        fprintf(na_config, "%s\n", self_addr_string);
        fclose(na_config);

        /* Create HG context */
        hg_context = HG_Context_create(hg_class);

        /* Register RPC */
        snappy_compress_register(hg_class);

        /* Poke progress engine and check for events */
        do {
            unsigned int actual_count = 0;
            do {
                hg_ret = HG_Trigger(
                    hg_context, 0 /* timeout */, 1 /* max count */, &actual_count);
            } while ((hg_ret == HG_SUCCESS) && actual_count);

            /* Do not try to make progress anymore if we're done */
            if (snappy_compress_done_target_g)
                break;

            hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);
        } while (hg_ret == HG_SUCCESS);

        /* Finalize */
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);

        return EXIT_SUCCESS;
    }

    hg_id_t
    snappy_compress_register(hg_class_t *hg_class)
    {
        return MERCURY_REGISTER(hg_class, "snappy_compress", snappy_compress_in_t,
            snappy_compress_out_t, snappy_compress_cb);
    }

    /**
     * The routine that sets up the routines that actually do the work.
     * This 'handle' parameter is the only value passed to this callback, but
     * Mercury routines allow us to query information about the context in
     * which we are called.
     */
    static hg_return_t
    snappy_compress_cb(hg_handle_t handle)
    {
        struct snappy_transfer_args *snappy_transfer_args;
        size_t input_length;

        snappy_transfer_args = (struct snappy_transfer_args *) malloc(
            sizeof(struct snappy_transfer_args));
        snappy_transfer_args->handle = handle;

        /* Get the input parameters sent from the origin through HG_Forward() */
        HG_Get_input(handle, &snappy_transfer_args->snappy_compress_input);

        /* Now set up the bulk transfer and get the input length */
        input_length = HG_Bulk_get_size(
            snappy_transfer_args->snappy_compress_input.input_bulk_handle);

        /* The bulk handle is basically a pointer, with the addition that a
         * handle can refer to more than one memory region. */
        HG_Bulk_create(HG_Get_info(handle)->hg_class, 1, NULL, &input_length,
            HG_BULK_READWRITE, &snappy_transfer_args->local_input_bulk_handle);

        /* Pull the data from the origin's memory into our own */
        /* An alternative would be to use HG_Bulk_access, which would allow
         * Mercury to avoid copying the data when the two sides are
         * "co-resident" */
        HG_Bulk_transfer(HG_Get_info(handle)->context, snappy_pull_cb,
            snappy_transfer_args, HG_BULK_PULL, HG_Get_info(handle)->addr,
            snappy_transfer_args->snappy_compress_input.input_bulk_handle,
            0, /* origin */
            snappy_transfer_args->local_input_bulk_handle, 0, /* local */
            input_length, HG_OP_ID_IGNORE);

        return HG_SUCCESS;
    }

    static hg_return_t
    snappy_pull_cb(const struct hg_cb_info *hg_cb_info)
    {
        struct snappy_transfer_args *snappy_transfer_args =
            (struct snappy_transfer_args *) hg_cb_info->arg;
        hg_return_t ret = HG_SUCCESS;
        void *input;
        size_t input_length;
        size_t source_length =
            HG_Bulk_get_size(snappy_transfer_args->local_input_bulk_handle);

        /* Get a pointer to the input buffer from the local handle */
        HG_Bulk_access(hg_cb_info->info.bulk.local_handle, 0, source_length,
            HG_BULK_READ_ONLY, 1, &input, &input_length, NULL);
        printf("Transferred input buffer of length: %zu\n", input_length);
        print_buf(20, (int *) input);

        /* Allocate the buffer that will receive the compressed data */
        snappy_transfer_args->compressed_length =
            snappy_max_compressed_length(input_length);
        snappy_transfer_args->compressed =
            malloc(snappy_transfer_args->compressed_length);

        /* Compress data */
        printf("Compressing buffer...\n");
        snappy_transfer_args->ret =
            snappy_compress(input, input_length, snappy_transfer_args->compressed,
                &snappy_transfer_args->compressed_length);
        printf(
            "Return value of snappy_compress is: %d\n", snappy_transfer_args->ret);
        printf("Compressed buffer length is: %zu\n",
            snappy_transfer_args->compressed_length);
        print_buf(5, (int *) snappy_transfer_args->compressed);

        /* Free bulk handles */
        HG_Bulk_free(snappy_transfer_args->local_input_bulk_handle);

        if (snappy_validate_compressed_buffer(snappy_transfer_args->compressed,
                snappy_transfer_args->compressed_length) == SNAPPY_OK) {
            printf("Compressed buffer validated: compressed successfully\n");
        }

        /* Push the compressed data back to the origin */
        HG_Bulk_create(HG_Get_info(snappy_transfer_args->handle)->hg_class, 1,
            &snappy_transfer_args->compressed,
            &snappy_transfer_args->compressed_length, HG_BULK_READ_ONLY,
            &snappy_transfer_args->local_compressed_bulk_handle);
        HG_Bulk_transfer(HG_Get_info(snappy_transfer_args->handle)->context,
            snappy_push_cb, snappy_transfer_args, HG_BULK_PUSH,
            HG_Get_info(snappy_transfer_args->handle)->addr,
            snappy_transfer_args->snappy_compress_input.compressed_bulk_handle,
            0, /* origin */
            snappy_transfer_args->local_compressed_bulk_handle, 0, /* local */
            snappy_transfer_args->compressed_length, HG_OP_ID_IGNORE);

        return ret;
    }

    /* Called after the compressed data has been pushed to the origin; sends
     * the response to the origin's HG_Forward request */
    static hg_return_t
    snappy_push_cb(const struct hg_cb_info *hg_cb_info)
    {
        struct snappy_transfer_args *snappy_transfer_args =
            (struct snappy_transfer_args *) hg_cb_info->arg;
        hg_return_t ret = HG_SUCCESS;
        snappy_compress_out_t snappy_compress_output;

        /* Set output parameters to inform the origin */
        snappy_compress_output.ret = snappy_transfer_args->ret;
        snappy_compress_output.compressed_length =
            snappy_transfer_args->compressed_length;
        printf("Transferred compressed buffer of length %zu\n",
            snappy_transfer_args->compressed_length);

        printf("Sending output parameters back to origin\n");
        HG_Respond(snappy_transfer_args->handle, snappy_compress_done_cb, NULL,
            &snappy_compress_output);

        /* Free bulk handles */
        printf("Freeing resources\n");
        HG_Bulk_free(snappy_transfer_args->local_compressed_bulk_handle);
        free(snappy_transfer_args->compressed);

        /* Free input */
        HG_Free_input(snappy_transfer_args->handle,
            &snappy_transfer_args->snappy_compress_input);

        /* Destroy handle (no longer need it, safe because of reference count) */
        HG_Destroy(snappy_transfer_args->handle);
        free(snappy_transfer_args);

        return ret;
    }

    static hg_return_t
    snappy_compress_done_cb(const struct hg_cb_info *callback_info)
    {
        /* We're done */
        snappy_compress_done_target_g = HG_TRUE;

        return callback_info->ret;
    }
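    Both programs bootstrap through a small file: the server writes its address string followed by a newline, and the client reads the first line back and strips the newline before looking the address up. The helper pair below isolates that step so it can be tried without Mercury. `write_addr_file` and `read_addr_file` are hypothetical helpers written for this illustration, and the path is whatever the caller passes in (the listings build it from TEMP_DIRECTORY and CONFIG_FILE_NAME).

```c
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Server side: publish the address string as a single line.
 * Returns 0 on success, -1 on any I/O error. */
int write_addr_file(const char *path, const char *addr)
{
    FILE *f = fopen(path, "w");

    if (f == NULL)
        return -1;
    if (fprintf(f, "%s\n", addr) < 0) {
        fclose(f);
        return -1;
    }
    return fclose(f) == 0 ? 0 : -1;
}

/* Client side: read the first line and strip the trailing newline.
 * Returns 0 on success, -1 if the file is missing or empty. */
int read_addr_file(const char *path, char *addr, size_t addr_size)
{
    FILE *f = fopen(path, "r");
    char *p;

    if (f == NULL)
        return -1;
    if (addr_size == 0 || fgets(addr, (int) addr_size, f) == NULL) {
        fclose(f);
        return -1;
    }
    fclose(f);
    p = strrchr(addr, '\n');
    if (p != NULL)
        *p = '\0';
    return 0;
}
```

    Because the file is the only rendezvous point, the server has to be started first and the client has to treat a missing or empty file as "server not ready" rather than continue with an unset address.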

  • Original article: https://blog.csdn.net/cyq6239075/article/details/126752268