• ZMQ协议


    一、ZMQ特点

    普通的socket是端对端的关系,ZMQ是N:M的关系,socket的连接需要显式地建立连接,销毁连接,选择协议(TCP/UDP)和错误处理,ZMQ屏蔽了这些细节,像是一个封装了的socket库,让网络编程变得更简单。ZMQ不光用于主机与主机之间的socket通信,还可以是线程和进程之间的通信。ZMQ提供的套接字可以在多种协议中传输消息,线程间,进程间,TCP等。可以使用套接字创建多种消息模式,如‘请求-应答模式’,‘发布-订阅模式’,‘分布式模式’等。

    组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须先服务端启动,再启动客户端,否则会报错。

    • ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
    • ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
    • ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
    • ZMQ提供了多种模式进行消息路由。如请求-应答模式,发布-订阅模式等,这些模式可以用来搭建网络拓扑结构。
    • ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。

    二、ZMQ消息模式

    1、Reuqest-Reply(请求-应答模式)

    • 使用Request-Reply模式,需要遵循一定的规律。
    • 客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环
    • 服务端和客户端谁先启动,效果都是一样的。
    • 服务端在收到消息之前,会一直阻塞,等待客户端连上来。

    server.py

    import zmq
    import time
     
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    count = 0
     
    # 必须要先接收消息,然后再应答
    if __name__ == '__main__':
        print('zmq server start....')
        while True:
            message = socket.recv().decode("UTF-8")
            count += 1
            print(f'received request. message:{message}; count:{count}')
            time.sleep(1)
            socket.send_string("world!")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    client.py

    import zmq
     
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
     
    # 客户端必须要先发送消息,然后在接收消息
    if __name__ == '__main__':
        print('zmq client start....')
        for i in range(1, 10):
            socket.send_string("hello")
            message = socket.recv().decode("UTF-8")
            print(f'received response. message:{message}')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    常用数据发送和接收:

    # 发送数据
    socket.send_json(data)                      # data 会被json序列化后进行传输 (json.dumps)
    socket.send_string(data, encoding="utf-8")  # data为unicode字符串,会进行编码成子节再传输
    socket.send_pyobj(obj)                      # obj为python对象,采用pickle进行序列化后传输
    socket.send_multipart(msg_parts)            # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,
                                                # 如[b"message1", b"message2", b"message2"]
     
    # 接收数据
    socket.recv_json()
    socket.recv_string()
    socket.recv_pyobj()
    socket.recv_multipart()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2、Publisher-Subscriber(发布-订阅模式)

    Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息(可参考 Redis 的发布和订阅方式)。服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息。比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息,如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

    “慢连接”: 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",再启动"发布者", “订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短时间段内,ZMQ就可以发送很多消息了。有种简单的方法来同步"发布者” 和"订阅者", 通过sleep让发布者延迟发布消息,等连接建立完成后再进行发送。
    publisher.py

    import zmq
    import time
    import random
     
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
     
    if __name__ == '__main__':
        print("发布者启动.....")
        for i in range(1000):
            time.sleep(0.1)
            temperature = random.randint(-10, 40)
            message = f"我是publisher, 这是我发布给你们的第{i+1}个消息!今日温度{temperature}"
            socket.send_string(message)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    subscriber.py

    import zmq
     
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
     
    # 客户端需要设定一个过滤,否则收不到任何信息
    socket.setsockopt_string(zmq.SUBSCRIBE, '')
     
    if __name__ == '__main__':
        print('订阅者一号启动....')
        while True:
            message = socket.recv_string()
            print(f"(subscriber1)接收到'发布者'发送的消息:{message}")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3、Push-Pull(平行管道模式/分布式处理)

    Ventilator:任务发布器会生成大量可以并行运算的任务。

    Worker:有一组worker会处理这些任务。

    Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

    Worker上游和"任务发布器"相连,下游和"结果接收器"相连,“任务发布器” 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点

    Worker只是连接两个端点,需要等Worker全部启动后,再进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不进行同步的话,第一个Worker启动,会一下子接收很多任务。

    “任务分发器” 会向Worker均匀的分发任务(负载均衡机制)

    “结果接收器” 会均匀地从Worker处收集消息(公平队列机制)

    三、ZeroMQ接口函数解释

    1. zmq_ctx_new : 创建一个新的ZMQ 环境上下文

    表达形式:void *zmq_ctx_new ();

    解释:zmq_ctx_new()函数创建一个新的ZMQ 环境上下文。

    返回值:如果zmq_ctx_new() 执行成功,则会返回一个新创建的context句柄。否则此函数将会返回NULL并设置errno为下列定义的值。

    2. zmq_socket :创建ZMQ套接字。

    表达形式:void *zmq_socket (void *context, int type);

    解释:函数zmq_socket()根据context参数创建一个ZMQ套接字(socket),并且以一个不透明指针的形式返回这新创建的socket。type参数指明了要创建的socket的类型,这个类型决定了在进行传输时在此socket上执行的语义。 新创建的socket初始值是未绑定的,并且未和任何终结点相联系。为了能够在一个socket上建立消息,必须先要使用zmq_connect(3)连接上一个终结点,或者至少有使用zmq_bind(3)函数绑定一个终结点来接收传入的消息。

    返回值:当zmq_socket()函数运行成功的时候会返回新创建的套接字(socket)句柄。否则,函数返回NULL。

    3. zmq_connect:由一个socket创建一个对外连接

    表达式:int zmq_connect (void *socket, const char *endpoint);

    解释:zmq_connect()函数将socket连接到节点endpoint上,然后开始接受在这个节点上的到来的连接请求。endpoint是一个字符串,包含一个协议://紧跟一个address地址。协议有下列的协议指定。address指定要进行绑定的用来传输的地址。

    返回值:执行成功时zmq_connect()会返回0。其它情况返回 -1, 并且设置errno为下列的值。

    4. zmq_bind - 绑定一个socket,接收发来的连接请求socket

    表现形式:int zmq_bind (void *socket, const char *endpoint);

    解释:zmq_bind() 函数绑定把一个socket绑定在一个本地的网络节点(endpoint)上,而后开始接收链接到本节点上的其它节点发送来的消息。

    节点是一个字符串,它包括一个协议://然后跟着一个address。协议为下列给定的协议中的一个。address指定了进行绑定的地址。

    5. 套接字API

    ​ZMQ提供了一套类似于BSD套接字的API,但将很多消息处理机制的细节隐藏了起来,你会逐渐适应这种变化,并乐于用它进行编程。

    ​套接字事实上是用于网络编程的标准接口,ZMQ之所那么吸引人眼球,原因之一就是它是建立在标准套接字API之上。因此,ZMQ的套接字操作非常容易理解,其生命周期主要包含四个部分:

    • 创建和销毁套接字:zmq_socket(), zmq_close()
    • 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
    • 为套接字建立连接:zmq_bind(), zmq_connect()
    • 发送和接收消息:zmq_send(), zmq_recv()
  • 相关阅读:
    新能源商用车软件开发设计规范
    全局异常处理器
    simplemde 下载问题
    Netty 进阶学习(十)-- 协议设计与解析
    RocketMQ 5.0 API 与 SDK 的演进
    Bellman-Ford算法与SPFA算法详解
    手机上比较好用的笔记软件使用哪一款?
    create-react-app创建Electron应用,打包
    PostMan测试接口-----上传文件、导出excel
    腾讯测试大鸟分享4个关于 Python 函数(方法)的冷知识
  • 原文地址:https://blog.csdn.net/qq_36741413/article/details/133124075