• ZMQ之多线程编程


            使用ZMQ进行多线程编程(MT编程)将会是一种享受。在多线程中使用ZMQ套接字时,你不需要考虑额外的东西,让它们自如地运作就好。

            使用ZMQ进行多线程编程时,不需要考虑互斥、锁、或其他并发程序中要考虑的因素,你唯一要关心的仅仅是线程之间的消息

            什么叫“完美”的多线程编程,指的是代码易写易读,可以跨系统、跨语言地使用同一种技术,能够在任意颗核心的计算机上运行,没有状态,没有速度的瓶颈。

            如果你有多年的多线程编程经验,知道如何使用锁、信号灯、临界区等机制来使代码运行得正确(尚未考虑快速),那你可能会很沮丧,因为ZMQ将改变这一切。三十多年来,并发式应用程序开发所总结的经验是:不要共享状态。这就好比两个醉汉想要分享一杯啤酒,如果他们不是铁哥们儿,那他们很快就会打起来。当有更多的醉汉加入时,情况就会更糟。多线程编程有时就像醉汉抢夺啤酒那样糟糕。

            进行多线程编程往往是痛苦的,当程序因为压力过大而崩溃时,你会不知所然。有人写过一篇《多线程代码中的11个错误易发点》的文章,在大公司中广为流传,列举其中的几项:没有进行同步、错误的粒度、读写分离、无锁排序、锁传递、优先级冲突等。

            假设某一天的下午三点,当证券市场正交易得如火如荼的时候,突然之间,应用程序因为锁的问题崩溃了,那将会是何等的场景?所以,作为程序员的我们,为解决那些复杂的多线程问题,只能用上更复杂的编程机制。

            有人曾这样比喻,那些多线程程序原本应作为大型公司的核心支柱,但往往又最容易出错;那些想要通过网络不断进行延伸的产品,最后总以失败告终。

            如何用ZMQ进行多线程编程,以下是一些规则:

                    1、不要在不同的线程之间访问同一份数据,如果要用到传统编程中的互斥机制,那就有违ZMQ的思想了。唯一的例外是ZMQ上下文对象,它是线程安全的。

                    2、必须为进程创建ZMQ上下文,并将其传递给所有你需要使用inproc协议进行通信的线程;

                    3、你可以将线程作为单独的任务来对待,使用自己的上下文,但是这些线程之间就不能使用inproc协议进行通信了。这样做的好处是可以在日后方便地将程序拆分为不同的进程来运行。

                    4、不要在不同的线程之间传递套接字对象,这些对象不是线程安全的。从技术上来说,你是可以这样做的,但是会用到互斥和锁的机制,这会让你的应用程序变得缓慢和脆弱。唯一合理的情形是,在某些语言的ZMQ类库内部,需要使用垃圾回收机制,这时可能会进行套接字对象的传递。

            当你需要在应用程序中使用两个装置时,可能会将套接字对象从一个线程传递给另一个线程,这样做一开始可能会成功,但最后一定会随机地发生错误。所以说,应在同一个线程中打开和关闭套接字。

            如果你能遵循上面的规则,就会发现多线程程序可以很容易地拆分成多个进程。程序逻辑可以在线程、进程、或是计算机中运行,根据你的需求进行部署即可。

            ZMQ使用的是系统原生的线程机制,而不是某种“绿色线程”。这样做的好处是你不需要学习新的多线程编程API,而且可以和目标操作系统进行很好的结合。你可以使用类似英特尔的ThreadChecker工具来查看线程工作的情况。缺点在于,如果程序创建了太多的线程(如上千个),则可能导致操作系统负载过高。

            下面我们举一个实例,让原来的Hello World服务变得更为强大。原来的服务是单线程的,如果请求较少,自然没有问题。ZMQ的线程可以在一个核心上高速地运行,执行大量的工作。但是,如果有一万次请求同时发送过来会怎么样?因此,现实环境中,我们会启动多个worker线程,他们会尽可能地接收客户端请求,处理并返回应答。

            当然,我们可以使用启动多个worker进程的方式来实现,但是启动一个进程总比启动多个进程要来的方便且易于管理。而且,作为线程启动的worker,所占用的带宽会比较少,延迟也会较低。
            以下是多线程版的Hello World服务:

            mtserver: Multithreaded service in C

    1. #include "zhelpers.h"
    2. #include
    3. static void *
    4. worker_routine (void *context) {
    5. // 连接至代理的套接字
    6. void *receiver = zmq_socket (context, ZMQ_REP);
    7. zmq_connect (receiver, "inproc://workers");
    8. while (1) {
    9. char *string = s_recv (receiver);
    10. printf ("Received request: [%s]\n", string);
    11. free (string);
    12. // 工作
    13. sleep (1);
    14. // 返回应答
    15. s_send (receiver, "World");
    16. }
    17. zmq_close (receiver);
    18. return NULL;
    19. }
    20. int main (void)
    21. {
    22. void *context = zmq_init (1);
    23. // 用于和client进行通信的套接字
    24. void *clients = zmq_socket (context, ZMQ_ROUTER);
    25. zmq_bind (clients, "tcp://*:5555");
    26. // 用于和worker进行通信的套接字
    27. void *workers = zmq_socket (context, ZMQ_DEALER);
    28. zmq_bind (workers, "inproc://workers");
    29. // 启动一个worker池
    30. int thread_nbr;
    31. for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
    32. pthread_t worker;
    33. pthread_create (&worker, NULL, worker_routine, context);
    34. }
    35. // 启动队列装置
    36. zmq_device (ZMQ_QUEUE, clients, workers);
    37. // 程序不会运行到这里,但仍进行清理工作
    38. zmq_close (clients);
    39. zmq_close (workers);
    40. zmq_term (context);
    41. return 0;
    42. }

            所有的代码应该都已经很熟悉了:

                    1、服务端启动一组worker线程,每个worker创建一个REP套接字,并处理收到的请求。worker线程就像是一个单线程的服务,唯一的区别是使用了inproc而非tcp协议,以及绑定-连接的方向调换了。

                    2、服务端创建ROUTER套接字用以和client通信,因此提供了一个TCP协议的外部接口。

                    3、服务端创建DEALER套接字用以和worker通信,使用了内部接口(inproc)。

                    4、服务端启动了QUEUE内部装置,连接两个端点上的套接字。QUEUE装置会将收到的请求分发给连接上的worker,并将应答路由给请求方。

            需要注意的是,在某些编程语言中,创建线程并不是特别方便,POSIX提供的类库是pthreads,但Windows中就需要使用不同的API了。我们会在第三章中讲述如何包装一个多线程编程的API。

            示例中的“工作”仅仅是1秒钟的停留,我们可以在worker中进行任意的操作,包括与其他节点进行通信。消息的流向是这样的:REQ-ROUTER-queue-DEALER-REP。

    线程间的信号传输

            当你刚开始使用ZMQ进行多线程编程时,你可能会问:要如何协调两个线程的工作呢?可能会想要使用sleep()这样的方法,或者使用诸如信号、互斥等机制。事实上,你唯一要用的就是ZMQ本身。回忆一下那个醉汉抢啤酒的例子吧。

            下面的示例演示了三个线程之间需要如何进行同步:

            我们使用PAIR套接字和inproc协议。

            mtrelay: Multithreaded relay in C

    1. #include "zhelpers.h"
    2. #include
    3. static void *
    4. step1 (void *context) {
    5. // 连接至步骤2,告知我已就绪
    6. void *xmitter = zmq_socket (context, ZMQ_PAIR);
    7. zmq_connect (xmitter, "inproc://step2");
    8. printf ("步骤1就绪,正在通知步骤2……\n");
    9. s_send (xmitter, "READY");
    10. zmq_close (xmitter);
    11. return NULL;
    12. }
    13. static void *
    14. step2 (void *context) {
    15. // 启动步骤1前先绑定至inproc套接字
    16. void *receiver = zmq_socket (context, ZMQ_PAIR);
    17. zmq_bind (receiver, "inproc://step2");
    18. pthread_t thread;
    19. pthread_create (&thread, NULL, step1, context);
    20. // 等待信号
    21. char *string = s_recv (receiver);
    22. free (string);
    23. zmq_close (receiver);
    24. // 连接至步骤3,告知我已就绪
    25. void *xmitter = zmq_socket (context, ZMQ_PAIR);
    26. zmq_connect (xmitter, "inproc://step3");
    27. printf ("步骤2就绪,正在通知步骤3……\n");
    28. s_send (xmitter, "READY");
    29. zmq_close (xmitter);
    30. return NULL;
    31. }
    32. int main (void)
    33. {
    34. void *context = zmq_init (1);
    35. // 启动步骤2前先绑定至inproc套接字
    36. void *receiver = zmq_socket (context, ZMQ_PAIR);
    37. zmq_bind (receiver, "inproc://step3");
    38. pthread_t thread;
    39. pthread_create (&thread, NULL, step2, context);
    40. // 等待信号
    41. char *string = s_recv (receiver);
    42. free (string);
    43. zmq_close (receiver);
    44. printf ("测试成功!\n");
    45. zmq_term (context);
    46. return 0;
    47. }

            这是一个ZMQ多线程编程的典型示例:

                    1、两个线程通过inproc协议进行通信,使用同一个上下文。

                    2、父线程创建一个套接字,绑定至inproc://端点,然后再启动子线程,将上下文对象传递给它。

                    3、子线程创建第二个套接字,连接至inproc://端点,然后发送已就绪信号给父线程。

            需要注意的是,这段代码无法扩展到多个进程之间的协调。如果你使用inproc协议,只能建立结构非常紧密的应用程序。在延迟时间必须严格控制的情况下可以使用这种方法。对其他应用程序来说,每个线程使用同一个上下文,协议选用ipc或tcp。然后,你就可以自由地将应用程序拆分为多个进程甚至是多台计算机了。

            这是我们第一次使用PAIR套接字。为什么要使用PAIR?其他类型的套接字也可以使用,但都有一些缺点会影响到线程间的通信:

                    1、你可以让信号发送方使用PUSH,接收方使用PULL,这看上去可能可以,但是需要注意的是,PUSH套接字发送消息时会进行负载均衡,如果你不小心开启了两个接收方,就会“丢失”一半的信号。而PAIR套接字建立的是一对一的连接,具有排他性。

                    2、可以让发送方使用DEALER,接收方使用ROUTER。但是,ROUTER套接字会在消息的外层包裹一个来源地址,这样一来原本零字节的信号就可能要成为一个多段消息了。如果你不在乎这个问题,并且不会重复读取那个套接字,自然可以使用这种方法。但是,如果你想要使用这个套接字接收真正的数据,你就会发现ROUTER提供的消息是错误的。至于DEALER套接字,它同样有负载均衡的机制,和PUSH套接字有相同的风险。

                    3、可以让发送方使用PUB,接收方使用SUB。一来消息可以照原样发送,二来PUB套接字不会进行负载均衡。但是,你需要对SUB套接字设置一个空的订阅信息(用以接收所有消息);而且,如果SUB套接字没有及时和PUB建立连接,消息很有可能会丢失。

            综上,使用PAIR套接字进行线程间的协调是最合适的。

  • 相关阅读:
    frp内网穿透实战
    C++ STL 【priority_queue】
    EasyCode全自动单表增删改查!
    python自动化测试平台开发:自动化测试平台简介
    Spring AOP如何使用AspectJ注解进行开发呢?
    艾美捷专有脂质SM-102说明书
    一篇文章带你了解轻量级Web服务器——Nginx简单入门
    微信小程序的开发---tabBar的介绍
    Ambire & KuCoin 发起 ADX 交易竞赛,提供价值 50,000 美元的奖池
    自动化测试面试常见技术题目
  • 原文地址:https://blog.csdn.net/code_lyb/article/details/128116085