使用ZMQ进行多线程编程(MT编程)将会是一种享受。在多线程中使用ZMQ套接字时,你不需要考虑额外的东西,让它们自如地运作就好。
使用ZMQ进行多线程编程时,不需要考虑互斥、锁、或其他并发程序中要考虑的因素,你唯一要关心的仅仅是线程之间的消息。
什么叫“完美”的多线程编程,指的是代码易写易读,可以跨系统、跨语言地使用同一种技术,能够在任意颗核心的计算机上运行,没有状态,没有速度的瓶颈。
如果你有多年的多线程编程经验,知道如何使用锁、信号灯、临界区等机制来使代码运行得正确(尚未考虑快速),那你可能会很沮丧,因为ZMQ将改变这一切。三十多年来,并发式应用程序开发所总结的经验是:不要共享状态。这就好比两个醉汉想要分享一杯啤酒,如果他们不是铁哥们儿,那他们很快就会打起来。当有更多的醉汉加入时,情况就会更糟。多线程编程有时就像醉汉抢夺啤酒那样糟糕。
进行多线程编程往往是痛苦的,当程序因为压力过大而崩溃时,你会不知所然。有人写过一篇《多线程代码中的11个错误易发点》的文章,在大公司中广为流传,列举其中的几项:没有进行同步、错误的粒度、读写分离、无锁排序、锁传递、优先级冲突等。
假设某一天的下午三点,当证券市场正交易得如火如荼的时候,突然之间,应用程序因为锁的问题崩溃了,那将会是何等的场景?所以,作为程序员的我们,为解决那些复杂的多线程问题,只能用上更复杂的编程机制。
有人曾这样比喻,那些多线程程序原本应作为大型公司的核心支柱,但往往又最容易出错;那些想要通过网络不断进行延伸的产品,最后总以失败告终。
如何用ZMQ进行多线程编程,以下是一些规则:
1、不要在不同的线程之间访问同一份数据,如果要用到传统编程中的互斥机制,那就有违ZMQ的思想了。唯一的例外是ZMQ上下文对象,它是线程安全的。
2、必须为进程创建ZMQ上下文,并将其传递给所有你需要使用inproc协议进行通信的线程;
3、你可以将线程作为单独的任务来对待,使用自己的上下文,但是这些线程之间就不能使用inproc协议进行通信了。这样做的好处是可以在日后方便地将程序拆分为不同的进程来运行。
4、不要在不同的线程之间传递套接字对象,这些对象不是线程安全的。从技术上来说,你是可以这样做的,但是会用到互斥和锁的机制,这会让你的应用程序变得缓慢和脆弱。唯一合理的情形是,在某些语言的ZMQ类库内部,需要使用垃圾回收机制,这时可能会进行套接字对象的传递。
当你需要在应用程序中使用两个装置时,可能会将套接字对象从一个线程传递给另一个线程,这样做一开始可能会成功,但最后一定会随机地发生错误。所以说,应在同一个线程中打开和关闭套接字。
如果你能遵循上面的规则,就会发现多线程程序可以很容易地拆分成多个进程。程序逻辑可以在线程、进程、或是计算机中运行,根据你的需求进行部署即可。
ZMQ使用的是系统原生的线程机制,而不是某种“绿色线程”。这样做的好处是你不需要学习新的多线程编程API,而且可以和目标操作系统进行很好的结合。你可以使用类似英特尔的ThreadChecker工具来查看线程工作的情况。缺点在于,如果程序创建了太多的线程(如上千个),则可能导致操作系统负载过高。
下面我们举一个实例,让原来的Hello World服务变得更为强大。原来的服务是单线程的,如果请求较少,自然没有问题。ZMQ的线程可以在一个核心上高速地运行,执行大量的工作。但是,如果有一万次请求同时发送过来会怎么样?因此,现实环境中,我们会启动多个worker线程,他们会尽可能地接收客户端请求,处理并返回应答。
当然,我们可以使用启动多个worker进程的方式来实现,但是启动一个进程总比启动多个进程要来的方便且易于管理。而且,作为线程启动的worker,所占用的带宽会比较少,延迟也会较低。
以下是多线程版的Hello World服务:
mtserver: Multithreaded service in C
- #include "zhelpers.h"
- #include
-
- static void *
- worker_routine (void *context) {
- // 连接至代理的套接字
- void *receiver = zmq_socket (context, ZMQ_REP);
- zmq_connect (receiver, "inproc://workers");
-
- while (1) {
- char *string = s_recv (receiver);
- printf ("Received request: [%s]\n", string);
- free (string);
- // 工作
- sleep (1);
- // 返回应答
- s_send (receiver, "World");
- }
- zmq_close (receiver);
- return NULL;
- }
-
- int main (void)
- {
- void *context = zmq_init (1);
-
- // 用于和client进行通信的套接字
- void *clients = zmq_socket (context, ZMQ_ROUTER);
- zmq_bind (clients, "tcp://*:5555");
-
- // 用于和worker进行通信的套接字
- void *workers = zmq_socket (context, ZMQ_DEALER);
- zmq_bind (workers, "inproc://workers");
-
- // 启动一个worker池
- int thread_nbr;
- for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
- pthread_t worker;
- pthread_create (&worker, NULL, worker_routine, context);
- }
- // 启动队列装置
- zmq_device (ZMQ_QUEUE, clients, workers);
-
- // 程序不会运行到这里,但仍进行清理工作
- zmq_close (clients);
- zmq_close (workers);
- zmq_term (context);
- return 0;
- }
所有的代码应该都已经很熟悉了:
1、服务端启动一组worker线程,每个worker创建一个REP套接字,并处理收到的请求。worker线程就像是一个单线程的服务,唯一的区别是使用了inproc而非tcp协议,以及绑定-连接的方向调换了。
2、服务端创建ROUTER套接字用以和client通信,因此提供了一个TCP协议的外部接口。
3、服务端创建DEALER套接字用以和worker通信,使用了内部接口(inproc)。
4、服务端启动了QUEUE内部装置,连接两个端点上的套接字。QUEUE装置会将收到的请求分发给连接上的worker,并将应答路由给请求方。
需要注意的是,在某些编程语言中,创建线程并不是特别方便,POSIX提供的类库是pthreads,但Windows中就需要使用不同的API了。我们会在第三章中讲述如何包装一个多线程编程的API。
示例中的“工作”仅仅是1秒钟的停留,我们可以在worker中进行任意的操作,包括与其他节点进行通信。消息的流向是这样的:REQ-ROUTER-queue-DEALER-REP。
当你刚开始使用ZMQ进行多线程编程时,你可能会问:要如何协调两个线程的工作呢?可能会想要使用sleep()这样的方法,或者使用诸如信号、互斥等机制。事实上,你唯一要用的就是ZMQ本身。回忆一下那个醉汉抢啤酒的例子吧。
下面的示例演示了三个线程之间需要如何进行同步:
我们使用PAIR套接字和inproc协议。
mtrelay: Multithreaded relay in C
- #include "zhelpers.h"
- #include
-
- static void *
- step1 (void *context) {
- // 连接至步骤2,告知我已就绪
- void *xmitter = zmq_socket (context, ZMQ_PAIR);
- zmq_connect (xmitter, "inproc://step2");
- printf ("步骤1就绪,正在通知步骤2……\n");
- s_send (xmitter, "READY");
- zmq_close (xmitter);
-
- return NULL;
- }
-
- static void *
- step2 (void *context) {
- // 启动步骤1前先绑定至inproc套接字
- void *receiver = zmq_socket (context, ZMQ_PAIR);
- zmq_bind (receiver, "inproc://step2");
- pthread_t thread;
- pthread_create (&thread, NULL, step1, context);
-
- // 等待信号
- char *string = s_recv (receiver);
- free (string);
- zmq_close (receiver);
-
- // 连接至步骤3,告知我已就绪
- void *xmitter = zmq_socket (context, ZMQ_PAIR);
- zmq_connect (xmitter, "inproc://step3");
- printf ("步骤2就绪,正在通知步骤3……\n");
- s_send (xmitter, "READY");
- zmq_close (xmitter);
-
- return NULL;
- }
-
- int main (void)
- {
- void *context = zmq_init (1);
-
- // 启动步骤2前先绑定至inproc套接字
- void *receiver = zmq_socket (context, ZMQ_PAIR);
- zmq_bind (receiver, "inproc://step3");
- pthread_t thread;
- pthread_create (&thread, NULL, step2, context);
-
- // 等待信号
- char *string = s_recv (receiver);
- free (string);
- zmq_close (receiver);
-
- printf ("测试成功!\n");
- zmq_term (context);
- return 0;
- }
这是一个ZMQ多线程编程的典型示例:
1、两个线程通过inproc协议进行通信,使用同一个上下文。
2、父线程创建一个套接字,绑定至inproc://端点,然后再启动子线程,将上下文对象传递给它。
3、子线程创建第二个套接字,连接至inproc://端点,然后发送已就绪信号给父线程。
需要注意的是,这段代码无法扩展到多个进程之间的协调。如果你使用inproc协议,只能建立结构非常紧密的应用程序。在延迟时间必须严格控制的情况下可以使用这种方法。对其他应用程序来说,每个线程使用同一个上下文,协议选用ipc或tcp。然后,你就可以自由地将应用程序拆分为多个进程甚至是多台计算机了。
这是我们第一次使用PAIR套接字。为什么要使用PAIR?其他类型的套接字也可以使用,但都有一些缺点会影响到线程间的通信:
1、你可以让信号发送方使用PUSH,接收方使用PULL,这看上去可能可以,但是需要注意的是,PUSH套接字发送消息时会进行负载均衡,如果你不小心开启了两个接收方,就会“丢失”一半的信号。而PAIR套接字建立的是一对一的连接,具有排他性。
2、可以让发送方使用DEALER,接收方使用ROUTER。但是,ROUTER套接字会在消息的外层包裹一个来源地址,这样一来原本零字节的信号就可能要成为一个多段消息了。如果你不在乎这个问题,并且不会重复读取那个套接字,自然可以使用这种方法。但是,如果你想要使用这个套接字接收真正的数据,你就会发现ROUTER提供的消息是错误的。至于DEALER套接字,它同样有负载均衡的机制,和PUSH套接字有相同的风险。
3、可以让发送方使用PUB,接收方使用SUB。一来消息可以照原样发送,二来PUB套接字不会进行负载均衡。但是,你需要对SUB套接字设置一个空的订阅信息(用以接收所有消息);而且,如果SUB套接字没有及时和PUB建立连接,消息很有可能会丢失。
综上,使用PAIR套接字进行线程间的协调是最合适的。