• 利用EMQX建立MQTT服务器通讯


    使用EMQX搭建MQTT服务器的方法,可参考另一篇博客:https://blog.csdn.net/dmjkun/article/details/121849144
    一、建立连接

    	MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
        client.isconnected = 0;
        client.uri = flash_db_syscfg_ptr()->mqtt_uri;
        
        static char cid[20] = { 0 };
    
        /* generate the random client ID */
        rt_snprintf(cid, sizeof(cid), "FRYER_%s", sys_info_ptr()->dev_str);
        /* config connect param */
        memcpy(&client.condata, &condata, sizeof(condata));
        client.condata.clientID.cstring = cid;
        client.condata.keepAliveInterval = 30;
        client.condata.cleansession = 1;
        client.condata.username.cstring = MQTT_USERNAME;
        client.condata.password.cstring = MQTT_PASSWORD;
    
        /* config MQTT will param. */
        static char will_pubtopic[MQTT_TOPIC_LEN]={0};
        rt_snprintf(will_pubtopic,MQTT_TOPIC_LEN,MQTT_PUBTOPIC,sys_info_ptr()->dev_str,"will");
        client.condata.willFlag = 1;
        client.condata.will.qos = 1;
        client.condata.will.retained = 0;
        client.condata.will.topicName.cstring = will_pubtopic;
        client.condata.will.message.cstring = MQTT_WILLMSG;
    
        /* malloc buffer. */
        client.buf_size = client.readbuf_size = 1024;
        client.buf = rt_calloc(1, client.buf_size);
        client.readbuf = rt_calloc(1, client.readbuf_size);
        if (!(client.buf && client.readbuf))
        {
            LOG_E("no memory for MQTT client buffer!");
        }
    
        /* set event callback function */
        client.connect_callback = mqtt_connect_callback;
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;
    
        /* set subscribe table and event callback */
        static char subtopic[MQTT_TOPIC_LEN]={0};
        rt_snprintf(subtopic,MQTT_TOPIC_LEN,MQTT_SUBTOPIC,sys_info_ptr()->dev_str,"ctrl");
        client.messageHandlers[0].topicFilter = subtopic;
        client.messageHandlers[0].callback = mqtt_sub_callback;
        client.messageHandlers[0].qos = QOS1;
    
        /* set default subscribe event callback */
        client.defaultMessageHandler = mqtt_sub_default_callback;
        
        //断开连接后重连时间
        client.reconnect_interval = 60000;
        //连接超时时间
        client.connect_timeout = 10000;
    
        /* run mqtt client */
        paho_mqtt_start(&client);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
        /* register the connect / online / offline event callbacks on the client */
        client.connect_callback = mqtt_connect_callback;
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;
    
    • 1
    • 2
    • 3

    连接、在线、离线回调函数配置

    /* callback of entry 0 of the client's subscribe table */
    client.messageHandlers[0].callback = mqtt_sub_callback;
    
    • 1

    订阅消息回调函数配置

    二、消息处理函数

    client_cmd_t    client_cmd[]={
        "run",  cmd_run,
        "pid",  cmd_pid,
        "para", cmd_para,
        "menu", cmd_menu,
    };
    
    /**
     * Subscribe callback: parse the received payload as JSON and dispatch it
     * to the handler from client_cmd[] selected by its "cmd" string.
     *
     * @param c         MQTT client the message arrived on (unused here)
     * @param msg_data  received message; its payload is expected to be JSON text
     */
    static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
    {
        (void)c;

        /*
         * NUL-terminate the payload in place so it can be parsed as a C string.
         * NOTE(review): this writes one byte past the payload and relies on the
         * receive buffer having room for it - confirm against the buffer size.
         */
        *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';

        cJSON *root = cJSON_Parse(msg_data->message->payload);
        if (!root)
        {
            LOG_D("can not parse the mqtt message payload");
            return;
        }

        cJSON *item = cJSON_GetObjectItem(root, "cmd");

        /* valuestring is only set for string items; skip anything else */
        if (item && item->valuestring)
        {
            for (uint8_t i = 0; i < sizeof(client_cmd) / sizeof(client_cmd[0]); i++)
            {
                /* substring match: the first table keyword found in "cmd" wins */
                if (rt_strstr(item->valuestring, client_cmd[i].cmd))
                {
                    LOG_D("receive the %s command",client_cmd[i].cmd);
                    client_cmd[i].cmd_func(root);
                    break;
                }
            }
        }

        /* root is deleted here, after the handler (if any) has returned */
        cJSON_Delete(root);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    /* dispatch to table entry i when its keyword occurs in item->valuestring */
    if( rt_strstr(item->valuestring,client_cmd[i].cmd) ){
    	LOG_D("receive the %s command",client_cmd[i].cmd);
    	client_cmd[i].cmd_func(root);
    	break;
    }
    
    client_cmd_t    client_cmd[]={
        "run",  cmd_run,
        "pid",  cmd_pid,
        "para", cmd_para,
        "menu", cmd_menu,
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    利用命令表(client_cmd)把不同的命令分发到各自的处理函数:用rt_strstr检查下发消息JSON负载中"cmd"字段是否包含run、pid、para、menu等字符串,匹配后跳转到对应的处理函数

    三、消息发送

    void client_send_msg(char *fryer)
    {
    
        if( sys_info_ptr()->net_connect == NET_SERVER ){
            char *cjson_str = NULL;
            cJSON * root =  cJSON_CreateObject();
            cJSON * slot =  cJSON_CreateObject();
            
            cJSON_AddItemToObject(root, "voltage",      cJSON_CreateNumber(sys_info_ptr()->voltage));
            cJSON_AddItemToObject(root, "current",      cJSON_CreateNumber(sys_info_ptr()->current));
            cJSON_AddItemToObject(root, "temperature",  cJSON_CreateNumber(sys_info_ptr()->temp));
            cJSON_AddItemToObject(root, "slot",         cJSON_CreateString(fryer));
            cJSON_AddItemToObject(root, "work",         slot);
            
            cJSON_AddItemToObject(slot, "curr_temp",    cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->temp)));
            cJSON_AddItemToObject(slot, "target_temp",  cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->target_temp)));
            cJSON_AddItemToObject(slot, "mode",         cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_status->cooking_mode)));
            cJSON_AddItemToObject(slot, "menu",         cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_ctrl->menu_sel)));
            cJSON_AddItemToObject(slot, "run",          cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_ctrl->cooking_run)));
            cJSON_AddItemToObject(slot, "seg",          cJSON_CreateNumber((int)(((fryer_state_t*)fryer_get_handle(fryer))->mlcd_status->cooking_seg)));
            cJSON_AddItemToObject(slot, "P",            cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKP_Coe)));
            cJSON_AddItemToObject(slot, "I",            cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKI_Coe)));
            cJSON_AddItemToObject(slot, "D",            cJSON_CreateNumber((((fryer_state_t*)fryer_get_handle(fryer))->pid.uKD_Coe)));
    
            cjson_str = cJSON_Print(root);
        //    LOG_D("json:\n%s\n", cjson_str);
            
            client_publish("work",cjson_str);
            
            free(cjson_str);
            cJSON_Delete(root);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    使用JSON结构组织上传数据;发送完成后需要用cJSON_Delete删除动态生成的JSON结构,并释放cJSON_Print返回的字符串,否则会发生内存泄漏。

    四、topic说明

    /*
     * Topic format strings for rt_snprintf():
     * first %s = device ID string, second %s = command / channel name.
     */
    #define MQTT_SUBTOPIC           "sub/%s/%s"   /* topics the device subscribes to */
    #define MQTT_PUBTOPIC           "pub/%s/%s"   /* topics the device publishes to */
    
    • 1
    • 2

    sub为订阅topic,pub为发布topic,第一个%s为设备ID,可利用通配符过滤只接收指定ID的消息避免资源浪费,第二个%s为命令字符串,即控制设备或请求设备数据命令

  • 相关阅读:
    预约陪诊就诊小程序源码多城市开发版
    【AI视野·今日NLP 自然语言处理论文速览 第四十一期】Tue, 26 Sep 2023
    使用API接口获取商品数据:从入门到实践
    力扣SQL50 可回收且低脂的产品 简单条件查询
    Logback日志配置
    ShardingSphereJDBC5.4.0支持Nacos配置(SpringCloud版)
    vue项目使用lodash节流防抖函数问题与解决
    GP与LP的区别,有限责任、无限责任、无限连带责任
    C++构造函数中不能使用多态
    最全的各版本PostGis下载
  • 原文地址:https://blog.csdn.net/dmjkun/article/details/127676517