• NNG pair 异步通信


    一,利用NNG pair模式,实现异步通信

    二,manager端  绑定地址,回调函数里 接收 异步消息:

    1. #include <stdint.h>
    2. #include <stdio.h>
    3. #include <stdlib.h>
    4. #include <string.h>
    5. #include <time.h>
    6. #include <nng/nng.h>
    7. #include <nng/protocol/pair0/pair.h>
    8. #include <nng/supplemental/util/platform.h>
    9. #include <iostream>
    10. #include <thread>
    11. #include <chrono>
    12. #include <atomic>
    13. #include <signal.h>
    14. #include <sys/wait.h>
    15. using namespace std;
    16. using namespace std::chrono;
    17. static bool exit_flag = false;
    18. void recv_data_callback(void *arg);
    19. static void sig_handler(int sig)
    20. {
    21. exit_flag = true;
    22. std::cout << "sig_handler " << exit_flag << endl;
    23. }
    24. void fatal(const char *func, int rv)
    25. {
    26. fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
    27. exit(1);
    28. }
    29. class Manager
    30. {
    31. public:
    32. //初始化
    33. bool init()
    34. {
    35. //创建io 并绑定回调函数
    36. rv = nng_aio_alloc(&aio, recv_data_callback, this);
    37. if (rv < 0)
    38. {
    39. fatal("cannot allocate aio", rv);
    40. }
    41. //打开
    42. rv = nng_pair0_open(&sock);
    43. if (rv != 0)
    44. {
    45. fatal("nng_pair0_open", rv);
    46. }
    47. //设置缓冲区大小
    48. nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
    49. nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
    50. //开始监听
    51. if ((rv = nng_listen(sock, url.c_str(), NULL, 0)) != 0)
    52. {
    53. fatal("nng_listen", rv);
    54. }
    55. nng_recv_aio(sock, aio);
    56. isInit = true;
    57. return isInit;
    58. }
    59. //发送数据
    60. void send(const std::string &msgStr)
    61. {
    62. if (!isInit)
    63. return;
    64. if (!isInit)
    65. return;
    66. nng_msg *msg = NULL;
    67. nng_msg_alloc(&msg, sizeof(msgStr));
    68. memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
    69. nng_sendmsg(sock, msg, 0);
    70. }
    71. public:
    72. nng_socket sock;
    73. nng_aio *aio{nullptr};
    74. private:
    75. int rv;
    76. std::string url{"ipc:///tmp/pair"};
    77. bool isInit{false};
    78. };
    79. void recv_data_callback(void *arg)
    80. {
    81. int rv = 0;
    82. Manager *manager = static_cast<Manager*>(arg);
    83. nng_msg *msg = NULL;
    84. size_t json_len = 0;
    85. char * json_str = NULL;
    86. rv = nng_aio_result(manager->aio);
    87. if (0 != rv) {
    88. fatal("nng_recv error ", rv);
    89. }
    90. msg = nng_aio_get_msg(manager->aio);
    91. json_str = static_cast<char*>(nng_msg_body(msg));
    92. json_len = nng_msg_len(msg);
    93. std::cout<<"recv_data_callback "<<json_str<<std::endl;
    94. nng_msg_free(msg);
    95. nng_recv_aio(manager->sock, manager->aio);
    96. }
    97. int main(int argc, char *grgv[])
    98. {
    99. signal(SIGINT, sig_handler);
    100. signal(SIGTERM, sig_handler);
    101. signal(SIGABRT, sig_handler);
    102. Manager manager;
    103. if (manager.init())
    104. {
    105. cout << "init success" << endl;
    106. }
    107. else
    108. {
    109. cout << "init failed" << endl;
    110. }
    111. while (!exit_flag)
    112. {
    113. manager.send("Not bad");
    114. this_thread::sleep_for(seconds(1));
    115. }
    116. return 0;
    117. }

    三,adapter 端,同步发送数据,单开一个线程 进行数据的轮询接收。

    1. #include <nng/nng.h>
    2. #include <nng/protocol/pair0/pair.h>
    3. #include <nng/supplemental/util/platform.h>
    4. #include <iostream>
    5. #include <thread>
    6. #include <chrono>
    7. #include <atomic>
    8. #include <signal.h>
    9. #include <sys/wait.h>
    10. #include <string.h>
    11. using namespace std;
    12. using namespace std::chrono;
    13. static bool exit_flag = false;
    14. static void sig_handler(int sig)
    15. {
    16. exit_flag = true;
    17. std::cout << "sig_handler " << exit_flag << endl;
    18. }
    19. void fatal(const char *func, int rv)
    20. {
    21. fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
    22. exit(1);
    23. }
    24. void recv_data_callback(void *arg)
    25. {
    26. }
    27. class Adapter
    28. {
    29. public:
    30. //初始化
    31. bool init()
    32. {
    33. //打开
    34. rv = nng_pair0_open(&sock);
    35. if (rv != 0)
    36. {
    37. fatal("nng_pair0_open", rv);
    38. }
    39. //设置缓冲区大小
    40. nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
    41. nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
    42. rv = nng_dial(sock, url.c_str(), &dialer, 0);
    43. if (rv != 0)
    44. {
    45. fatal("nng_dial", rv);
    46. }
    47. isInit = true;
    48. return isInit;
    49. }
    50. //开始接收
    51. void start()
    52. {
    53. if (!isInit)
    54. return;
    55. std::thread t([&]()
    56. {
    57. while (!isStop)
    58. {
    59. nng_msg * msg = NULL;
    60. char * json_str = NULL;
    61. nng_recvmsg(sock, &msg, 0);
    62. json_str = static_cast<char*>(nng_msg_body(msg));
    63. std::cout<<"nng_recvmsg "<<json_str<<std::endl;
    64. } });
    65. t.detach();
    66. }
    67. void stop()
    68. {
    69. isStop = true;
    70. cout << "stop " << isStop << endl;
    71. }
    72. void send(const std::string &msgStr)
    73. {
    74. if (!isInit)
    75. return;
    76. nng_msg *msg = NULL;
    77. nng_msg_alloc(&msg, sizeof(msgStr));
    78. memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
    79. nng_sendmsg(sock, msg, 0);
    80. }
    81. public:
    82. nng_socket sock;
    83. nng_dialer dialer;
    84. std::atomic<bool> isStop{false};
    85. private:
    86. std::string url{"ipc:///tmp/pair"};
    87. int rv;
    88. bool isInit{false};
    89. };
    90. int main(int argc, char *grgv[])
    91. {
    92. signal(SIGINT, sig_handler);
    93. signal(SIGTERM, sig_handler);
    94. signal(SIGABRT, sig_handler);
    95. Adapter adapter;
    96. if (adapter.init())
    97. {
    98. cout << "init success" << endl;
    99. }
    100. else
    101. {
    102. cout << "init failed" << endl;
    103. }
    104. adapter.start();
    105. while (!exit_flag)
    106. {
    107. adapter.send("How are you?");
    108. this_thread::sleep_for(seconds(1));
    109. }
    110. adapter.stop();
    111. return 0;
    112. }

     

    3,CMakeLists.txt  两端 基本一致

    1. cmake_minimum_required (VERSION 2.8.12)
    2. project(adapter)
    3. set(TARGET_NAME adapter)
    4. find_package(nng CONFIG REQUIRED)
    5. find_package(Threads)
    6. add_executable(${TARGET_NAME} adapter.cpp)
    7. target_link_libraries(${TARGET_NAME} nng::nng)
    8. target_compile_definitions(${TARGET_NAME} PRIVATE NNG_ELIDE_DEPRECATED)

  • 相关阅读:
    Excel自学三部曲_Part3:Excel工作场景实战(四)
    【迁移学习】迁移学习的基本概念与应用
    卷积运算与互相关运算
    浅学Go下的ssti
    [附源码]Python计算机毕业设计SSM交通事故记录信息管理系统(程序+LW)
    《RAPL: A Relation-Aware Prototype Learning Approach for Few-Shot Document-Level Relation Extraction》阅读笔记
    【Java】已解决java.nio.channels.ClosedChannelException异常
    SpringBoot第四课-Web开发
    Kubernetes rancher、prometheus、ELK的安装
    基于JAVA忆居民宿管理计算机毕业设计源码+系统+数据库+lw文档+部署
  • 原文地址:https://blog.csdn.net/weixin_38416696/article/details/127883212