• C语言代码封装MQTT协议报文,了解MQTT协议通信过程


    【1】MQTT协议介绍

    MQTT是一种轻量级的通信协议,适用于物联网(IoT)和低带宽网络环境。它基于一种“发布/订阅”模式,其中设备发送数据(也称为 “发布”)到经纪人(称为MQTT代理),这些数据被存储,并在需要时被转发给订阅者。这种方式简化了网络管理,允许多个设备在不同的网络条件下进行通信(包括延迟和带宽限制),并支持实时数据更新。它是开放的,可免费使用并易于实施。
    在这里插入图片描述

    【2】MQTT协议报文字段介绍

    MQTT协议报文由两部分组成:固定报头和可变报头。

    固定报头的格式是统一的,其中包括了报文类型和剩余长度两个字段。

    可变报头的格式取决于报文类型。

    下面是MQTT协议中各个报文类型的可变报头字段说明。

    (1)CONNECT:MQTT连接请求报文

    CONNECT报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节(即报文类型和标志位的组合)为0x10,表示这是一个CONNECT报文。

    可变报头包括了以下字段:

    • 协议名(Protocol Name):用于标识MQTT协议的名称,固定为字符串"MQTT";
    • 协议级别(Protocol Level):用于标识所使用的MQTT协议的版本号,一般情况下为4;
    • 连接标志(Connect Flags):用于设置各种连接选项,其中包括:
      • 用户名/密码(Username/Password):用于对连接进行身份验证;
      • 清理会话(Clean Session):表示客户端需要清除服务器上旧的Session信息;
      • 遗嘱标志(Will Flag):表示客户端是否需要在与服务器的连接意外断开时发送遗嘱信息;
      • 遗嘱QoS(Will QoS):用于设置遗嘱消息的服务质量等级;
      • 遗嘱保留(Will Retain):表示遗嘱消息是否需要被服务器保留;
      • 用户名标志(Username Flag):表示客户端是否需要发送用户名字段;
      • 密码标志(Password Flag):表示客户端是否需要发送密码字段。
    • 保持连接(Keep Alive):用于设置心跳包的发送间隔时间,以便客户端和服务器之间保持连接。

    (2)CONNACK:MQTT连接响应报文

    CONNACK报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x20,表示这是一个CONNACK报文。

    可变报头包括了以下字段:

    • 连接应答(Connect Acknowledgment):用于表示连接是否成功,一般为0表示成功,其他值表示失败;
    • 保留标志(Reserved Flag):保留字段,必须为0。

    (3)PUBLISH:MQTT发布消息报文

    PUBLISH报文包括固定报头和可变报头两部分,以及消息体。其中,固定报头的第一个字节由报文类型和QoS级别组合而成,QoS级别可以为0、1或2。

    可变报头包括了以下字段:

    • 主题名(Topic Name):用于标识消息的主题;
    • 报文标识符(Packet Identifier):用于在QoS级别为1或2时确认消息分发的情况,如果为0则表示QoS级别为0。

    消息体包括了要发布的消息内容。

    (4)PUBACK:MQTT发布确认报文

    PUBACK报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x40,表示这是一个PUBACK报文。

    可变报头仅包括一个报文标识符(Packet Identifier)字段,用于确认QoS级别为1的发布消息。

    (5)PUBREC:MQTT发布接收报文

    PUBREC报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x50,表示这是一个PUBREC报文。

    可变报头仅包括一个报文标识符(Packet Identifier)字段,用于确认QoS级别为2的发布消息。

    (6)PUBREL:MQTT发布释放报文

    PUBREL报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x62,表示这是一个PUBREL报文。

    可变报头仅包括一个报文标识符(Packet Identifier)字段,用于确认QoS级别为2的发布消息。

    (7)PUBCOMP:MQTT发布完成报文

    PUBCOMP报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x70,表示这是一个PUBCOMP报文。

    可变报头仅包括一个报文标识符(Packet Identifier)字段,用于确认QoS级别为2的发布消息。

    (8)SUBSCRIBE:MQTT订阅请求报文

    SUBSCRIBE报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x82,表示这是一个SUBSCRIBE报文。

    可变报头包括了以下字段:

    • 报文标识符(Packet Identifier):用于确认订阅请求的情况;
    • 订阅主题(Subscription Topic):用于设置订阅的主题;
    • 服务质量等级(QoS Level):用于设置订阅请求使用的服务质量等级,可以为0、1或2。

    (9)SUBACK:MQTT订阅确认报文

    SUBACK报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0x90,表示这是一个SUBACK报文。

    可变报头包括了以下字段:

    • 报文标识符(Packet Identifier):用于确认订阅请求的情况;
    • 订阅确认等级(Subscription Acknowledgment):用于确认订阅请求的服务质量等级,可以为0、1或2。

    (10)UNSUBSCRIBE:MQTT取消订阅报文

    UNSUBSCRIBE报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0xA2,表示这是一个UNSUBSCRIBE报文。

    可变报头包括了以下字段:

    • 报文标识符(Packet Identifier):用于确认取消订阅请求的情况;
    • 订阅主题(Subscription Topic):用于设置要取消订阅的主题。

    (11)UNSUBACK:MQTT取消订阅确认报文

    UNSUBACK报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0xB0,表示这是一个UNSUBACK报文。

    可变报头仅包含报文标识符(Packet Identifier)字段,用于确认取消订阅请求。

    (12)PINGREQ:MQTT心跳请求报文

    PINGREQ报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0xC0,表示这是一个PINGREQ报文。

    PINGREQ报文不包含可变报头字段。

    (13)PINGRESP:MQTT心跳响应报文

    PINGRESP报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0xD0,表示这是一个PINGRESP报文。

    PINGRESP报文不包含可变报头字段。

    (14)DISCONNECT:MQTT断开连接报文

    DISCONNECT报文包括固定报头和可变报头两部分。其中,固定报头的第一个字节为0xE0,表示这是一个DISCONNECT报文。

    DISCONNECT报文不包含可变报头字段。

    【3】封装MQTT协议

    这是一个使用C语言在Linux下建立TCP通信并发送MQTT报文的例子。 根据MQTT报文自己封装协议。

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 定义MQTT报文类型
    #define MQTT_CONNECT    0x10
    #define MQTT_CONNACK    0x20
    #define MQTT_PUBLISH    0x30
    #define MQTT_PUBACK     0x40
    #define MQTT_SUBSCRIBE  0x80
    #define MQTT_SUBACK     0x90
    #define MQTT_UNSUBSCRIBE    0xA0
    #define MQTT_UNSUBACK   0xB0
    #define MQTT_PINGREQ    0xC0
    #define MQTT_PINGRESP   0xD0
    #define MQTT_DISCONNECT    0xE0
    
    // 定义MQTT连接标志
    #define MQTT_CONNECT_FLAG_CLEAN     0x02
    #define MQTT_CONNECT_FLAG_WILL      0x04
    #define MQTT_CONNECT_FLAG_WILL_QOS0 0x00
    #define MQTT_CONNECT_FLAG_WILL_QOS1 0x08
    #define MQTT_CONNECT_FLAG_WILL_QOS2 0x10
    #define MQTT_CONNECT_FLAG_WILL_RETAIN   0x20
    #define MQTT_CONNECT_FLAG_PASSWORD  0x40
    #define MQTT_CONNECT_FLAG_USERNAME  0x80
    
    // 定义MQTT报文结构体
    typedef struct mqtt_packet 
    {
    	unsigned char *data;
    	unsigned int length;
    }
    mqtt_packet_t;
    
    // 建立socket连接并返回socket文件描述符
    int socket_connect(char *address, int port) 
    {
    	struct sockaddr_in server_address;
    	int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    	if (socket_fd == -1) 
    	{
    		printf("Failed to create socket!\n");
    		return -1;
    	}
    	server_address.sin_family = AF_INET;
    	server_address.sin_port = htons(port);
    	if ((inet_pton(AF_INET, address, &server_address.sin_addr)) <= 0) 
    	{
    		printf("Invalid address/ Address not supported\n");
    		return -1;
    	}
    	if (connect(socket_fd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0) 
    	{
    		printf("Connection Failed!\n");
    		return -1;
    	}
    	return socket_fd;
    }
    // 打包MQTT连接报文 
    mqtt_packet_t *mqtt_connect(char *client_id, char *username, char *password) 
    {
    	mqtt_packet_t *packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
    	unsigned char *data = (unsigned char *)malloc(256);
    	unsigned int length = 0;
    	// 固定报头 
    	data[length++] = MQTT_CONNECT;
    	// 可变报头 
    	data[length++] = 0x0C;
    	// 清理会话标志和协议版本号
    	data[length++] = 'M';
    	data[length++] = 'Q';
    	data[length++] = 'T';
    	data[length++] = 'T';
    	data[length++] = 0x04;
    	// 协议版本号 // 连接标志 
    	unsigned char flags = MQTT_CONNECT_FLAG_CLEAN;
    	if (username != NULL) 
    	{
    		flags |= MQTT_CONNECT_FLAG_USERNAME;
    	}
    	if (password != NULL) 
    	{
    		flags |= MQTT_CONNECT_FLAG_PASSWORD;
    	}
    	data[length++] = flags;
    	data[length++] = 0xFF;
    	// 保持连接时间低8位 
    	data[length++] = 0xFF;
    	// 保持连接时间高8位 // 剩余长度 
    	unsigned char remaining_length = length - 1;
    	data[remaining_length++] = (unsigned char)(length - 2);
    	packet->data = data;
    	packet->length = length;
    	return packet;
    }
    // 发送MQTT报文 
    void mqtt_send(int socket_fd, mqtt_packet_t *packet) 
    {
    	if (send(socket_fd, packet->data, packet->length, 0) < 0) 
    	{
    		printf("Failed to send message!\n");
    	}
    }
    // 接收MQTT报文
    int mqtt_recv(int socket_fd, mqtt_packet_t *packet) 
    {
    	unsigned char header[2];
    	if (recv(socket_fd, header, 2, 0) != 2) 
    	{
    		printf("Failed to receive message header!\n");
    		return -1
    	}
    	unsigned int remaining_length = 0;
    	unsigned int multiplier = 1;
    	int i = 1;
    	do 
    	{
    		if (recv(socket_fd, &header[i], 1, 0) != 1) 
    		{
    			printf("Failed to receive remaining_length byte %d!\n", i);
    			return -1;
    		}
    		remaining_length += (header[i] & 127) * multiplier;
    		multiplier *= 128;
    		i++;
    	}
    	while ((header[i - 1] & 128) != 0);
    	packet->length = remaining_length + i;
    	packet->data = (unsigned char *)malloc(packet->length);
    	memcpy(packet->data, header, 2);
    	if (recv(socket_fd, packet->data + 2, packet->length - 2, 0) != packet->length - 2) 
    	{
    		printf("Failed to receive full message!\n");
    		return -1;
    	}
    	return 0;
    }
    
    
    int main(int argc, char *argv[]) 
    {
    	// 建立 TCP 连接 
    	int socket_fd = socket_connect("test.mosquitto.org", 1883);
    	if (socket_fd == -1) 
    	{
    		printf("Failed to connect to MQTT server!\n");
    		return -1;
    	}
    	printf("Connected to MQTT server!\n");
    	// 打包并发送 MQTT 连接报文
    	mqtt_packet_t *connect_packet = mqtt_connect("test_client", NULL, NULL);
    	mqtt_send(socket_fd, connect_packet);
    	printf("Sent MQTT CONNECT packet!\n");
    	free(connect_packet->data);
    	free(connect_packet);
    	// 接收 MQTT CONNACK 报文
    	mqtt_packet_t *connack_packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
    	if (mqtt_recv(socket_fd, connack_packet) != 0) 
    	{
    		printf("Failed to receive MQTT CONNACK packet!\n");
    		return -1;
    	}
    	if (connack_packet->data[1] != 0x00) 
    	{
    		printf("MQTT server rejected connection!\n");
    		return -1;
    	}
    	printf("Received MQTT CONNACK packet!\n");
    	free(connack_packet->data);
    	free(connack_packet);
    	// 断开 TCP 连接 close(socket_fd); return 0; 
    }
    	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
  • 相关阅读:
    大一想学习数学建模不知道怎么入门
    怎样不依靠工资收入赚到一万元?
    rh358 005 dhcp dhcp6 打印机 ansible配置dhcp和打印机
    软件开发工具的技术要素
    ES系列、Elasticsearch Suggester API(自动补全)
    学习爬虫,这个是你必须要知道的,get和post请求的区别
    C++ 中的虚函数和多态性
    hive3.1核心源码思路
    Springboot 开发 Web Flux
    实习面试
  • 原文地址:https://blog.csdn.net/xiaolong1126626497/article/details/130848791