目录
mqttConnectedCb--------MQTT_Subscribe
mqttConnectedCb-----MQTT_Publish
首先先注意下这个结构体变量
- // 全局变量
- //============================================================================
- MQTT_Client mqttClient; // MQTT客户端_结构体【此变量非常重要】
他是8266之后将要使用的TCP连接,MQTT连接,MQTT收发报文等等所有参数集合,需要特别留意一下
- // MQTT客户端
- //-----------------------------------------------------------------------------
- typedef struct
- {
- struct espconn *pCon; // TCP连接结构体指针
- uint8_t security; // 安全类型
- uint8_t* host; // 服务端域名/地址
- uint32_t port; // 网络连接端口号
- ip_addr_t ip; // 32位IP地址
-
- mqtt_state_t mqtt_state; // MQTT状态
-
- mqtt_connect_info_t connect_info; // MQTT【CONNECT】报文的连接参数
-
- MqttCallback connectedCb; // MQTT连接成功_回调
- MqttCallback disconnectedCb; // MQTT断开连接_回调
- MqttCallback publishedCb; // MQTT发布成功_回调
- MqttCallback timeoutCb; // MQTT超时_回调
- MqttDataCallback dataCb; // MQTT接收数据_回调
-
- ETSTimer mqttTimer; // MQTT定时器
-
- uint32_t keepAliveTick; // MQTT客户端(ESP8266)心跳计数
- uint32_t reconnectTick; // 重连等待计时
- uint32_t sendTimeout; // 报文发送超时时间
-
- tConnState connState; // ESP8266运行状态
-
- QUEUE msgQueue; // 消息队列
-
- void* user_data; // 用户数据(预留给用户的指针)
-
- } MQTT_Client;
- void user_init(void)
- {
- uart_init(BIT_RATE_115200, BIT_RATE_115200); // 串口波特率设为115200
- os_delay_us(60000);
-
-
- //【技小新】添加
- //###########################################################################
- PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO4_U, FUNC_GPIO4); // GPIO4输出高 #
- GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1); // LED初始化 #
- //###########################################################################
-
-
- CFG_Load(); // 加载/更新系统参数【WIFI参数、MQTT参数】
-
-
- // 网络连接参数赋值:服务端域名【mqtt_test_jx.mqtt.iot.gz.baidubce.com】、网络连接端口【1883】、安全类型【0:NO_TLS】
- //-------------------------------------------------------------------------------------------------------------------
- MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
-
- // MQTT连接参数赋值:客户端标识符【..】、MQTT用户名【..】、MQTT密钥【..】、保持连接时长【120s】、清除会话【1:clean_session】
- //----------------------------------------------------------------------------------------------------------------------------
- MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);
-
- // 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
- //--------------------------------------------------------------
- // MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);
-
-
- // 设置MQTT相关函数
- //--------------------------------------------------------------------------------------------------
- MQTT_OnConnected(&mqttClient, mqttConnectedCb); // 设置【MQTT成功连接】函数的另一种调用方式
- MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); // 设置【MQTT成功断开】函数的另一种调用方式
- MQTT_OnPublished(&mqttClient, mqttPublishedCb); // 设置【MQTT成功发布】函数的另一种调用方式
- MQTT_OnData(&mqttClient, mqttDataCb); // 设置【接收MQTT数据】函数的另一种调用方式
-
-
- // 连接WIFI:SSID[..]、PASSWORD[..]、WIFI连接成功函数[wifiConnectCb]
- //--------------------------------------------------------------------------
- WIFI_Connect(sysCfg.sta_ssid, sysCfg.sta_pwd, wifiConnectCb);
-
-
- INFO("\r\nSystem started ...\r\n");
- }
在user_init函数中首先执行串口初始化,之后调用这个函数 CFG_Load();来加载/更新系统参数
- // 加载/更新系统参数【WIFI参数、MQTT参数】(由持有人标识决定)
- //===================================================================================================================================
- void ICACHE_FLASH_ATTR CFG_Load()
- {
- INFO("\r\nload ...\r\n");
-
- // 读Flash【0x7C】扇区,存放到【saveFlag】(读出之前的持有人标识)
- //----------------------------------------------------------------------------------------------
- spi_flash_read((CFG_LOCATION+3)*SPI_FLASH_SEC_SIZE,(uint32 *)&saveFlag, sizeof(SAVE_FLAG));
-
-
- //根据【参数扇区标志】,读取对应扇区的系统参数【0:系统参数在0x79扇区 !0:系统参数在0x7A扇区】
- //-------------------------------------------------------------------------------------------------------------------------
- if (saveFlag.flag == 0)
- {
- spi_flash_read((CFG_LOCATION+0)*SPI_FLASH_SEC_SIZE, (uint32 *)&sysCfg, sizeof(SYSCFG)); // 读出系统参数(1区:0x79)
- }
- else //saveFlag.flag != 0
- {
- spi_flash_read((CFG_LOCATION+1)*SPI_FLASH_SEC_SIZE, (uint32 *)&sysCfg, sizeof(SYSCFG)); // 读出系统参数(2区:0x7A)
- }
-
-
- // 只有在【持有人标识和之前不同】的情况下,才会更新系统参数(修改系统参数时,一定要记得修改持有人标识的值)
- //------------------------------------------------------------------------------------------------------------------------
- if(sysCfg.cfg_holder != CFG_HOLDER) // 持有人标识不同
- {
- os_memset(&sysCfg, 0x00, sizeof sysCfg); // 参数扇区=0
-
- sysCfg.cfg_holder = CFG_HOLDER; // 更新持有人标识
-
- os_sprintf(sysCfg.device_id, MQTT_CLIENT_ID, system_get_chip_id()); // 【MQTT_CLIENT_ID】MQTT客户端标识符
- sysCfg.device_id[sizeof(sysCfg.device_id) - 1] = '\0'; // 最后添'\0'(防止字符串填满数组,指针溢出)
-
- os_strncpy(sysCfg.sta_ssid, STA_SSID, sizeof(sysCfg.sta_ssid)-1); // 【STA_SSID】WIFI名称
- os_strncpy(sysCfg.sta_pwd, STA_PASS, sizeof(sysCfg.sta_pwd)-1); // 【STA_PASS】WIFI密码
- sysCfg.sta_type = STA_TYPE; // 【STA_TYPE】WIFI类型
-
- os_strncpy(sysCfg.mqtt_host, MQTT_HOST, sizeof(sysCfg.mqtt_host)-1); // 【MQTT_HOST】MQTT服务端域名/IP地址
- sysCfg.mqtt_port = MQTT_PORT; // 【MQTT_PORT】网络连接端口号
- os_strncpy(sysCfg.mqtt_user, MQTT_USER, sizeof(sysCfg.mqtt_user)-1); // 【MQTT_USER】MQTT用户名
- os_strncpy(sysCfg.mqtt_pass, MQTT_PASS, sizeof(sysCfg.mqtt_pass)-1); // 【MQTT_PASS】MQTT密码
-
- sysCfg.security = DEFAULT_SECURITY; /* default non ssl */ // 【DEFAULT_SECURITY】默认安全等级(默认=0,不加密)
-
- sysCfg.mqtt_keepalive = MQTT_KEEPALIVE; // 【MQTT_KEEPALIVE】保持连接时长(宏定义==120)
-
- INFO(" default configuration\r\n");
-
- CFG_Save(); // 将更新后的系统参数烧录到Flash中
- }
- }
这也就是为什么我们之前在设置MQTT连接WIFI相关参数的时候,需要把持有人标志更新一下
- // 只有在【持有人标识和之前不同】的情况下,才会更新系统参数(修改系统参数时,一定要记得修改持有人标识的值)
- //------------------------------------------------------------------------------------------------------------------------
- if(sysCfg.cfg_holder != CFG_HOLDER) // 持有人标识不同
#define CFG_HOLDER 0x66666663 // 持有人标识(只有更新此数值,系统参数才会更新) /* Change this value to load default configurations */
如果觉得麻烦的话,可以把if(sysCfg.cfg_holder != CFG_HOLDER) 这句话注释掉,那么8266每次上电执行的时候,都会将参数更新到flash中
接下来调用MQTT_InitConnection这个函数来将网络连接参数赋值
- // 网络连接参数赋值:服务端域名【mqtt_test_jx.mqtt.iot.gz.baidubce.com】、网络连接端口【1883】、安全类型【0:NO_TLS】
- //====================================================================================================================
- void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security)
- {
- uint32_t temp;
-
- INFO("MQTT_InitConnection\r\n");
-
- os_memset(mqttClient, 0, sizeof(MQTT_Client)); // 【MQTT客户端】结构体 = 0
-
- temp = os_strlen(host); // 服务端域名/IP的字符串长度
- mqttClient->host = (uint8_t*)os_zalloc(temp+1); // 申请空间,存放服务端域名/IP地址字符串
-
- os_strcpy(mqttClient->host, host); // 字符串拷贝
- mqttClient->host[temp] = 0; // 最后'\0'
-
- mqttClient->port = port; // 网络端口号 = 1883
-
- mqttClient->security = security; // 安全类型 = 0 = NO_TLS
- }
之后调用MQTT_InitClient这个函数来将MQTT连接参数赋值
- // MQTT连接参数赋值:客户端标识符【..】、MQTT用户名【..】、MQTT密钥【..】、保持连接时长【120s】、清除会话【1:clean_session】
- //======================================================================================================================================================
- void ICACHE_FLASH_ATTR
- MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
- {
- uint32_t temp;
-
- INFO("MQTT_InitClient\r\n");
-
- // MQTT【CONNECT】报文的连接参数 赋值
- //---------------------------------------------------------------------------------------------------------------
- os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); // MQTT【CONNECT】报文的连接参数 = 0
-
- temp = os_strlen(client_id);
- mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1); // 申请【客户端标识符】的存放内存
- os_strcpy(mqttClient->connect_info.client_id, client_id); // 赋值【客户端标识符】
- mqttClient->connect_info.client_id[temp] = 0; // 最后'\0'
-
- if (client_user) // 判断是否有【MQTT用户名】
- {
- temp = os_strlen(client_user);
- mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.username, client_user); // 赋值【MQTT用户名】
- mqttClient->connect_info.username[temp] = 0;
- }
-
- if (client_pass) // 判断是否有【MQTT密码】
- {
- temp = os_strlen(client_pass);
- mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.password, client_pass); // 赋值【MQTT密码】
- mqttClient->connect_info.password[temp] = 0;
- }
-
- mqttClient->connect_info.keepalive = keepAliveTime; // 保持连接 = 120s
- mqttClient->connect_info.clean_session = cleanSession; // 清除会话 = 1 = clean_session
- //--------------------------------------------------------------------------------------------------------------
-
- // 设置mqtt_state部分参数
- //------------------------------------------------------------------------------------------------------------------------------------------------------------
- mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); // 申请in_buffer内存【入站报文缓存区】
- mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; // 设置in_buffer大小
- mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); // 申请out_buffer内存【出站报文缓存区】
- mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; // 设置out_buffer大小
- mqttClient->mqtt_state.connect_info = &(mqttClient->connect_info); // MQTT【CONNECT】报文的连接参数(指针),赋值给mqttClient->mqtt_state.connect_info
-
-
- // 初始化MQTT出站报文缓存区
- //----------------------------------------------------------------------------------------------------------------------------------
- mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
-
- QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE); // 消息队列初始化【队列可以存放一个/多个MQTT报文】
-
-
- // 创建任务:任务函数【MQTT_Task】、优先级【2】、任务指针【mqtt_procTaskQueue】、消息深度【1】
- //---------------------------------------------------------------------------------------------
- system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
-
- // 安排任务:参数1=任务等级 / 参数2=消息类型 / 参数3=消息参数
- //-----------------------------------------------------------------------------------------------
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient); // 参数3的类型必须为【os_param_t】型
- }
创建了一个任务,MQTT例程就是基于任务来处理不同状态下所要做的操作
- // 创建任务:任务函数【MQTT_Task】、优先级【2】、任务指针【mqtt_procTaskQueue】、消息深度【1】
- //---------------------------------------------------------------------------------------------
- system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)-------这里不用遗嘱
- // 设置遗嘱:遗嘱主题【...】、遗嘱消息【...】、遗嘱质量【Will_Qos=0】、遗嘱保持【Will_Retain=0】
- //====================================================================================================================
- void ICACHE_FLASH_ATTR
- MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
- {
- uint32_t temp;
- temp = os_strlen(will_topic);
- mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1); // 申请【遗嘱主题】的存放内存
- os_strcpy(mqttClient->connect_info.will_topic, will_topic); // 赋值【遗嘱主题】
- mqttClient->connect_info.will_topic[temp] = 0; // 最后'\0'
-
- temp = os_strlen(will_msg);
- mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.will_message, will_msg); // 赋值【遗嘱消息】
- mqttClient->connect_info.will_message[temp] = 0;
-
- mqttClient->connect_info.will_qos = will_qos; // 遗嘱质量【Will_Qos=0】
- mqttClient->connect_info.will_retain = will_retain; // 遗嘱保持【Will_Retain=0】
- }
MQTT_OnConnected(&mqttClient, mqttConnectedCb); // 设置【MQTT成功连接】函数的另一种调用方式
- // 函数调用重定义
- //………………………………………………………………………………………………………………………………………
- // 执行 mqttClient->connectedCb(...) => mqttConnectedCb(...)
- //------------------------------------------------------------------------------------------
- void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client*mqttClient, MqttCallback connectedCb)
- {
- mqttClient->connectedCb = connectedCb; // 函数名【mqttConnectedCb】
- }
执行 mqttClient->connectedCb(...) => mqttConnectedCb(...)这条语句就相当于调用了mqttConnectedCb这个函数
- // MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】
- //============================================================================
- void mqttConnectedCb(uint32_t *args)
- {
- MQTT_Client* client = (MQTT_Client*)args; // 获取mqttClient指针
-
- INFO("MQTT: Connected\r\n");
-
- // 【参数2:主题过滤器 / 参数3:订阅Qos】
- //-----------------------------------------------------------------
- MQTT_Subscribe(client, "SW_LED", 0); // 订阅主题"SW_LED",QoS=0
- // MQTT_Subscribe(client, "SW_LED", 1);
- // MQTT_Subscribe(client, "SW_LED", 2);
-
- // 【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
- //-----------------------------------------------------------------------------------------------------------------------------------------
- MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0); // 向主题"SW_LED"发布"ESP8266_Online",Qos=0、retain=0
- // MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 1, 0);
- // MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 2, 0);
- }
设置WiFi连接的参数
- // 连接WIFI:SSID【...】、PASSWORD【...】、WIFI连接成功回调函数【wifiConnectCb】
- //====================================================================================================================
- void ICACHE_FLASH_ATTR WIFI_Connect(uint8_t* ssid, uint8_t* pass, WifiCallback cb)
- {
- struct station_config stationConf;
-
- INFO("WIFI_INIT\r\n");
-
- wifi_set_opmode_current(STATION_MODE); // 设置ESP8266为STA模式
-
- //wifi_station_set_auto_connect(FALSE); // 上电不自动连接已记录的AP(已注释,即:上电自动连接已记录的AP(默认))
-
-
- //--------------------------------------------------------------------------------------
- wifiCb = cb; // 函数名赋值:wifiCb可以作为函数名使用,wifiCb(..) == wifiConnectCb(..)
-
-
- // 设置STA参数
- //--------------------------------------------------------------------------
- os_memset(&stationConf, 0, sizeof(struct station_config)); // STA信息 = 0
- os_sprintf(stationConf.ssid, "%s", ssid); // SSID赋值
- os_sprintf(stationConf.password, "%s", pass); // 密码赋值
- wifi_station_set_config_current(&stationConf); // 设置STA参数
-
-
- // 设置WiFiLinker定时器
- //-------------------------------------------------------------------------------------------------------
- os_timer_disarm(&WiFiLinker); // 定时器:WIFI连接
- os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL); // wifi_check_ip:检查IP获取情况
- os_timer_arm(&WiFiLinker, 1000, 0); // 1秒定时(1次)
-
- //wifi_station_set_auto_connect(TRUE); // 上电自动连接已记录AP
-
-
- wifi_station_connect(); // ESP8266接入WIFI
- }
在这里他定时检查ip地址的获取情况,如果没有成功获取到ip地址,他会再次启用定时器,如果WIFI状态改变,则调用[wifiConnectCb]函数
- // 定时函数:检查IP获取情况
- //==============================================================================
- static void ICACHE_FLASH_ATTR wifi_check_ip(void *arg)
- {
- struct ip_info ipConfig;
-
- os_timer_disarm(&WiFiLinker); // 关闭WiFiLinker定时器
-
- wifi_get_ip_info(STATION_IF, &ipConfig); // 获取IP地址
-
- wifiStatus = wifi_station_get_connect_status(); // 获取接入状态
-
-
- // 获取到IP地址
- //-------------------------------------------------------------------------
- if (wifiStatus == STATION_GOT_IP && ipConfig.ip.addr != 0)
- {
- // 获取IP后,每2秒检查一次WIFI连接的正确性【防止WIFI掉线等情况】
- //------------------------------------------------------------------
- os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL);
- os_timer_arm(&WiFiLinker, 2000, 0);
- }
- //-------------------------------------------------------------------------
-
-
- // 未获取到IP地址
- //-------------------------------------------------------------------------
- else
- {
- if(wifi_station_get_connect_status() == STATION_WRONG_PASSWORD)
- {
- INFO("STATION_WRONG_PASSWORD\r\n"); // 密码错误
-
- wifi_station_connect();
- }
- else if(wifi_station_get_connect_status() == STATION_NO_AP_FOUND)
- {
- INFO("STATION_NO_AP_FOUND\r\n"); // 未发现对应AP
-
- wifi_station_connect();
- }
- else if(wifi_station_get_connect_status() == STATION_CONNECT_FAIL)
- {
- INFO("STATION_CONNECT_FAIL\r\n"); // 连接失败
-
- wifi_station_connect();
- }
- else
- {
- INFO("STATION_IDLE\r\n");
- }
-
- // 再次开启定时器
- //------------------------------------------------------------------
- os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL);
- os_timer_arm(&WiFiLinker, 500, 0); // 500Ms定时
- }
- //-------------------------------------------------------------------------
-
- // 如果WIFI状态改变,则调用[wifiConnectCb]函数
- //-------------------------------------------------------------------------
- if(wifiStatus != lastWifiStatus)
- {
- lastWifiStatus = wifiStatus; // WIFI状态更新
-
- if(wifiCb) // 判断是否设置了[wifiConnectCb]函数
- wifiCb(wifiStatus); // wifiCb(wifiStatus)=wifiConnectCb(wifiStatus)
- }
- }
在这里先判断是否获取到ip地址,如果成功获取的话,进行SNTP初始化,并设置SNTP定时器
- // WIFI连接状态改变:参数 = wifiStatus
- //============================================================================
- void wifiConnectCb(uint8_t status)
- {
- // 成功获取到IP地址
- //---------------------------------------------------------------------
- if(status == STATION_GOT_IP)
- {
- ip_addr_t * addr = (ip_addr_t *)os_zalloc(sizeof(ip_addr_t));
-
- // 在官方例程的基础上,增加2个备用服务器
- //---------------------------------------------------------------
- sntp_setservername(0, "us.pool.ntp.org"); // 服务器_0【域名】
- sntp_setservername(1, "ntp.sjtu.edu.cn"); // 服务器_1【域名】
-
- ipaddr_aton("210.72.145.44", addr); // 点分十进制 => 32位二进制
- sntp_setserver(2, addr); // 服务器_2【IP地址】
- os_free(addr); // 释放addr
-
- sntp_init(); // SNTP初始化
-
-
- // 设置SNTP定时器[sntp_timer]
- //-----------------------------------------------------------
- os_timer_disarm(&sntp_timer);
- os_timer_setfn(&sntp_timer, (os_timer_func_t *)sntpfn, NULL);
- os_timer_arm(&sntp_timer, 1000, 1); // 1s定时
- }
-
- // IP地址获取失败
- //----------------------------------------------------------------
- else
- {
- MQTT_Disconnect(&mqttClient); // WIFI连接出错,TCP断开连接
- }
- }
在SNTP定时函数中判断是否成功获取到网络时间,如果成功获取的话进入MQTT_Connect这个函数,进行MQTT连接准备
- // SNTP定时函数:获取当前网络时间
- //============================================================================
- void sntpfn()
- {
- u32_t ts = 0;
-
- ts = sntp_get_current_timestamp(); // 获取当前的偏移时间
-
- os_printf("current time : %s\n", sntp_get_real_time(ts)); // 获取真实时间
-
- if (ts == 0) // 网络时间获取失败
- {
- os_printf("did not get a valid time from sntp server\n");
- }
- else //(ts != 0) // 网络时间获取成功
- {
- os_timer_disarm(&sntp_timer); // 关闭SNTP定时器
-
- MQTT_Connect(&mqttClient); // 开始MQTT连接
- }
- }
在这里进行tcp连接设置,并定义了tcp连接成功的回调函数,定义了tcp异常中断的回调函数,设置1s重复定时,之后调用UTILS_StrToIP这个函数来解析点分十进制形式的ip地址,如果解析失败的话,他将进行域名解析mqtt_dns_found
- // WIFI连接、SNTP成功后 => MQTT连接准备(设置TCP连接、解析域名)
- //============================================================================================================================================
- void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
- {
- //espconn_secure_set_size(0x01,6*1024); // SSL双向认证时才需使用 // try to modify memory size 6*1024 if ssl/tls handshake failed
-
- // 开始MQTT连接前,判断是否存在MQTT的TCP连接。如果有,则清除之前的TCP连接
- //------------------------------------------------------------------------------------
- if (mqttClient->pCon)
- {
- // Clean up the old connection forcefully - using MQTT_Disconnect
- // does not actually release the old connection until the
- // disconnection callback is invoked.
-
- mqtt_tcpclient_delete(mqttClient); // 删除TCP连接、释放pCon内存、清除TCP连接指针
- }
-
- // TCP连接设置
- //------------------------------------------------------------------------------------------------------
- mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); // 申请pCon内存
- mqttClient->pCon->type = ESPCONN_TCP; // 设为TCP连接
- mqttClient->pCon->state = ESPCONN_NONE;
- mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); // 申请esp_tcp内存
- mqttClient->pCon->proto.tcp->local_port = espconn_port(); // 获取ESP8266可用端口
- mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; // 设置端口号
- mqttClient->pCon->reverse = mqttClient; // mqttClient->pCon->reverse 缓存 mqttClient指针
-
- espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); // 注册TCP连接成功的回调函数
- espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); // 注册TCP异常中断的回调函数
-
-
- //---------------------------------------------------------------------------------------------------
- mqttClient->keepAliveTick = 0; // MQTT客户端(ESP8266)心跳计数
- mqttClient->reconnectTick = 0; // 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接
-
- // 设置MQTT定时(1秒)【功能:心跳计时、重连计时、TCP发送计时】
- //---------------------------------------------------------------------------------------------------
- os_timer_disarm(&mqttClient->mqttTimer);
- os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); // mqtt_timer
- os_timer_arm(&mqttClient->mqttTimer, 1000, 1); // 1秒定时(重复)
-
-
- // 打印SSL配置:安全类型[NO_TLS == 0]
- //--------------------------------------------------------------------------------------------------------------------------------------------------------------
- os_printf("your ESP SSL/TLS configuration is %d.[0:NO_TLS\t1:TLS_WITHOUT_AUTHENTICATION\t2ONE_WAY_ANTHENTICATION\t3TWO_WAY_ANTHENTICATION]\n",DEFAULT_SECURITY);
-
-
- // 解析点分十进制形式的IP地址
- //------------------------------------------------------------------------------------------------------------------
- if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) // 解析IP地址(点分十进制字符串形式)
- {
- INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port); // 打印IP地址
-
- // 根据安全类型,调用不同的TCP连接方式
- //-------------------------------------------------------------------------------------------------
- if (mqttClient->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
-
- if(DEFAULT_SECURITY >= ONE_WAY_ANTHENTICATION ) // 单向认证【ONE_WAY_ANTHENTICATION = 2】
- {
- espconn_secure_ca_enable(ESPCONN_CLIENT,CA_CERT_FLASH_ADDRESS);
- }
-
- if(DEFAULT_SECURITY >= TWO_WAY_ANTHENTICATION) // 双向认证【TWO_WAY_ANTHENTICATION = 3】
- {
- espconn_secure_cert_req_enable(ESPCONN_CLIENT,CLIENT_CERT_FLASH_ADDRESS);
- }
-
- espconn_secure_connect(mqttClient->pCon); // 不认证【TLS_WITHOUT_AUTHENTICATION = 1】
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
-
- else // 安全类型 = 0 = NO_TLS
- {
- espconn_connect(mqttClient->pCon); // TCP连接(作为Client连接Server)
- }
- }
-
- // 解析域名
- //----------------------------------------------------------------------------------------------
- else
- {
- INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
-
- espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
- }
-
- mqttClient->connState = TCP_CONNECTING; // TCP正在连接
- }
- // 将字符串的IP地址解析为数组IP
- //============================================================================
- uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void *ip)
- {
- /* The count of the number of bytes processed. */
- int i;
-
- /* A pointer to the next digit to process. */
- const char * start;
-
- start = str; // 字符串指针赋值
-
- // IPV4
- //---------------------------------------------------
- for (i = 0; i < 4; i++) // 一共四个十进制字符串
- {
- /* The digit being processed. */
- char c;
- /* The value of this byte. */
- int n = 0;
-
- while (1)
- {
- c = * start; // 字符串某字符赋值
-
- start++; // 从前往后(从左向右)
-
- if (c >= '0' && c <= '9') // 在“0~9”范围内
- {
- n *= 10; // 权重
- n += c - '0'; // 将'0'~'9'字符转换为对应的数字
- }
-
- /* We insist on stopping at "." if we are still parsing
- the first, second, or third numbers. If we have reached
- the end of the numbers, we will allow any character. */
- else if ((i < 3 && c == '.') || i == 3)
- {
- break; // 遇到'.'则解析下一十进制字符串
- }
-
- else
- {
- return 0; // 解析失败
- }
- }
-
- if (n >= 256) // n过大,解析失败
- {
- return 0;
- }
-
- ((uint8_t*)ip)[i] = n; // 赋值给IP数组
- }
-
- return 1; // 解析成功,返回1
- }
他根据安全类型的不同来选择不同的tcp连接方式,这里暂时不加密,所以调用espconn_connect这个函数来将esp8266作为tcp_client连接到tcp_server
- // 域名解析成功_回调
- //=============================================================================================
- LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
- {
- struct espconn *pConn = (struct espconn *)arg; // 获取TCP连接指针
-
- MQTT_Client* client = (MQTT_Client *)pConn->reverse; // 获取mqttClient指针
-
- if (ipaddr == NULL) // 域名解析失败
- {
- INFO("DNS: Found, but got no ip, try to reconnect\r\n");
-
- client->connState = TCP_RECONNECT_REQ; // TCP重连请求(等待5秒)
-
- return;
- }
-
- INFO("DNS: found ip %d.%d.%d.%d\n", // 打印域名对应的IP地址
- *((uint8 *) &ipaddr->addr),
- *((uint8 *) &ipaddr->addr + 1),
- *((uint8 *) &ipaddr->addr + 2),
- *((uint8 *) &ipaddr->addr + 3));
-
- // 判断IP地址是否正确(?=0)
- //----------------------------------------------------------------------------------------
- if (client->ip.addr == 0 && ipaddr->addr != 0) // 未保存IP地址:mqttClient->ip.addr == 0
- {
- os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); // IP赋值
-
- // 根据安全类型,调用不同的TCP连接方式
- //-------------------------------------------------------------------------------------------------
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- if(DEFAULT_SECURITY >= ONE_WAY_ANTHENTICATION ) // 单向认证【ONE_WAY_ANTHENTICATION = 2】
- {
- espconn_secure_ca_enable(ESPCONN_CLIENT,CA_CERT_FLASH_ADDRESS);
- }
-
- if(DEFAULT_SECURITY >= TWO_WAY_ANTHENTICATION) // 双向认证【TWO_WAY_ANTHENTICATION = 3】
- {
- espconn_secure_cert_req_enable(ESPCONN_CLIENT,CLIENT_CERT_FLASH_ADDRESS);
- }
-
- espconn_secure_connect(client->pCon); // 不认证【TLS_WITHOUT_AUTHENTICATION = 1】
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
-
- else // 安全类型 = 0 = NO_TLS
- {
- espconn_connect(client->pCon); // TCP连接(作为Client连接Server)
- }
-
- client->connState = TCP_CONNECTING; // TCP正在连接
-
- INFO("TCP: connecting...\r\n");
- }
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
- }
当tcp连接成功建立后8266将作为MQTT客户端与MQTT服务端建立MQTT连接,那么8266应该向MQTT服务端发送【CONNECT】控制报文,那么他首先要配置CONNECT控制报文
- // TCP连接成功的回调函数
- //============================================================================================================================================
- void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
- {
- struct espconn *pCon = (struct espconn *)arg; // 获取TCP连接指针
- MQTT_Client* client = (MQTT_Client *)(pCon->reverse);// 获取mqttClient指针
-
- // 注册回调函数
- //--------------------------------------------------------------------------------------
- espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); // TCP断开成功_回调
- espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv); // TCP接收成功_回调
- espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb); // TCP发送成功_回调
-
- INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
-
- // 【CONNECT】报文发送准备
- //……………………………………………………………………………………………………………………………………………………………………………………
- // 初始化MQTT报文缓存区
- //-----------------------------------------------------------------------------------------------------------------------
- mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
-
- // 配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度]
- //----------------------------------------------------------------------------------------------------------------------------
- client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
-
- // 获取待发送的报文类型(此处是【CONNECT】报文)
- //----------------------------------------------------------------------------------------------
- client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
-
- // 获取待发送报文中的【报文标识符】(【CONNECT】报文中没有)
- //-------------------------------------------------------------------------------------------------------------------------------------
- client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,client->mqtt_state.outbound_message->length);
-
- // TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)
- //-------------------------------------------------------------------------
- client->sendTimeout = MQTT_SEND_TIMOUT; // 发送MQTT报文时,sendTimeout=5
-
- INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
- //……………………………………………………………………………………………………………………………………………………………………………………
-
- // TCP:发送【CONNECT】报文
- //-----------------------------------------------------------------------------------------------------------------------------
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else // 安全类型 = 0 = NO_TLS
- {
- // TCP发送:数据=[client->mqtt_state.outbound_message->data]、长度=[client->mqtt_state.outbound_message->length]
- //-----------------------------------------------------------------------------------------------------------------
- espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- }
- //-----------------------------------------------------------------------------------------------------------------------------
-
- client->mqtt_state.outbound_message = NULL; // 报文发送完后,清除出站报文指针
-
- client->connState = MQTT_CONNECT_SENDING; // 状态设为:MQTT【CONNECT】报文发送中【MQTT_CONNECT_SENDING】
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
-
- }
我么来看下他是如何配置的
这个函数将connect控制报文的固定报头部分,可变报头部分,有效载荷部分,在空间上依次写入出站报文缓存区,可以参考mqtt协议3.1
首先为connect控制报文的固定报头预留了3个字节,为什么要预留3个字节呢?因为剩余长度最多占两个字节,虽然MQTT协议规定剩余长度最多是4个字节,但是因为tcp发送数据报的限制,剩余长度他不会大于两个字节
接下来是赋值可变报头部分,依次赋值协议名,协议级别。接下来是连接标志字节,暂时设置为0,之后再按位赋值
然后设置保持连接时长
设置清除会话标志接下来设置有效载荷部分的设备id字段,这个设备id是一个字符串,那么字符串前就需要添加两个字节的前缀。调用append_string这个函数,来将将[参数字符串]字段添加到报文缓存区,并且在字符串前添加了两个字节的前缀来表示这个字符串的长度,之后返回[参数字符串]在MQTT控制报文中所占长度。
之后判断遗嘱主题是否存在,如果遗嘱主题存在的话,依次将遗嘱主题遗嘱消息字符串添加到出站报文缓存区,并且将连接标志字节中的主题标志[Will Flag]、主题质量[Will QoS]、主题保留[Will Retain],这些标志位依次赋值
之后判断是否有用户名字段,如果有用户名字段的话,将用户名字段添加前缀写入出站报文缓存区,并且将连接标志字节中的[username]用户名标志位置一
判断是否存在用户名密码字段[password]是否存在
最后调用fini_message设置【CONNECT】控制报文的固定报头
- // 配置【CONNECT】控制报文
- // mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info)
- //================================================================================================================================
- mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info)
- {
- struct mqtt_connect_variable_header* variable_header; // 【CONNECT】报文的【可变报头】指针
-
- init_message(connection); // 设置报文长度 = 3(暂时设为【固定报头】长度(3),之后添加【可变报头】、【有效载荷】)
-
- // 判断消息长度是否超过缓存区长度 // 【注:[message.length]是指TCP传输的整个MQTT报文长度】
- //------------------------------------------------------------------------------------------------------------
- if(connection->message.length + sizeof(*variable_header) > connection->buffer_length) // 判断MQTT报文长度
- return fail_message(connection);
-
- // 跳过了对【固定报头】的赋值,只为【固定报头】保留了3个字节的空间。 注:剩余长度最多占两字节。
-
- // 获取【可变报头】指针,并更新报文长度
- //----------------------------------------------------------------------------------------------------------------------------
- variable_header = (void*)(connection->buffer + connection->message.length); // 【可变报头】指针 = 报文缓存区指针+3(固定报头)
- connection->message.length += sizeof(*variable_header); // 报文长度 == 固定报头 + 可变报头
-
-
- // 协议名、协议级别赋值
- //-----------------------------------------------
- variable_header->lengthMsb = 0; // lengthMsb
- #if defined(PROTOCOL_NAMEv31)
- variable_header->lengthLsb = 6; // lengthLsb
- memcpy(variable_header->magic, "MQIsdp", 6);
- variable_header->version = 3; // v31版本 = 3
-
- #elif defined(PROTOCOL_NAMEv311)
- variable_header->lengthLsb = 4; // lengthLsb
- memcpy(variable_header->magic, "MQTT", 4);
- variable_header->version = 4; // v311版本 = 4
- #else
- #error "Please define protocol name"
- #endif
-
-
- //----------------------------------------------------------------------
- variable_header->flags = 0; // 连接标志字节 = 0(暂时清0,待会赋值)
-
- // 保持连接时长赋值
- //----------------------------------------------------------------------
- variable_header->keepaliveMsb = info->keepalive >> 8; // 赋值高字节
- variable_header->keepaliveLsb = info->keepalive & 0xff; // 赋值低字节
-
-
- // clean_session = 1:客户端和服务端必须丢弃之前的任何会话并开始一个新的会话
- //----------------------------------------------------------------------------
- if(info->clean_session)
- variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; //clean_session=1
-
-
- // 判断是否存在[client_id],存在则设置[client_id]字段
- //----------------------------------------------------------------------------
- if(info->client_id != NULL && info->client_id[0] != '\0')
- {
- // 将[client_id]字段添加到报文缓存区,报文长度+=[client_id]所占长度
- //--------------------------------------------------------------------------
- if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
- return fail_message(connection);
- }
- else
- return fail_message(connection); // 报文出错
-
- // 判断是否存在[will_topic]
- //---------------------------------------------------------------------------------
- if(info->will_topic != NULL && info->will_topic[0] != '\0')
- {
- // 将[will_topic]字段添加到报文缓存区,报文长度+=[will_topic]所占长度
- //--------------------------------------------------------------------------
- if(append_string(connection, info->will_topic,strlen(info->will_topic))<0)
- return fail_message(connection);
-
- // 将[will_message]字段添加到报文缓存区,报文长度+=[will_message]所占长度
- //----------------------------------------------------------------------------
- if(append_string(connection,info->will_message,strlen(info->will_message))<0)
- return fail_message(connection);
-
- // 设置【CONNECT】报文中的Will标志位:[Will Flag]、[Will QoS]、[Will Retain]
- //--------------------------------------------------------------------------
- variable_header->flags |= MQTT_CONNECT_FLAG_WILL; // 遗嘱标志位 = 1
- if(info->will_retain)
- variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;// WILL_RETAIN = 1
- variable_header->flags |= (info->will_qos & 3) << 3; // will质量赋值
- }
-
- // 判断是否存在[username]
- //----------------------------------------------------------------------------
- if(info->username != NULL && info->username[0] != '\0')
- {
- // 将[username]字段添加到报文缓存区,报文长度+=[username]所占长度
- //--------------------------------------------------------------------------
- if(append_string(connection, info->username, strlen(info->username)) < 0)
- return fail_message(connection);
-
- // 设置【CONNECT】报文中的[username]标志位
- //--------------------------------------------------------------------------
- variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; // username = 1
- }
-
- // 判断是否存在[password]
- //----------------------------------------------------------------------------
- if(info->password != NULL && info->password[0] != '\0')
- {
- // 将[password]字段添加到报文缓存区,报文长度+=[password]所占长度
- //--------------------------------------------------------------------------
- if(append_string(connection, info->password, strlen(info->password)) < 0)
- return fail_message(connection);
-
- // 设置【CONNECT】报文中的[password]标志位
- //--------------------------------------------------------------------------
- variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; // password = 1
- }
-
- // 设置【CONNECT】报文固定头:类型[Bit7~4=1]、[Bit3=0]、[Bit2~1=0]、[Bit1=0]
- //----------------------------------------------------------------------------
- return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
- }
- // 将[参数字符串]字段添加到报文缓存区,报文长度+=[参数字符串]所占长度 【注:字符串前需添加两字节的前缀】
- //========================================================================================================
- static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
- {
- if(connection->message.length + len + 2 > connection->buffer_length) // 判断报文是否过长
- return -1;
-
- // 设置字符串前的两字节前缀,表示此字符串的长度
- //--------------------------------------------------------------------------
- connection->buffer[connection->message.length++] = len >> 8; // 高八位
- connection->buffer[connection->message.length++] = len & 0xff; // 低八位
-
- memcpy(connection->buffer+connection->message.length, string, len); // 将[参数字符串]添加到报文缓存区
-
- connection->message.length += len; // 报文长度 += [参数字符串]所占长度
-
- return len + 2; // 返回[参数字符串]在MQTT控制报文中所占长度
- }
参数二是报文类型,后三个参数是报文类型标志位,也就是固定报头的第一个字节的低四位
在此之前我们已经将可变报头部分和有效载荷部分写入了出站报文缓存区,这里我们通过可变报头加有效载荷的长度来设置固定报头。
如果可变报头加有效载荷的长度大于127,那么固定报头就是3个字节,第一个字节就是固定报头的首字节也就是报文类型再加上报文类型标志位,后面的两个字节是剩余长度字段。
如果可变报头加有效载荷的长度小于等于127,那么他的剩余长度占一个字节,它的固定报头是两个字节,所以我们预留给他的3个字节中的第一个字节是没有用的。固定报头的首字节写入3个字节中的第二个字节,剩余长度部分写入第三个字节,之后将出站缓存区的首地址或者是首地址加1,这个是由剩余长度或者是固定报头的长度来决定的,把出站报文赋值给MQTT报文指针
注:
在【PUBLISH】报文中,报文类型标志位由重复分发标志[dup][Bit3]、服务质量[qos][Bit2~1]、报文保留标志[retain][Bit1=0]组成。其余类型报文的报文类型标志位是固定的
- // 设置【MQTT控制报文】的固定报头
- //------------------------------------------------------------------------------------------------------------------------------
- // 注: 在【PUBLISH】报文中,报文类型标志位由重复分发标志[dup][Bit3]、服务质量[qos][Bit2~1]、报文保留标志[retain][Bit1=0]组成。
- // 其余类型报文的报文类型标志位是固定的。
- //==============================================================================================================================
- static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
- {
- int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; // 获取【可变报头】+【有效载荷】长度
-
- // 设置固定报头(固定头中的剩余长度使用变长度编码方案,详情请参考MQTT协议手册)
- //----------------------------------------------------------------------------------------------------------
- if(remaining_length > 127) // 剩余长度占2字节
- {
- connection->buffer[0] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1); // 固定头的首字节赋值
-
- connection->buffer[1] = 0x80 | (remaining_length % 128); // 剩余长度的第一个字节
- connection->buffer[2] = remaining_length / 128; // 剩余长度的第二个字节
-
- connection->message.length = remaining_length + 3; // 报文的整个长度
- connection->message.data = connection->buffer; // MQTT报文指针 -> 出站报文缓存区首地址
- }
- else //if(remaining_length<= 127) // 剩余长度占1字节
- {
- // buffer[0] = 无
- connection->buffer[1] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1); // 固定头的首字节赋值
-
- connection->buffer[2] = remaining_length; // 固定头中的[剩余长度](可变报头+负载数据)
-
- connection->message.length = remaining_length + 2; // 报文的整个长度
- connection->message.data = connection->buffer + 1; // MQTT报文指针 -> 出站报文缓存区首地址+1
- }
-
- return &connection->message; // 返回报文首地址【报文数据、报文整体长度】
- }
我们已经配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度],那么接下来我们通过TCP发送API来将我们配置的connect控制报文发送给MQTT服务端
- // TCP发送:数据=[client->mqtt_state.outbound_message->data]、长度=[client->mqtt_state.outbound_message->length]
- //-----------------------------------------------------------------------------------------------------------------
- espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
报文发送完成后清除mqtt报文指针
client->mqtt_state.outbound_message = NULL; // 报文发送完后,清除出站报文指针
那么当我们向mqtt服务端发送connect控制报文之后,mqtt服务端会向我们返回确实连接请求报文,当8266成功接收网络数据后会进入mqtt_tcpclient_recv这个回调函数,在这里判断当前接收到的mqtt报文是否是确认连接请求报文
if (msg_type == MQTT_MSG_TYPE_CONNACK)
并且判断8266是否是请求mqtt连接状态
if (client->mqtt_state.pending_msg_type!=MQTT_MSG_TYPE_CONNECT)
如果是的话将8266设为mqtt传输数据状态,并且执行mqtt连接成功的函数
- client->connState = MQTT_DATA; // ESP8266状态改变:【MQTT_DATA】
-
- if (client->connectedCb) // 执行[mqttConnectedCb]函数(MQTT连接成功函数)
- client->connectedCb((uint32_t*)client); // 参数 = mqttClient
- // TCP成功接收网络数据:【服务端->客户端ESP8266】
- //==================================================================================================================================================
- void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
- {
- uint8_t msg_type; // 消息类型
- uint8_t msg_qos; // 服务质量
- uint16_t msg_id; // 消息ID
-
- struct espconn *pCon = (struct espconn*)arg; // 获取TCP连接指针
-
- MQTT_Client *client = (MQTT_Client *)pCon->reverse; // 获取mqttClient指针
-
- client->keepAliveTick = 0; // 只要收到网络数据,客户端(ESP8266)心跳计数 = 0
-
- // 读取数据包
- //----------------------------------------------------------------------------------------------------------------------------
- READPACKET:
- INFO("TCP: data received %d bytes\r\n",len);
-
- // 【技小新】添加
- //###############################################################################################################################
- INFO("TCP: data received %d,%d,%d,%d \r\n", *pdata,*(pdata+1),*(pdata+2),*(pdata+3)); // 查看【确认连接请求】报文的具体值 #
- //###############################################################################################################################
-
- if (len
0) // 接收到的数据长度在允许范围内 - {
- os_memcpy(client->mqtt_state.in_buffer, pdata, len); // 获取接收数据,存入【入站报文缓存区】
-
- msg_type = mqtt_get_type(client->mqtt_state.in_buffer); // 获取【报文类型】
- msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); // 获取【消息质量】
- msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); // 获取报文中的【报文标识符】
-
- // 根据ESP8266运行状态,执行相应操作
- //-------------------------------------------------------------------------
- switch (client->connState)
- {
- case MQTT_CONNECT_SENDING: // 【MQTT_CONNECT_SENDING】
- if (msg_type == MQTT_MSG_TYPE_CONNACK) // 判断消息类型!=【CONNACK】
- {
- // ESP8266发送 != 【CONNECT】报文
- //--------------------------------------------------------------
- if (client->mqtt_state.pending_msg_type!=MQTT_MSG_TYPE_CONNECT)
- {
- INFO("MQTT: Invalid packet\r\n"); // 网络数据出错
-
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_disconnect(client->pCon);// 断开TCP连接
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else // 安全类型 = 0 = NO_TLS
- {
- espconn_disconnect(client->pCon); // 断开TCP连接
- }
- }
-
- // ESP8266发送 == 【CONNECT】报文
- //---------------------------------------------------------------------------------
- else
- {
- INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
-
- client->connState = MQTT_DATA; // ESP8266状态改变:【MQTT_DATA】
-
- if (client->connectedCb) // 执行[mqttConnectedCb]函数(MQTT连接成功函数)
- client->connectedCb((uint32_t*)client); // 参数 = mqttClient
- }
- }
- break;
-
-
- // 当前ESP8266状态 == MQTT_DATA || MQTT_KEEPALIVE_SEND
- //-----------------------------------------------------
- case MQTT_DATA:
- case MQTT_KEEPALIVE_SEND:
-
- client->mqtt_state.message_length_read = len; // 获取接收网络数据的长度
-
- // 计算接收到的网络数据中,报文的实际长度(通过【剩余长度】得到)
- //------------------------------------------------------------------------------------------------------------------------------
- client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
-
-
- // 当前ESP8266状态 ==【MQTT_DATA || MQTT_KEEPALIVE_SEND】,根据接收的报文类型,决定ESP8266接下来的动作
- //-----------------------------------------------------------------------------------------------------------------------
- switch (msg_type)
- {
- // ESP8266接收到【SUBACK】报文:订阅确认
- //-----------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_SUBACK:
- // 判断ESP8266之前发送的报文是否是【订阅主题】
- //-------------------------------------------------------------------------------------------------------------
- if (client->mqtt_state.pending_msg_type==MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id==msg_id)
- INFO("MQTT: Subscribe successful\r\n"); //
- break;
- //-----------------------------------------------------------------------------------------------------------------------
-
-
- // ESP8266接收到【UNSUBACK】报文:取消订阅确认
- //-----------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_UNSUBACK:
- // 判断ESP8266之前发送的报文是否是【订阅主题】
- //------------------------------------------------------------------------------------------------------------------
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
- INFO("MQTT: UnSubscribe successful\r\n");
- break;
- //-----------------------------------------------------------------------------------------------------------------------
-
-
- // ESP8266接收到【PUBLISH】报文:发布消息
- //--------------------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PUBLISH:
-
- if (msg_qos == 1) // 【服务端->客户端】发布消息 Qos=1
- client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBACK】报文
- else if (msg_qos == 2) // 【服务端->客户端】发布消息 Qos=2
- client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBREC】报文
-
- if (msg_qos == 1 || msg_qos == 2)
- {
- INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
-
- // 将ESP8266应答报文(【PUBACK】或【PUBREC】),写入队列
- //-------------------------------------------------------------------------------------------------------------------------------
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
- {
- INFO("MQTT: Queue full\r\n");
- }
- }
-
- /// 获取服务端【PUBLISH】报文的【主题】、【有效载荷】
- //---------------------------------------------------------------------------------------------
- deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
-
- break;
- //-----------------------------------------------------------------------------------------------------------------------
-
- // ESP8266接收到【PUBACK】报文:发布消息应答
- //-------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PUBACK:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id)
- {
- INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
- }
- break;
- //-------------------------------------------------------------------------------------------------------------------
-
-
- // Qos == 2
- //-------------------------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PUBREC:
- client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- INFO("MQTT: Queue full\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PUBREL:
- client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- INFO("MQTT: Queue full\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PUBCOMP:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
- INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
- }
- break;
- //-------------------------------------------------------------------------------------------------------------------------------------
-
-
- // ESP8266接收到【PINGREQ】报文:【这个是服务端发送给客户端的,正常情况下,ESP8266不会收到此报文】
- //-------------------------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PINGREQ:
- client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- INFO("MQTT: Queue full\r\n");
- }
- break;
- //-------------------------------------------------------------------------------------------------------------------------------------
-
-
- // ESP8266接收到【PINGRESP】报文:心跳应答
- //-----------------------------------------
- case MQTT_MSG_TYPE_PINGRESP:
- // Ignore // 心跳应答报文则忽略
- break;
- //-----------------------------------------
- }
-
-
- // NOTE: this is done down here and not in the switch case above
- // because the PSOCK_READBUF_LEN() won't work inside a switch
- // statement due to the way protothreads resume.
-
- // ESP8266接收到【PUBLISH】报文:发布消息
- //-------------------------------------------------------------------------------------
- if (msg_type == MQTT_MSG_TYPE_PUBLISH)
- {
- len = client->mqtt_state.message_length_read;
-
- // 报文实际长度 < 网络数据长度
- //------------------------------------------------------------------------------
- if (client->mqtt_state.message_length < client->mqtt_state.message_length_read)
- {
- //client->connState = MQTT_PUBLISH_RECV;
- //Not Implement yet
- len -= client->mqtt_state.message_length; // 跳过之前解析过的报文
- pdata += client->mqtt_state.message_length; // 指向之后为解析的网络数据
-
- INFO("Get another published message\r\n");
-
- goto READPACKET; // 重新解析网络数据
- }
- }
- break; // case MQTT_DATA:
- // case MQTT_KEEPALIVE_SEND:
- //-------------------------------------------------------------------------------------
- }
- }
-
- else // 接收到的报文出错
- {
- INFO("ERROR: Message too long\r\n");
- }
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
- }
如果8266接收到订阅确认报文的话,他会串口打印订阅成功。
如果8266向mqtt服务端发送发布消息报文并且消息质量等于1,mqtt服务端会向他应答发布确认报文,8266会串口打印接收发布确认报文,QoS等于1的消息发布成功
当然mqtt服务端并不只会向8266发送应答,当有其他客户端向8266订阅主题发布消息,mqtt服务端会将消息分发给8266,8266会收到publish报文,如果消息质量等于1,8266会配置发布确认报文mqtt_msg_puback,并且将这个报文写入队列当中QUEUE_Puts。
调用deliver_publish这个函数来获取publish报文的主题和有效载荷
- // ESP8266接收到【SUBACK】报文:订阅确认
- //-----------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_SUBACK:
- // 判断ESP8266之前发送的报文是否是【订阅主题】
- //-------------------------------------------------------------------------------------------------------------
- if (client->mqtt_state.pending_msg_type==MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id==msg_id)
- INFO("MQTT: Subscribe successful\r\n"); //
- break;
- // ESP8266接收到【PUBACK】报文:发布消息应答
- //-------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PUBACK:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id)
- {
- INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
- }
- break;
- // ESP8266接收到【PUBLISH】报文:发布消息
- //--------------------------------------------------------------------------------------------------------------------------------
- case MQTT_MSG_TYPE_PUBLISH:
-
- if (msg_qos == 1) // 【服务端->客户端】发布消息 Qos=1
- client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBACK】报文
- else if (msg_qos == 2) // 【服务端->客户端】发布消息 Qos=2
- client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBREC】报文
-
- if (msg_qos == 1 || msg_qos == 2)
- {
- INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
-
- // 将ESP8266应答报文(【PUBACK】或【PUBREC】),写入队列
- //-------------------------------------------------------------------------------------------------------------------------------
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
- {
- INFO("MQTT: Queue full\r\n");
- }
- }
-
- /// 获取服务端【PUBLISH】报文的【主题】、【有效载荷】
- //---------------------------------------------------------------------------------------------
- deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
-
- break;
- /// 获取服务端【PUBLISH】报文的【主题】、【有效载荷】
- //---------------------------------------------------------------------------------------------
- deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
-
- break;
进入deliver_publish
当成功获取到主题名和有效载荷之后,进入mqtt接受收数据函数
- // ESP8266获取服务端【PUBLISH】报文的【主题】、【有效载荷】
- //===================================================================================================================================
- LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client* client, uint8_t* message, int length)
- {
- mqtt_event_data_t event_data;
-
- event_data.topic_length = length; // 主题名长度初始化
- event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length); // 获取【PUBLISH】报文的主题名(指针)、主题名长度
-
- event_data.data_length = length; // 有效载荷长度初始化
- event_data.data = mqtt_get_publish_data(message, &event_data.data_length); // 获取【PUBLISH】报文的载荷(指针)、载荷长度
-
-
- // 进入【接收MQTT的[PUBLISH]数据】函数
- //-------------------------------------------------------------------------------------------------------------------------
- if (client->dataCb)
- client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
- }
在成功接收mqtt数据函数这里根据接收到的主题名/有效载荷,控制LED的亮/灭
- // 【接收MQTT的[PUBLISH]数据】函数 【参数1:主题 / 参数2:主题长度 / 参数3:有效载荷 / 参数4:有效载荷长度】
- //===============================================================================================================
- void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
- {
- char *topicBuf = (char*)os_zalloc(topic_len+1); // 申请【主题】空间
- char *dataBuf = (char*)os_zalloc(data_len+1); // 申请【有效载荷】空间
-
-
- MQTT_Client* client = (MQTT_Client*)args; // 获取MQTT_Client指针
-
-
- os_memcpy(topicBuf, topic, topic_len); // 缓存主题
- topicBuf[topic_len] = 0; // 最后添'\0'
-
- os_memcpy(dataBuf, data, data_len); // 缓存有效载荷
- dataBuf[data_len] = 0; // 最后添'\0'
-
- INFO("Receive topic: %s, data: %s \r\n", topicBuf, dataBuf); // 串口打印【主题】【有效载荷】
- INFO("Topic_len = %d, Data_len = %d\r\n", topic_len, data_len); // 串口打印【主题长度】【有效载荷长度】
-
-
- // 【技小新】添加
- //########################################################################################
- // 根据接收到的主题名/有效载荷,控制LED的亮/灭
- //-----------------------------------------------------------------------------------
- if( os_strcmp(topicBuf,"SW_LED") == 0 ) // 主题 == "SW_LED"
- {
- if( os_strcmp(dataBuf,"LED_ON") == 0 ) // 有效载荷 == "LED_ON"
- {
- GPIO_OUTPUT_SET(GPIO_ID_PIN(4),0); // LED亮
- }
-
- else if( os_strcmp(dataBuf,"LED_OFF") == 0 ) // 有效载荷 == "LED_OFF"
- {
- GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1); // LED灭
- }
- }
- //########################################################################################
-
-
- os_free(topicBuf); // 释放【主题】空间
- os_free(dataBuf); // 释放【有效载荷】空间
- }
在这里调用mqtt_get_publish_topic这个函数,来获取发布消息报文中的主题名,和主题名长度
- // 获取【PUBLISH】报文中的主题名(指针)、主题名长度
- //=========================================================================================
- const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
- {
- int i;
- int totlen = 0;
- int topiclen;
-
- // 计算剩余长度的值
- //--------------------------------------------------------------------
- for(i = 1; i<*length; ++i) // 解析【剩余长度】字段(从buffer[1]开始)
- {
- totlen += (buffer[i]&0x7f)<<(7*(i-1));
-
- // 如果剩余长度的当前字节的值<0x80,则表示此字节是最后的字节
- //-----------------------------------------------------------------
- if((buffer[i] & 0x80) == 0) // 当前字节的值<0x80
- {
- ++i; // i == 固定报头长度 == 报文类型(1字节) + 剩余长度字段
-
- break; // 跳出循环
- }
- }
-
- totlen += i; // 报文总长度(这句可删,没有用)
-
-
- if(i + 2 >= *length) // 如果没有载荷,则返回NULL
- return NULL;
-
- topiclen = buffer[i++] << 8; // 获取主题名长度(2字节)
- topiclen |= buffer[i++]; // 前缀
-
- if(i + topiclen > *length) // 报文出错(没有主题名),返回NULL
- return NULL;
-
- *length = topiclen; // 返回主题名长度
-
调用mqtt_get_publish_data这个函数 ,来获取有效载荷和有效载荷长度
可以到mqtt手册发布消息章节看下他们是如何解析主题名和有效载荷的
- // 获取【PUBLISH】报文的载荷(指针)、载荷长度
- //========================================================================================================
- const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
- {
- int i;
- int totlen = 0;
- int topiclen;
- int blength = *length;
- *length = 0;
-
- // 计算剩余长度的值
- //--------------------------------------------------------------------
- for(i = 1; i
// 解析【剩余长度】字段(从buffer[1]开始) - {
- totlen += (buffer[i]&0x7f)<<(7*(i-1));
-
- // 如果剩余长度的当前字节的值<0x80,则表示此字节是最后的字节
- //-----------------------------------------------------------------
- if((buffer[i] & 0x80) == 0)
- {
- ++i; // i == 固定报头长度 == 报文类型(1字节) + 剩余长度字段
-
- break; // 跳出循环
- }
- }
-
- totlen += i; // 报文总长度 = 剩余长度表示的值 + 固定头所占字节
-
-
- if(i + 2 >= blength) // 如果没有载荷,则返回NULL
- return NULL;
-
- topiclen = buffer[i++] << 8; // 获取主题名长度(2字节)
- topiclen |= buffer[i++]; // 前缀
-
- if(i+topiclen >= blength) // 报文出错(没有主题名),返回NULL
- return NULL;
-
- i += topiclen; // i = 【固定报头】+【主题名(包括前缀)】
-
- // Qos > 0
- //----------------------------------------------------------------------------
- if(mqtt_get_qos(buffer)>0) // 当Qos>0时,【主题名】字段后面是【报文标识符】
- {
- if(i + 2 >= blength) // 报文错误(无载荷)
- return NULL;
-
- i += 2; // i = 【固定报头】+【可变报头】
- }
-
- if(totlen < i) // 报文错误,返回NULL
- return NULL;
-
- if(totlen <= blength) // 报文总长度 <= 网络接收数据长度
- *length = totlen - i; // 【有效载荷】长度 = 报文长度- (【固定报头】长度+【可变报头】长度)
-
- else // 报文总长度 > 网络接收数据长度【丢失数据/未接收完毕】
- *length = blength - i; // 有效载荷长度 = 网络接收数据长度 - (【固定报头】长度+【可变报头】长度)
-
- return (const char*)(buffer + i); // 返回有效载荷首地址
- }
当8266作为mqtt客户端成功连接到mqtt服务端之后,他就可以向服务端订阅主题,发布消息
我们来看下他是如何订阅主题的,进入MQTT_Subscribe
再接着我们来看发布消息函数MQTT_Publish
这样我们就向队列当中成功的写入了订阅主题报文和发布消息报文
接下来看下任务函数
- // MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】
- //============================================================================
- void mqttConnectedCb(uint32_t *args)
- {
- MQTT_Client* client = (MQTT_Client*)args; // 获取mqttClient指针
-
- INFO("MQTT: Connected\r\n");
-
- // 【参数2:主题过滤器 / 参数3:订阅Qos】
- //-----------------------------------------------------------------
- MQTT_Subscribe(client, "SW_LED", 0); // 订阅主题"SW_LED",QoS=0
- // MQTT_Subscribe(client, "SW_LED", 1);
- // MQTT_Subscribe(client, "SW_LED", 2);
-
- // 【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
- //-----------------------------------------------------------------------------------------------------------------------------------------
- MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0); // 向主题"SW_LED"发布"ESP8266_Online",Qos=0、retain=0
- // MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 1, 0);
- // MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 2, 0);
- }
首先配置【SUBSCRIBE】报文,和之前配置connect控制报文差不太多,当配置完订阅主题报文后,需要特别注意一下,我们不是直接使用tcp接口来将订阅报文发送给mqtt服务端,而是将订阅报文写入队列当中QUEUE_Puts
用过调用这个MQTT_Subscribe函数来将订阅主题报文写入到队列当中,之后安排系统任务
- // ESP8266订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
- //============================================================================================================================================
- BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE]; // 解析后报文缓存(1204字节)
- uint16_t dataLen; // 解析后报文长度
-
- // 配置【SUBSCRIBE】报文,并获取【SUBSCRIBE】报文[指针]、[长度]
- //----------------------------------------------------------------------------------------------------------------------------------------
- client->mqtt_state.outbound_message=mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,topic, qos,&client->mqtt_state.pending_msg_id);
-
- INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
-
-
- // 将报文写入队列,并返回写入字节数(包括特殊码)
- //----------------------------------------------------------------------------------------------------------------------------------
- while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
- {
- INFO("MQTT: Queue full\r\n");
-
- // 解析队列中的报文
- //-----------------------------------------------------------------------------------------------
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) // 解析失败 = -1
- {
- INFO("MQTT: Serious buffer error\r\n");
-
- return FALSE;
- }
- }
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
-
- return TRUE;
- }
将固定报头部分,可变报头部分(报文标识符),有效载荷部分(主题)依次写入出站报文缓存区,
- // 配置【SUBSCRIBE】报文:【参数2:订阅主题 / 参数3:订阅质量 / 参数4:报文标识符】
- //=======================================================================================================================================
- mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id)
- {
- init_message(connection); // 报文长度 = 3(固定头)
-
- if(topic == NULL || topic[0] == '\0') // 主题无效
- return fail_message(connection);
-
- if((*message_id = append_message_id(connection, 0)) == 0) // 添加[报文标识符]
- return fail_message(connection);
-
- if(append_string(connection, topic, strlen(topic)) < 0) // 添加[主题]
- return fail_message(connection);
-
- if(connection->message.length + 1 > connection->buffer_length)// 判断报文长度
- return fail_message(connection);
-
- connection->buffer[connection->message.length++] = qos; // 设置消息质量QoS
-
-
- // 设置【SUBSCRIBE】报文的固定报头
- //-----------------------------------------------------------------
- return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
- }
- // 将报文写入队列,并返回写入字节数(包括特殊码)
- //=================================================================================
- int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len)
- {
- return PROTO_AddRb(&queue->rb, buffer, len);
- }
队列是另外开辟的长度为2048 字节的一块内存,它的作用是暂时缓存一个或多个8266将要发送给mqtt服务端的报文,之后在任务函数中再将这些报文一次发送给mqtt服务端
那么这样的话就出现一个问题,当队列中有多个报文的话,我们如何将这些报文区别开,也就是我们怎么知道呢个字节是报文的头,呢个字节又是报文的尾呢?程序中使用的方法是在报文头之前添加一个起始码0x7E,在报文尾添加结束码0x7F,这样的话只要我们看到队列中有0x7E,0x7F那么他们中间就是一个完整的mqtt报文。那么这样的话又会出现一个小问题,如果我们mqtt报文中本身包含有0x7E,0x7F这样的数值该怎么办呢?
解决的办法是只要报文中包含有0x7D,0x7E,0x7F这样的数值,就将这个数值和0x20相异或,并且在他之前添加一个前缀码0x7D为什么要这样做呢?是因为假设一个数a和b相异或,异或的结果再和b相异或得到的最终结果就是a
这样的话我们再将mqtt报文放入队列的时候只需要将0x7D,0x7E,0x7F三个特殊码和0x20异或,在前面添加上前缀码0x7D,这样的话这个报文就不会包含有起始码和结束码,我们就可以通过起始码和结束码来找到这个完整的报文,当我们解析队列中的报文的时候,我们首先找到起始码之后将每个字节依次解析如果看到有0x7D这样的数值说明他之后的这个字节是一个特殊码,我们将它之后的这个字节再异或上0x20就得到了mqtt报文中这个字节本身的值,之后一直解析直到遇到结束码,这个报文就解析完成了
来看下函数的具体实现,首先向队列中写入0x7起始码,之后进入一个循环,循环次数就是报文的总长度也就是说报文的所有字节,一字不落的依次写入队列当中,当写入队列的时候,要判断当前字节是否是特殊码,如果是特殊码的话要在前面加上0x7D前缀码,并且将这个特殊码异或上0x20,如果不是特殊码的话就直接将这个数值写入队列
当报文都写入队列完成之后呢,最后要添加一个0x7F的结束码,并且返回写入数量,这样的话这个函数就执行完毕了

