• ZMTP协议


    ZoreMQ Transport Protocol是一个传输层协议,用于ZMQ的连接的信息交互,本文档描述的是3.0协议,主要分析基于NULL Security Mechanism


    协议语法

    ZMTP由三部分组成,分别是 greeting、handshake、traffic

    部分描述构成
    greeting描述ZMQ版本、安全机制等signature + version + mechanism + as-server + filler
    handshake描述端类型,如 PUB/SUB,REQ/REP一个或多个command
    traffic命令或者消息command

    ZMTP Wireshark 抓包

    WireShark 默认不提供ZMTP解析插件,需要自己配置,步骤如下:

    插件仓库:https://github.com/whitequark/zmtp-wireshark

    下载插件:https://github.com/whitequark/zmtp-wireshark/blob/master/zmtp-dissector.lua

    将插件zmtp-dissector.lua放到WireShark安装目录,比如我的是:C:\Program Files\Wireshark

    修改C:\Program Files\Wireshark\init.lua,在文件末尾添加

    dofile(DATA_DIR.."zmtp-dissector.lua")
    
    • 1

    对基于TCP端口通讯ZMQ进行抓包,例如端口为7380,将该端口Decode As ZMTP

    8607ab1ed5f523a6bbc4491ec74b924f.png
    解析接如下

    1b1998d082deac0fe4c96b7aafb00f94.png

    greeting

    greeting 固定64个字节大小,下面将依次介绍每个部分。

    signature

    固定10字节大小,固定值为ff 00 00 00 00 00 00 00 01 7f;

    signature可以用来校验链接是否为ZMQ链接,连续读取10个字节,判断开头是否为0xff,结尾是否为0x7f

    version

    固定2字节大小,格式为{major_version, minor_version}3.0 协议则为03 00,实际编码过程中只会校验major_version;

    mechanism

    固定20字节大小,这里只介绍NULL Security Mechanism,也就是不校验,其值为NULL,剩余以内容填充0;

    as-server

    固定一个字节大小,0x00 或者 0x01 ,当mechanism为NULL时候,as-server必须为0

    filler

    填充greeting至64个字节。

    抓包示意

    由Wireshark解析过后的协议。

    6bd633daef76a532eb2eed806a8e6485.png

    Frame

    greeting之后的所有数据格式都为Frame,包含commandmessage

    frame的格式如下:

    Frame = Flag + Payload Length + Payload

    抓包示意如下
    在这里插入图片描述

    • Flag
      Flag 为1字节大小,每位代表不同的意思,参考抓包解释
      在这里插入图片描述
      低1位:表示是否有更多Frame,这里用于ZMQ中sendmore属性
      低2位:表示长度是否为8字节长度,否则为1字节长度
      低3位:表示当前frame是否为Command
      其他:保留,为0

    • Payload Length
      数据长度,可以为1字节或者8字节大小,根据Flag中的标志位决定

    • Payload
      实际的数据,大小为Payload Length

    handshake

    此阶段用来交换对端的READY命令以及metadata,主要包含对端的类型。handshake本质是Command,为Frame的一种。
    NULL Security Mechanism机制中,以PUB/SUB模式为例,handshake的数据如下:
    在这里插入图片描述
    Payload内容如下:

    [1 byte] Command size + [n bytes]Command Name + [1 bytes]Metadata Key size + [n bytes]Metadata Key + [4 bytes]Metadata Value size + [n bytes]Metadata Value

    使用Socket实现ZMQ SUB方法

    代码如下:

    //
    //  ZMTP 3.0 debugging subscriber
    #define WIN32_LEAN_AND_MEAN
    #include 
    #include 
    #include 
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    #pragma comment(lib, "ws2_32.lib")
    
    typedef struct
    {
        uint8_t flags;     //  Must be zero
        uint8_t size;      //  Size, 0 to 255 byte
        uint8_t data[255]; //  Message data
    } zmtp_msg_t;
    
    static void derp(char *s)
    {
        perror(s);
        exit(1);
    }
    
    static void tcp_send(int handle, void *buffer, size_t len)
    {
        if (send(handle, (char *) buffer, len, 0) == -1)
            derp((char *) "send");
    }
    
    static void tcp_recv(int handle, void *buffer, size_t len)
    {
        printf(" - reading %d bytes: ", (int) len);
        fflush(stdout);
        size_t len_recd = 0;
        while (len_recd < len)
        {
            size_t bytes = recv(handle, (char *) buffer + len_recd, len - len_recd, 0);
            if (bytes == 0)
                break; //  Peer has shutdown
            printf(" [%d]", (int) bytes);
            fflush(stdout);
            if (bytes == -1)
                derp((char *) "recv");
            len_recd += bytes;
        }
        printf("\n");
        fflush(stdout);
    }
    
    static void zmtp_recv(int handle, zmtp_msg_t *msg)
    {
        tcp_recv(handle, (uint8_t *) msg, 2);
        tcp_recv(handle, msg->data, msg->size);
    }
    
    static void zmtp_send(int handle, zmtp_msg_t *msg)
    {
        tcp_send(handle, (uint8_t *) msg, msg->size + 2);
    }
    
    //  This is the 3.0 greeting (64 bytes)
    typedef struct
    {
        uint8_t signature[10];
        uint8_t version[2];
        uint8_t mechanism[20];
        uint8_t as_server[1];
        uint8_t filler[31];
    } zmtp_greeting_t;
    
    int main(void)
    {
        puts("I: starting subscriber");
    
        WSADATA wsData;
        if (WSAStartup(MAKEWORD(2, 2), &wsData) != 0)
        {
            std::cerr << "无法初始化Winsock" << std::endl;
            return 1;
        }
    
        //  Create TCP socket
        int peer;
        if ((peer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
            derp((char *) "socket");
    
        const char *serverIP   = "127.0.0.1";
        const int   serverPort = 5559;
    
        sockaddr_in serverAddress {};
        serverAddress.sin_family = AF_INET;
        serverAddress.sin_port   = htons(serverPort);
        if (inet_pton(AF_INET, serverIP, &(serverAddress.sin_addr)) <= 0)
        {
            std::cerr << "无效的服务器IP地址" << std::endl;
            closesocket(peer);
            WSACleanup();
            return 1;
        }
    
        //  Keep trying to connect until we succeed
        puts("I: waiting for connection");
        while (connect(peer, reinterpret_cast<sockaddr *>(&serverAddress), sizeof(serverAddress)) == -1)
            Sleep(1);
    
        puts("I: connected OK");
        //  This is our greeting (64 octets)
        zmtp_greeting_t outgoing = {
            {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F},
            {3, 0},
            {'N', 'U', 'L', 'L', 0},
            {0},
            {0}
        };
        //  Do full backwards version detection following RFC23
        //  Send first ten bytes of greeting to peer
        tcp_send(peer, &outgoing, 10);
    
        //  Read first byte from peer
        zmtp_greeting_t incoming;
        tcp_recv(peer, &incoming, 1);
        uint8_t length = incoming.signature[0];
        if (length != 0xFF)
        {
            puts("E: signature not valid (1)");
            closesocket(peer);
            exit(0);
        }
        //  Looks like 2.0+, read 9 more bytes to be sure
        tcp_recv(peer, (uint8_t *) &incoming + 1, 9);
        if ((incoming.signature[9] & 1) != 1)
        {
            puts("E: signature not valid (2)");
            closesocket(peer);
            exit(0);
        }
        //  Exchange major version numbers
        puts("I: signature valid, exchanging major versions");
        tcp_send(peer, (uint8_t *) &outgoing + 10, 1);
        tcp_recv(peer, (uint8_t *) &incoming + 10, 1);
    
        if (incoming.version[0] >= 3)
        {
            //  If version >= 3, the peer is using ZMTP 3.0, so send
            //  rest of the greeting and continue with ZMTP 3.0.
            puts("I: peer is talking ZMTP 3.0");
            puts("I: sending rest of greeting...");
            tcp_send(peer, (uint8_t *) &outgoing + 11, 53);
            //  Get remainder of greeting from peer
            puts("I: waiting for greeting from peer...");
            tcp_recv(peer, (uint8_t *) &incoming + 11, 53);
            //  Do NULL handshake - send READY command
            //  For now, empty dictionary
            puts("I: have full greeting from peer");
            zmtp_msg_t  ready = {0x04, 0x19};
            std::string data;
            data.push_back(0x05);
            data.append("READY");
            data.push_back(0x0b);
            data.append("Socket-Type");
            int         netByteOrderSize = htonl(3);
            const char *valueBytes       = reinterpret_cast<const char *>(&netByteOrderSize);
            data.append(valueBytes, sizeof(netByteOrderSize));
            data.append("SUB");
    
            memcpy(ready.data, data.c_str(), data.size());
            puts("I: sending READY");
            zmtp_send(peer, &ready);
            //  Now wait for peer's READY command
            puts("I: expecting READY from peer");
            zmtp_recv(peer, &ready);
            //assert(memcmp(ready.data, "READY   ", 8) == 0);
            puts("I: OK! NULL security handshake completed");
    
            puts("I: send sub command");
            zmtp_msg_t subCmd {0x00, 0x01};
            subCmd.data[0] = 0x01;
            zmtp_send(peer, &subCmd);
        }
        else
        {
            puts("E: major version not valid");
            closesocket(peer);
            exit(0);
        }
        puts("I: READY, printing messages");
        while (true)
        {
            zmtp_msg_t msg;
            zmtp_recv(peer, &msg);
            msg.data[msg.size] = 0;
            puts((char *) msg.data);
        }
        closesocket(peer);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
  • 相关阅读:
    2022年 - 年中总结
    springboot+影院售票小程序 毕业设计-附源码111154
    到底什么是Linux?快进来学习!
    架构设计方法(4A架构)-信息架构
    可扩展标记语言-----XML
    java架构知识-设计模式与实践(学习笔记)
    Nacos + Prometheus + Grafana 搭建走起~
    TypeScript系列之类型 null
    【C语言航路】第六站:指针初阶
    30岁了,还能转行做测试/开发程序员吗?下定决心开始干......
  • 原文地址:https://blog.csdn.net/xuancuo8078/article/details/132635129