• ZMQ/ZeroMQ的三种消息模式


    目录

    一、 Reuqest-Reply(请求-应答模式)

    二、Publisher-Subscriber(发布-订阅模式)

     三、Push-Pull(平行管道模式/分布式处理)


    一、 Reuqest-Reply(请求-应答模式)

            1、使用Request-Reply模式,需要遵循一定的规律。

            2、客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环

            3、服务端和客户端谁先启动,效果都是一样的。

            4、服务端在收到消息之前,会一直阻塞,等待客户端连上来。

            创建一个客户端和服务端,客户端发送消息给服务端,服务端返回消息给客户端,客户端和服务器谁先启动都可以。

            server.cpp

    1. #include
    2. #include
    3. #include
    4. #ifndef _WIN32
    5. #include
    6. #else
    7. #include
    8. #define sleep(n) Sleep(n)
    9. #endif
    10. int main () {
    11. // Prepare our context and socket
    12. zmq::context_t context (2);
    13. zmq::socket_t socket (context, zmq::socket_type::rep);
    14. socket.bind ("tcp://*:5555");
    15. while (true) {
    16. zmq::message_t request;
    17. // Wait for next request from client
    18. socket.recv (request, zmq::recv_flags::none);
    19. std::cout << "Received Hello" << std::endl;
    20. // Do some 'work'
    21. sleep(1);
    22. // Send reply back to client
    23. zmq::message_t reply (5);
    24. memcpy (reply.data (), "World", 5);
    25. socket.send (reply, zmq::send_flags::none);
    26. }
    27. return 0;
    28. }

            client.cpp

    1. #include
    2. #include
    3. #include
    4. int main ()
    5. {
    6. // Prepare our context and socket
    7. zmq::context_t context (1);
    8. zmq::socket_t socket (context, zmq::socket_type::req);
    9. std::cout << "Connecting to hello world server..." << std::endl;
    10. socket.connect ("tcp://localhost:5555");
    11. // Do 10 requests, waiting each time for a response
    12. for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
    13. zmq::message_t request (5);
    14. memcpy (request.data (), "Hello", 5);
    15. std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
    16. socket.send (request, zmq::send_flags::none);
    17. // Get the reply.
    18. zmq::message_t reply;
    19. socket.recv (reply, zmq::recv_flags::none);
    20. std::cout << "Received World " << request_nbr << std::endl;
    21. }
    22. return 0;
    23. }

    二、Publisher-Subscriber(发布-订阅模式)

            Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。

            服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息

            比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息

            如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

            "慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",在启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。

           

            publisher.cpp

    1. #include
    2. #include
    3. #include
    4. #include
    5. #if (defined (WIN32))
    6. #include
    7. #endif
    8. #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
    9. int main () {
    10. // Prepare our context and publisher
    11. zmq::context_t context (1);
    12. zmq::socket_t publisher (context, zmq::socket_type::pub);
    13. publisher.bind("tcp://*:5556");
    14. publisher.bind("ipc://weather.ipc"); // Not usable on Windows.
    15. // Initialize random number generator
    16. srandom ((unsigned) time (NULL));
    17. while (1) {
    18. int zipcode, temperature, relhumidity;
    19. // Get values that will fool the boss
    20. zipcode = within (100000);
    21. temperature = within (215) - 80;
    22. relhumidity = within (50) + 10;
    23. // Send message to all subscribers
    24. zmq::message_t message(20);
    25. snprintf ((char *) message.data(), 20 ,
    26. "%05d %d %d", zipcode, temperature, relhumidity);
    27. publisher.send(message, zmq::send_flags::none);
    28. }
    29. return 0;
    30. }

             subscriber.cpp

    1. #include
    2. #include
    3. #include
    4. int main (int argc, char *argv[])
    5. {
    6. zmq::context_t context (1);
    7. // Socket to talk to server
    8. std::cout << "Collecting updates from weather server...\n" << std::endl;
    9. zmq::socket_t subscriber (context, zmq::socket_type::sub);
    10. subscriber.connect("tcp://localhost:5556");
    11. // Subscribe to zipcode, default is NYC, 10001
    12. const char *filter = (argc > 1)? argv [1]: "10001 ";
    13. subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    14. // Process 100 updates
    15. int update_nbr;
    16. long total_temp = 0;
    17. for (update_nbr = 0; update_nbr < 100; update_nbr++) {
    18. zmq::message_t update;
    19. int zipcode, temperature, relhumidity;
    20. subscriber.recv(update, zmq::recv_flags::none);
    21. std::istringstream iss(static_cast<char*>(update.data()));
    22. iss >> zipcode >> temperature >> relhumidity ;
    23. total_temp += temperature;
    24. }
    25. std::cout << "Average temperature for zipcode '"<< filter
    26. <<"' was "<<(int) (total_temp / update_nbr) <<"F"
    27. << std::endl;
    28. return 0;
    29. }

    三、Push-Pull(平行管道模式/分布式处理)

            1、Ventilator:任务发布器会生成大量可以并行运算的任务。

            2、Worker:有一组worker会处理这些任务。

            3、Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

            4、Worker上游和"任务发布器"相连,下游和"结果接收器"相连。

            5、"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点。

            6、Worker只是连接两个端点。

            7、需要等Worker全部启动后,在进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不尽兴同步的话,第一个Worker启动。

            8、会一下子接收很多任务。

            9、"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)。

            10、"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)。

             taskvent.cpp

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
    7. int main (int argc, char *argv[])
    8. {
    9. zmq::context_t context (1);
    10. // Socket to send messages on
    11. zmq::socket_t sender(context, ZMQ_PUSH);
    12. sender.bind("tcp://*:5557");
    13. std::cout << "Press Enter when the workers are ready: " << std::endl;
    14. getchar ();
    15. std::cout << "Sending tasks to workers...\n" << std::endl;
    16. // The first message is "0" and signals start of batch
    17. zmq::socket_t sink(context, ZMQ_PUSH);
    18. sink.connect("tcp://localhost:5558");
    19. zmq::message_t message(2);
    20. memcpy(message.data(), "0", 1);
    21. sink.send(message);
    22. // Initialize random number generator
    23. srandom ((unsigned) time (NULL));
    24. // Send 100 tasks
    25. int task_nbr;
    26. int total_msec = 0; // Total expected cost in msecs
    27. for (task_nbr = 0; task_nbr < 100; task_nbr++) {
    28. int workload;
    29. // Random workload from 1 to 100msecs
    30. workload = within (100) + 1;
    31. total_msec += workload;
    32. message.rebuild(10);
    33. memset(message.data(), '\0', 10);
    34. sprintf ((char *) message.data(), "%d", workload);
    35. sender.send(message);
    36. }
    37. std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
    38. sleep (1); // Give 0MQ time to deliver
    39. return 0;
    40. }

             taskwork.cpp

    1. #include "zhelpers.hpp"
    2. #include
    3. int main (int argc, char *argv[])
    4. {
    5. zmq::context_t context(1);
    6. // Socket to receive messages on
    7. zmq::socket_t receiver(context, ZMQ_PULL);
    8. receiver.connect("tcp://localhost:5557");
    9. // Socket to send messages to
    10. zmq::socket_t sender(context, ZMQ_PUSH);
    11. sender.connect("tcp://localhost:5558");
    12. // Process tasks forever
    13. while (1) {
    14. zmq::message_t message;
    15. int workload; // Workload in msecs
    16. receiver.recv(&message);
    17. std::string smessage(static_cast<char*>(message.data()), message.size());
    18. std::istringstream iss(smessage);
    19. iss >> workload;
    20. // Do the work
    21. s_sleep(workload);
    22. // Send results to sink
    23. message.rebuild();
    24. sender.send(message);
    25. // Simple progress indicator for the viewer
    26. std::cout << "." << std::flush;
    27. }
    28. return 0;
    29. }

            tasksink.cpp

    1. #include
    2. #include
    3. #include
    4. #include
    5. int main (int argc, char *argv[])
    6. {
    7. // Prepare our context and socket
    8. zmq::context_t context(1);
    9. zmq::socket_t receiver(context,ZMQ_PULL);
    10. receiver.bind("tcp://*:5558");
    11. // Wait for start of batch
    12. zmq::message_t message;
    13. receiver.recv(&message);
    14. // Start our clock now
    15. struct timeval tstart;
    16. gettimeofday (&tstart, NULL);
    17. // Process 100 confirmations
    18. int task_nbr;
    19. int total_msec = 0; // Total calculated cost in msecs
    20. for (task_nbr = 0; task_nbr < 100; task_nbr++) {
    21. receiver.recv(&message);
    22. if (task_nbr % 10 == 0)
    23. std::cout << ":" << std::flush;
    24. else
    25. std::cout << "." << std::flush;
    26. }
    27. // Calculate and report duration of batch
    28. struct timeval tend, tdiff;
    29. gettimeofday (&tend, NULL);
    30. if (tend.tv_usec < tstart.tv_usec) {
    31. tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
    32. tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
    33. }
    34. else {
    35. tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
    36. tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
    37. }
    38. total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
    39. std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
    40. return 0;
    41. }

     

  • 相关阅读:
    [项目设计] 从零实现的高并发内存池(四)
    UE4 死活打不开工程,卡在91%就闪退了
    为报复老东家,程序员编码给自己转账553笔,金额超21万元
    Git学习笔记4
    从零开始了解协同OA办公系统,一篇文章就够了!
    springboot整合kettle和xxljob
    PIL Image格式转Tensor
    zookeeper-3.6.4集群搭建
    SpringBoot使用@Async实现异步调用
    mall商城项目:只启动mall-admin情况下Windows环境的部署
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/127984185