- // 将报文依次写入队列写指针(p_w)所指向的位置,添加【起始码】【结束码】【前缀码】,并返回写入的字节数
- //---------------------------------------------------------------------------------------------------------
- // 注:队列中【起始码】=[0x7E]、【结束码】=[0x7F]、【前缀码】=[Ox7D]。 报文中的[0x7D][0x7E][0x7F],需要避讳特殊码(通过异或方式)
- //===============================================================================================================================
- I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF *rb, const U8 *packet, I16 len)
- {
- U16 i = 2;
-
- if(RINGBUF_Put(rb,0x7E)==-1) return -1; // 向当前队列写指针指向处写入【起始码:0x7E】
-
- while (len--) // 循环[len]次(报文所有字节)
- {
- switch (*packet) // 获取当前数据包的一个字节
- {
- case 0x7D: // 判断数据 ?= 【0x7D】/【0x7E】/【0x7F】
- case 0x7E:
- case 0x7F:
-
- // 如果数据==[0x7D]||[0x7E]||[0x7F],都在此数据前写入[0x7D]【因为[0x7E]==起始码、[0x7F]==结束码】
- //-----------------------------------------------------------------------------------------------------------
- if(RINGBUF_Put(rb, 0x7D) == -1) return -1; // 在此数据前写入[0x7D]
-
- if(RINGBUF_Put(rb, *packet++^0x20) == -1) return -1; // 【0x7D/0x7E/0x7F】^=0x20,写入队列(注:a^b^b == a)
-
- i += 2; // 写入队列的字节数+2
-
- break;
-
- // 数据包当前数据不是特殊码,则正常写入
- //------------------------------------------------------------------------------
- default:
- if(RINGBUF_Put(rb, *packet++) == -1) return -1; // 写入数据包指针对应值
-
- i++; // 写入队列的字节数+1
-
- break;
- }
- }
-
- if(RINGBUF_Put(rb, 0x7F) == -1) return -1; // 向当前队列写指针指向处写入[结束码:0x7F]
-
- return i; // 返回写入数量(包括起始码、结束码)
- }
首先呢他依然是配置发送消息PUBLISH报文,然后将报文写入队列中,最后向系统安排任务
- // ESP8266向主题发布消息:【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
- //============================================================================================================================================
- BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE]; // 解析后报文缓存(1204字节)
- uint16_t dataLen; // 解析后报文长度
-
- // 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]
- //------------------------------------------------------------------------------------------
- client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
- topic, data, data_length,
- qos, retain,
- &client->mqtt_state.pending_msg_id);
-
- if (client->mqtt_state.outbound_message->length == 0) // 判断报文是否正确
- {
- INFO("MQTT: Queuing publish failed\r\n");
- return FALSE;
- }
-
- // 串口打印:【PUBLISH】报文长度,(队列装填数量/队列大小)
- //--------------------------------------------------------------------------------------------------------------------------------------------------------------------
- INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
-
- // 将报文写入队列,并返回写入字节数(包括特殊码)
- //----------------------------------------------------------------------------------------------------------------------------------
- while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
- {
- INFO("MQTT: Queue full\r\n"); // 队列已满
-
- // 解析队列中的数据包
- //-----------------------------------------------------------------------------------------------
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) // 解析失败 = -1
- {
- INFO("MQTT: Serious buffer error\r\n");
-
- return FALSE;
- }
- }
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务
-
- return TRUE;
- }
MQTT任务函数中会根据ESP8266运行状态,执行相应操作,8266已经和mqtt服务端建立了mqtt连接,所以说是mqtt传输数据状态,首先调用QUEUE_IsEmpty这个函数来判断队列是否为空,如果队列不为空的话来调用QUEUE_Gets,来解析一个完整的报文
- // 解析队列中的报文【参数2:报文解析后缓存指针 / 参数3:解析后的报文长度】
- //===================================================================================================
- int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen)
- {
- return PROTO_ParseRb(&queue->rb, buffer, len, maxLen);
- }
循环解析队列直到遇见0x7F结束码,也就是说即使队列中有多个未被解析的报文,他也仅仅解析一个
- // 解析队列中的数据,解析的报文指针赋值给[参数2:bufOut]【遇到[0x7F]结束码则返回0】
- //=====================================================================================
- I16 ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF* rb, U8 *bufOut, U16* len, U16 maxBufLen)
- {
- U8 c;
-
- PROTO_PARSER proto; // 定义队列解析结构体
-
- PROTO_Init(&proto, NULL, bufOut, maxBufLen); // 初始化队列解析结构体
-
- // 循环解析,直到遇见【Ox7F】结束码
- //-----------------------------------------------------------------
- while(RINGBUF_Get(rb, &c) == 0)
- {
- if(PROTO_ParseByte(&proto, c) == 0) // 解析队列的一个字节
- {
- // 遇到【0x7F】结束码,进入这里
- //------------------------------------------------
- *len = proto.dataLen; // 解析后的报文长度
- return 0; // 跳出循环,函数结束
- }
- }
- return -1; // 解析失败
- }
之后调用tcp发送接口espconn_send ,来将解析后的报文、发送给mqtt服务端,那么mqtt服务端接收到8266向他发送的这些mqtt控制报文,他会根据这些报文来进行相应的应答,
- // MQTT任务函数【任务:根据ESP8266运行状态,执行相应操作】
- //--------------------------------------------------------------------------------------------
- // TCP_RECONNECT_REQ TCP重连请求(等待5秒) 退出Tsak(5秒后,进入TCP_RECONNECT状态)
- //--------------------------------------------------------------------------------------------
- // TCP_RECONNECT TCP重新连接 执行MQTT连接准备,并设置ESP8266状态
- //--------------------------------------------------------------------------------------------
- // MQTT_DELETING MQTT正在删除 TCP断开连接
- // TCP_DISCONNECTING TCP正在断开
- // TCP_RECONNECT_DISCONNECTING TCP暂时断开(断开后会重连)
- //--------------------------------------------------------------------------------------------
- // TCP_DISCONNECTED TCP成功断开 删除TCP连接,并释放pCon内存
- //--------------------------------------------------------------------------------------------
- // MQTT_DELETED MQTT已删除 删除MQTT客户端,并释放相关内存
- //--------------------------------------------------------------------------------------------
- // MQTT_KEEPALIVE_SEND MQTT心跳 向服务器发送心跳报文
- //--------------------------------------------------------------------------------------------
- // MQTT_DATA MQTT数据传输 TCP发送队列中的报文
- //====================================================================================================================================
- void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e) // 不判断消息类型
- {
- INFO("\r\n------------- MQTT_Task -------------\r\n");
-
- MQTT_Client* client = (MQTT_Client*)e->par; // 【e->par】 == 【mqttClient指针的值】,所以需类型转换
-
- uint8_t dataBuffer[MQTT_BUF_SIZE]; // 数据缓存区(1204字节)
-
- uint16_t dataLen; // 数据长度
-
- if (e->par == 0) // 没有mqttClient指针,错误
- return;
-
-
- // 根据ESP8266运行状态,执行相应操作
- //………………………………………………………………………………………………………………………………………………………………………
- switch (client->connState)
- {
- // TCP重连请求(等待5秒),退出Tsak
- //---------------------------------
- case TCP_RECONNECT_REQ: break;
- //---------------------------------
-
-
- // TCP重新连接:执行MQTT连接准备,并设置ESP8266状态
- //--------------------------------------------------------------------------------
- case TCP_RECONNECT:
-
- mqtt_tcpclient_delete(client); // 删除TCP连接、释放pCon内存、清除TCP连接指针
-
- MQTT_Connect(client); // MQTT连接准备:TCP连接、域名解析等
-
- INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
-
- client->connState = TCP_CONNECTING; // TCP正在连接
-
- break;
- //--------------------------------------------------------------------------------
-
-
- // MQTT正在删除、TCP正在断开、【心跳请求】报文发送失败:TCP断开连接
- //------------------------------------------------------------------
- case MQTT_DELETING:
- case TCP_DISCONNECTING:
- case TCP_RECONNECT_DISCONNECTING:
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_disconnect(client->pCon);
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else // 安全类型 = 0 = NO_TLS
- {
- espconn_disconnect(client->pCon); // TCP断开连接
- }
-
- break;
- //------------------------------------------------------------------
-
-
- // TCP成功断开
- //--------------------------------------------------------------------------------
- case TCP_DISCONNECTED:
- INFO("MQTT: Disconnected\r\n");
- mqtt_tcpclient_delete(client); // 删除TCP连接、释放pCon内存、清除TCP连接指针
- break;
- //--------------------------------------------------------------------------------
-
-
- // MQTT已删除:ESP8266的状态为[MQTT已删除]后,将MQTT相关内存释放
- //--------------------------------------------------------------------
- case MQTT_DELETED:
- INFO("MQTT: Deleted client\r\n");
- mqtt_client_delete(client); // 删除MQTT客户端,并释放相关内存
- break;
-
-
- // MQTT客户端存活报告
- //--------------------------------------------------------------------
- case MQTT_KEEPALIVE_SEND:
- mqtt_send_keepalive(client); // 向MQTT服务器发送【心跳】报文
- break;
-
-
- // MQTT传输数据状态
- //-------------------------------------------------------------------------------------------------------------------------------
- case MQTT_DATA:
- if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0)
- {
- break; // 【队列为空 || 发送未结束】,不执行操作
- }
-
- // 【队列非空 && 发送结束】:解析并发送 队列中的报文
- //--------------------------------------------------------------------------------------------------------
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) // 解析成功 = 0
- {
- client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer); // 获取报文中的【报文类型】
- client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen); // 获取报文中的【报文标识符】
-
- client->sendTimeout = MQTT_SEND_TIMOUT; // 发送MQTT报文时,sendTimeout=5
-
- INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
-
-
- // 发送报文
- //-----------------------------------------------------------------------
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_send(client->pCon, dataBuffer, dataLen);
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else // 安全类型 = 0 = NO_TLS
- {
- espconn_send(client->pCon, dataBuffer, dataLen); // TCP发送数据包
- }
-
- client->mqtt_state.outbound_message = NULL; // 报文发送完后,清除出站报文指针
-
- break;
- }
- break;
- }
- //………………………………………………………………………………………………………………………………………………………………………
-
- } // 函数【MQTT_Task】结束
回到mqtt_tcpclient_recv ,
我们已经设置8266连接mqtt服务端,向服务端订阅主题,向服务端发布消息以及接收服务端分发的消息,但是不要忘了我们8266在连接mqtt服务端的时候,设置了一个非常重要的参数---保持连接时长,8266需要在保持连接市场内向mqtt服务端发起通信,否则mqtt服务端将会断开与8266的连接,在定时回调函数中,8266一直进行着心跳计数每秒钟加一
- if (client->connState == MQTT_DATA) // MQTT_DATA
- {
- client->keepAliveTick ++; // 客户端(ESP8266)心跳计数++
如果心跳计数大于我们设定的保持连接时长的1/2,我们就发送心跳报文
- // 判断客户端(ESP8266)心跳计数 ?> 保持连接的1/2时间
- //--------------------------------------------------------------------------------------------------------------------------
- if (client->keepAliveTick>(client->mqtt_state.connect_info->keepalive/2)) //【注:官方例程中是:判断是否超过保持连接时长】
- {
- client->connState = MQTT_KEEPALIVE_SEND; // MQTT_KEEPALIVE_SEND
在这里配置心跳请求报文
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); // 设置【PINGREQ】报文
配置完成后直接调用tcp接口,将心跳报文发送给mqtt服务端
result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
如果发送成功的话,将心跳计数清零,
- if(ESPCONN_OK == result) // 网络数据发送成功
- {
- client->keepAliveTick = 0; // 心跳计数 = 0
- // 向MQTT服务器发送【心跳】报文(报文不写入队列,TCP直接发送)
- //===================================================================================================================================
- void ICACHE_FLASH_ATTR mqtt_send_keepalive(MQTT_Client *client)
- {
- INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
-
- client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); // 设置【PINGREQ】报文
- client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
- client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); // 获取报文类型
- client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
-
- // TCP发送成功/5秒计时结束 => 报文发送结束(sendTimeout=0)
- //-------------------------------------------------------------------------
- client->sendTimeout = MQTT_SEND_TIMOUT; // 发送MQTT报文时,sendTimeout=5
-
- INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
-
- err_t result = ESPCONN_OK;
-
- if (client->security) // 安全类型 != 0
- {
- #ifdef MQTT_SSL_ENABLE
- result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- #else
- INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else // 安全类型 = 0 = NO_TLS
- {
- result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- }
-
- client->mqtt_state.outbound_message = NULL; // 报文发送完后,清除出站报文指针
-
-
- if(ESPCONN_OK == result) // 网络数据发送成功
- {
- client->keepAliveTick = 0; // 心跳计数 = 0
-
- client->connState = MQTT_DATA; // ESP8266当前状态 = MQTT传输数据
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
- }
-
- else // 【心跳请求】发送失败
- {
- client->connState = TCP_RECONNECT_DISCONNECTING; // TCP暂时断开(断开后会重连)
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
- }
- }
- // MQTT定时(1秒)【功能:心跳计时、重连计时、TCP发送计时】
- //================================================================================
- void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
- {
- MQTT_Client* client = (MQTT_Client*)arg;
-
- // 如果当前是[MQTT_DATA]状态,则进行存活计时操作
- //--------------------------------------------------------------------------
- if (client->connState == MQTT_DATA) // MQTT_DATA
- {
- client->keepAliveTick ++; // 客户端(ESP8266)心跳计数++
-
- // 判断客户端(ESP8266)心跳计数 ?> 保持连接的1/2时间
- //--------------------------------------------------------------------------------------------------------------------------
- if (client->keepAliveTick>(client->mqtt_state.connect_info->keepalive/2)) //【注:官方例程中是:判断是否超过保持连接时长】
- {
- client->connState = MQTT_KEEPALIVE_SEND; // MQTT_KEEPALIVE_SEND
-
- system_os_post(MQTT_TASK_PRIO,0,(os_param_t)client);// 安排任务
- }
- }
-
- // 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接
- //--------------------------------------------------------------------------
- else if (client->connState == TCP_RECONNECT_REQ) // TCP重连请求(等待5秒)
- {
- client->reconnectTick ++; // 重连计时++
-
- if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) // 重连请求超过5秒
- {
- client->reconnectTick = 0; // 重连计时 = 0
-
- client->connState = TCP_RECONNECT; // TCP重新连接
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务
-
- // 重连等待计时结束
- //-----------------------------------------------------------------
- if (client->timeoutCb) // 未创建回调函数
- client->timeoutCb((uint32_t*)client);
- }
- }
-
- // TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)
- //----------------------------------------------------------------
- if (client->sendTimeout>0) // 发送MQTT报文时,sendTimeout=5
- client->sendTimeout --; // sendTimeout每秒递减(直到=0)
- }
来看下串口调试助手接收到的数据,首先8266进行了域名解析成功解析到ip地址之后进行tcp连接

当tcp连接建立成功之后发送mqtt:connect控制报文来连接到mqtt服务端,报文类型是1,也就是connect控制报文
之后8266接收到mqtt服务端向他返回的连接确认报文
之后8266向mqtt服务端订阅了这个主题
并且也向这个主题发布了消息,放到了队列当中
发布成功
并且也收到了自己向这个主题发布的消息

这个8266过了一分钟向mqtt服务端发送了心跳报文,类型是12

8266接收到mqtt客户端软件向他发送的LED_ON,他把LED打开