使用EMQX搭建MQTT服务,可参考另一个博客https://editor.csdn.net/md/?articleId=121849144
一、建立连接
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
client.isconnected = 0;
client.uri = flash_db_syscfg_ptr()->mqtt_uri;
static char cid[20] = { 0 };
/* generate the random client ID */
rt_snprintf(cid, sizeof(cid), "FRYER_%s", sys_info_ptr()->dev_str);
/* config connect param */
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientID.cstring = cid;
client.condata.keepAliveInterval = 30;
client.condata.cleansession = 1;
client.condata.username.cstring = MQTT_USERNAME;
client.condata.password.cstring = MQTT_PASSWORD;
/* config MQTT will param. */
static char will_pubtopic[MQTT_TOPIC_LEN]={0};
rt_snprintf(will_pubtopic,MQTT_TOPIC_LEN,MQTT_PUBTOPIC,sys_info_ptr()->dev_str,"will");
client.condata.willFlag = 1;
client.condata.will.qos = 1;
client.condata.will.retained = 0;
client.condata.will.topicName.cstring = will_pubtopic;
client.condata.will.message.cstring = MQTT_WILLMSG;
/* malloc buffer. */
client.buf_size = client.readbuf_size = 1024;
client.buf = rt_calloc(1, client.buf_size);
client.readbuf = rt_calloc(1, client.readbuf_size);
if (!(client.buf && client.readbuf))
{
LOG_E("no memory for MQTT client buffer!");
}
/* set event callback function */
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/* set subscribe table and event callback */
static char subtopic[MQTT_TOPIC_LEN]={0};
rt_snprintf(subtopic,MQTT_TOPIC_LEN,MQTT_SUBTOPIC,sys_info_ptr()->dev_str,"ctrl");
client.messageHandlers[0].topicFilter = subtopic;
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = QOS1;
/* set default subscribe event callback */
client.defaultMessageHandler = mqtt_sub_default_callback;
//断开连接后重连时间
client.reconnect_interval = 60000;
//连接超时时间
client.connect_timeout = 10000;
/* run mqtt client */
paho_mqtt_start(&client);
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
连接、在线、离线回调函数配置
client.messageHandlers[0].callback = mqtt_sub_callback;
订阅消息回调函数配置
二、消息处理函数
client_cmd_t client_cmd[]={
"run", cmd_run,
"pid", cmd_pid,
"para", cmd_para,
"menu", cmd_menu,
};
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
// LOG_D("mqtt sub callback: %.*s %.*s",
// msg_data->topicName->lenstring.len,
// msg_data->topicName->lenstring.data,
// msg_data->message->payloadlen,
// (char *)msg_data->message->payload);
cJSON * root = NULL;
cJSON * item = NULL;//cjson对象
root = cJSON_Parse(msg_data->message->payload);
if (root)
{
item = cJSON_GetObjectItem(root, "cmd");
if( item ){
for( uint8_t i=0; ivaluestring,client_cmd[i].cmd) ){
LOG_D("receive the %s command",client_cmd[i].cmd);
client_cmd[i].cmd_func(root);
break;
}
}
}
cJSON_Delete(root);
}
else{
LOG_D("can not parse the mqtt message payload");
}
}
if( rt_strstr(item->valuestring,client_cmd[i].cmd) ){
LOG_D("receive the %s command",client_cmd[i].cmd);
client_cmd[i].cmd_func(root);
break;
}
client_cmd_t client_cmd[]={
"run", cmd_run,
"pid", cmd_pid,
"para", cmd_para,
"menu", cmd_menu,
};
利用消息数据结构处理不同分支消息,rt_strstr比较下发topic中是否带有run、pid、para、menu等的字符串进行函数跳转
三、消息发送
void client_send_msg(char *fryer)
{
if( sys_info_ptr()->net_connect == NET_SERVER ){
char *cjson_str = NULL;
cJSON * root = cJSON_CreateObject();
cJSON * slot = cJSON_CreateObject();
cJSON_AddItemToObject(root, "voltage", cJSON_CreateNumber(sys_info_ptr()->voltage));
cJSON_AddItemToObject(root, "current", cJSON_CreateNumber(sys_info_ptr()->current));
cJSON_AddItemToObject(root, "temperature", cJSON_CreateNumber(sys_info_ptr()->temp));
cJSON_AddItemToObject(root, "slot", cJSON_CreateString(fryer));
cJSON_AddItemToObject(root, "work", slot);
cJSON_AddItemToObject(slot, "curr_temp", cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->temp)));
cJSON_AddItemToObject(slot, "target_temp", cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->target_temp)));
cJSON_AddItemToObject(slot, "mode", cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_status->cooking_mode)));
cJSON_AddItemToObject(slot, "menu", cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_ctrl->menu_sel)));
cJSON_AddItemToObject(slot, "run", cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_ctrl->cooking_run)));
cJSON_AddItemToObject(slot, "seg", cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_status->cooking_seg)));
cJSON_AddItemToObject(slot, "P", cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKP_Coe)));
cJSON_AddItemToObject(slot, "I", cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKI_Coe)));
cJSON_AddItemToObject(slot, "D", cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKD_Coe)));
cjson_str = cJSON_Print(root);
// LOG_D("json:\n%s\n", cjson_str);
client_publish("work",cjson_str);
free(cjson_str);
cJSON_Delete(root);
}
使用JSON结构组织成上传数据,最后需要使用cJSON_Delete删除的动态生成JSON结构,不然发生内存泄露。
四、topic说明
#define MQTT_SUBTOPIC "sub/%s/%s"
#define MQTT_PUBTOPIC "pub/%s/%s"
sub为订阅topic,pub为发布topic,第一个%s为设备ID,可利用通配符过滤只接收指定ID的消息避免资源浪费,第二个%s为命令字符串,即控制设备或请求设备数据命令