一,利用NNG pair模式,实现异步通信。
二,manager端 绑定地址,回调函数里 接收 异步消息:
- #include <stdint.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <time.h>
-
- #include <nng/nng.h>
- #include <nng/protocol/pair0/pair.h>
- #include <nng/supplemental/util/platform.h>
-
- #include <iostream>
- #include <thread>
- #include <chrono>
- #include <atomic>
- #include <signal.h>
- #include <sys/wait.h>
-
- using namespace std;
- using namespace std::chrono;
-
- static bool exit_flag = false;
-
- void recv_data_callback(void *arg);
- static void sig_handler(int sig)
- {
- exit_flag = true;
- std::cout << "sig_handler " << exit_flag << endl;
- }
-
- void fatal(const char *func, int rv)
- {
- fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
- exit(1);
- }
-
- class Manager
- {
- public:
- //初始化
- bool init()
- {
- //创建io 并绑定回调函数
- rv = nng_aio_alloc(&aio, recv_data_callback, this);
- if (rv < 0)
- {
- fatal("cannot allocate aio", rv);
- }
-
- //打开
- rv = nng_pair0_open(&sock);
- if (rv != 0)
- {
- fatal("nng_pair0_open", rv);
- }
-
- //设置缓冲区大小
- nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
- nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
-
- //开始监听
- if ((rv = nng_listen(sock, url.c_str(), NULL, 0)) != 0)
- {
- fatal("nng_listen", rv);
- }
- nng_recv_aio(sock, aio);
-
- isInit = true;
- return isInit;
- }
-
- //发送数据
- void send(const std::string &msgStr)
- {
- if (!isInit)
- return;
-
- if (!isInit)
- return;
-
- nng_msg *msg = NULL;
- nng_msg_alloc(&msg, sizeof(msgStr));
- memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
-
- nng_sendmsg(sock, msg, 0);
- }
-
- public:
- nng_socket sock;
- nng_aio *aio{nullptr};
-
- private:
- int rv;
- std::string url{"ipc:///tmp/pair"};
- bool isInit{false};
- };
-
- void recv_data_callback(void *arg)
- {
- int rv = 0;
- Manager *manager = static_cast<Manager*>(arg);
- nng_msg *msg = NULL;
- size_t json_len = 0;
- char * json_str = NULL;
-
- rv = nng_aio_result(manager->aio);
- if (0 != rv) {
- fatal("nng_recv error ", rv);
- }
-
- msg = nng_aio_get_msg(manager->aio);
- json_str = static_cast<char*>(nng_msg_body(msg));
- json_len = nng_msg_len(msg);
-
- std::cout<<"recv_data_callback "<<json_str<<std::endl;
-
- nng_msg_free(msg);
- nng_recv_aio(manager->sock, manager->aio);
- }
-
- int main(int argc, char *grgv[])
- {
- signal(SIGINT, sig_handler);
- signal(SIGTERM, sig_handler);
- signal(SIGABRT, sig_handler);
-
- Manager manager;
- if (manager.init())
- {
- cout << "init success" << endl;
- }
- else
- {
- cout << "init failed" << endl;
- }
-
- while (!exit_flag)
- {
- manager.send("Not bad");
- this_thread::sleep_for(seconds(1));
- }
- return 0;
- }
三,adapter 端,同步发送数据,单开一个线程 进行数据的轮询接收。
-
- #include <nng/nng.h>
- #include <nng/protocol/pair0/pair.h>
- #include <nng/supplemental/util/platform.h>
-
- #include <iostream>
- #include <thread>
- #include <chrono>
- #include <atomic>
- #include <signal.h>
- #include <sys/wait.h>
- #include <string.h>
-
- using namespace std;
- using namespace std::chrono;
-
- static bool exit_flag = false;
-
- static void sig_handler(int sig)
- {
- exit_flag = true;
- std::cout << "sig_handler " << exit_flag << endl;
- }
-
- void fatal(const char *func, int rv)
- {
- fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
- exit(1);
- }
-
- void recv_data_callback(void *arg)
- {
- }
-
- class Adapter
- {
- public:
- //初始化
- bool init()
- {
- //打开
- rv = nng_pair0_open(&sock);
- if (rv != 0)
- {
- fatal("nng_pair0_open", rv);
- }
-
- //设置缓冲区大小
- nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
- nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
-
- rv = nng_dial(sock, url.c_str(), &dialer, 0);
- if (rv != 0)
- {
- fatal("nng_dial", rv);
- }
-
- isInit = true;
- return isInit;
- }
-
- //开始接收
- void start()
- {
- if (!isInit)
- return;
- std::thread t([&]()
- {
- while (!isStop)
- {
- nng_msg * msg = NULL;
- char * json_str = NULL;
- nng_recvmsg(sock, &msg, 0);
- json_str = static_cast<char*>(nng_msg_body(msg));
- std::cout<<"nng_recvmsg "<<json_str<<std::endl;
- } });
- t.detach();
- }
-
- void stop()
- {
- isStop = true;
- cout << "stop " << isStop << endl;
- }
-
- void send(const std::string &msgStr)
- {
- if (!isInit)
- return;
-
- nng_msg *msg = NULL;
- nng_msg_alloc(&msg, sizeof(msgStr));
- memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
-
- nng_sendmsg(sock, msg, 0);
- }
-
- public:
- nng_socket sock;
- nng_dialer dialer;
- std::atomic<bool> isStop{false};
-
- private:
- std::string url{"ipc:///tmp/pair"};
- int rv;
- bool isInit{false};
- };
-
- int main(int argc, char *grgv[])
- {
- signal(SIGINT, sig_handler);
- signal(SIGTERM, sig_handler);
- signal(SIGABRT, sig_handler);
-
- Adapter adapter;
- if (adapter.init())
- {
- cout << "init success" << endl;
- }
- else
- {
- cout << "init failed" << endl;
- }
- adapter.start();
-
- while (!exit_flag)
- {
- adapter.send("How are you?");
- this_thread::sleep_for(seconds(1));
- }
- adapter.stop();
- return 0;
- }

3,CMakeLists.txt 两端 基本一致
- cmake_minimum_required (VERSION 2.8.12)
- project(adapter)
- set(TARGET_NAME adapter)
-
- find_package(nng CONFIG REQUIRED)
-
- find_package(Threads)
-
- add_executable(${TARGET_NAME} adapter.cpp)
- target_link_libraries(${TARGET_NAME} nng::nng)
- target_compile_definitions(${TARGET_NAME} PRIVATE NNG_ELIDE_DEPRECATED)