• ubunbtu下基于c++实现MQTT客户端通信


    一、MQTT简介

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
      MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
    在这里插入图片描述

    1.1 MQTT 服务器(Broker)是发布-订阅架构的核心

      MQTT Broker可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。服务器分发消息,因此必须是发布者,但绝不是订阅者!客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。
      客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。

    以下是一个简单的MQTT的应用场景,具体如下图所示:

    • 光伏发电站是发布者(Publisher)。
    • 主要主题(Topic)级别是"PV",这个工厂发布两个子级别"sunshine"和"data";
    • "PV/sunshine"是一个布尔值(true/fault,也可以是 1/0),充电站需要它来知道是否应该装载电动汽车(仅在阳光普照时)。
    • 充电站(EVSE)是订阅者,订阅"PV/sunshine"从服务器获取信息。
    • “PV/data” 另一方面,以 kW 为单位传输工厂产生的瞬时功率,并且该主题可以例如通过计算机或平板电脑订阅,以生成一天内传输功率的图表。

    在这里插入图片描述

    1.2 MQTT 网络协议

      MQTT协议的版本有MQTT v3.1.1/3.1,最新的是MQTT5.0(后续以5.0协议进行演示)。除标准版外,还有一个简化版MQTT-SN,该协议主要针对嵌入式设备,这些设备一般工作于TCP/IP网络,如:ZigBee。MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。
    在这里插入图片描述

    1.3 服务质量

      为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

    • 级别0尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
    • 级别1至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
    • 级别2恰好一次。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。

      服务质量是个老话题了。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。级别1提供的至少一次语义在日志处理这种场景下是完全OK的,所以像Kafka这类的系统利用这一特点减少确认从而大大提高了并发。级别0适合鸡肋数据场景,食之无味弃之可惜,就这么着吧。

    1.4 MQTT 数据包结构

    • 固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识;
    • 可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容;
    • 消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容;

    整体MQTT的消息格式如下图所示;
    在这里插入图片描述

    1.4.1 MQTT固定头

    固定头存在于所有MQTT数据包中,其结构如下:
    在这里插入图片描述
    MQTT消息类型 / message type
    位置:byte 1, bits 7-4。
    4位的无符号值,类型如下:
    名称 值 流方向 描述

    标识位 / DUP
    位置:byte 1, bits 3-0。
    在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:
    数据包 标识位 Bit 3 Bit 2 Bit 1 Bit 0
    DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为 1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。
    QoS发布消息的服务质量(前面已经做过介绍),即:保证消息传递的次数
    00:最多一次,即:<=1
    01:至少一次,即:>=1
    10:一次,即:=1
    11:预留
    RETAIN:发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

    剩余长度(Remaining Length)
    位置:byte 1。
    固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0

    1.4.2 MQTT可变头 / Variable header

      MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识:
    Bit 7 — 0
    很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:
    PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、
    SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK

    1.4.3 Payload消息体

      Payload消息体是MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体:

    • CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码
    • SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。
    • SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。
    • UNSUBSCRIBE,消息体内容是要订阅的主题。

    二、MQTT在线工具测试

    目前MQTT代理的主流平台有下面几个:
    Mosquitto:https://mosquitto.org/
    VerneMQ:https://vernemq.com/
    EMQTT:http://emqtt.io/在这里插入图片描述

    2.1 mosquitto安装及测试

    2.1.1 通过命令端

    1、安装服务器端

    sudo apt-get install mosquitto

    完成安装后,服务器就搭建好了,系统会自动运行mosquitto,默认端口为1883。

    2、安装客户端
      前面服务器端搭建好了,但是客户端还没有安装。这一步是可选的,如果需要在终端上测试MQTT订阅/发布的通信就需要执行这一步,这里我们也安装上去才有后续的这些测试。

    sudo apt install mosquitto-clients

    3、启动mosquitto服务

    mosquitto -v

    -v 详细模式——启用所有日志记录类型。
    在这里插入图片描述

    4、通过service启动/关闭mosquitto服务

    sudo service mosquitto start
    sudo service mosquitto stop

    5、查看运行状态

    sudo systemctl status mosquitto

    在这里插入图片描述
    在这里插入图片描述
    6、查看帮助信息

    mosquitto --help

    在这里插入图片描述
    7、关闭mosquitto 服务

    • 查看运行进程号:ps -aux | grep mosquitto
      执行命令杀死进程:kill -9 进程号
    • sudo service mosquitto stop

    8、测试(默认配置)
    首先打开三个终端,
    1、启动代理服务:mosquitto -v
       -v 详细模式 打印调试信息
    2、订阅主题:mosquitto_sub -v -t hello
      -t 指定订阅的主题,主题为:hello
      -v 详细模式 打印调试信息
    3、发布内容:mosquitto_pub -t hello -m world
      -t 指定订阅的主题,主题为:hello
      -m 指定发布的消息的内容
    具体运行效果如下图:
    在这里插入图片描述

    2.1.2 通过源码安装

      采用命令行安装mosquitto简单方便,但是安装的版本往往是旧的版本,不能满足最新的协议要求(比如不支持MQTT5.0协议),因此就需要去官网(https://mosquitto.org/)下载源码,安装需要的新版本(2.0.15支持MQTT5.0),下面简单介绍安装流程。

    1、安装mosquitto所需要依赖

    sudo apt-get install libssl-dev
    sudo apt-get install uuid-dev
    sudo apt-get install cmake

    2、源码下载

    wget http://mosquitto.org/files/source/mosquitto-2.0.15.tar.gz

    3、解压源码

    tar -zxvf mosquitto-2.0.15.tar.gz
    进入源码目录:
    cd mosquitto-2.0.15/

    4、编译与安装源码

    make
    sudo make install

      上面使用的都是默认配置,如需修改服务器的配置信息需要修改:
    mosquitto源码目录下的配置文件mosquitto.conf 或者**/etc/mosquitto/mosquitto.conf**文件
    在启动服务器时使用命令:mosquitto -c mosquitto.conf -d(在mosquitto安装目录下)
      通过源码下载编译安装,会生成一些供我们使用的测试程序和动态库,用来我们自己进行开发使用。

    2.2 mqttx在线测试工具

    https://mqttx.app/zh

      mqttx提供了命令行+app两种方式实现MQTT测试,下载app后具体教程可参考上述网站。
    在这里插入图片描述
    在这里插入图片描述

    三、基于Eclipse Paho MQTT C++开发mqtt客户端程序

    3.1 Eclipse Paho MQTT C++库下载、安装

    源代码及库安装使用教程如下:

    https://github.com/eclipse/paho.mqtt.cpp

      此存储库包含内存管理操作系统(如 Linux/Posix 和 Windows)上的 Eclipse Paho MQTT C++ 客户端库的源代码。
      此代码构建了一个库,使 C++11 应用程序能够连接到 MQTT 代理、向代理发布消息、订阅主题和接收发布的消息。

    3.2 客户端代码设计

    mqtt_client.hpp

    #ifndef __MQTT_CLIENT_TO_CLOUD_HPP__
    #define __MQTT_CLIENT_TO_CLOUD_HPP__
    
    #include    // mqtt库头文件
    #include           // mqtt库头文件
    
    namespace cloud {
        //! Handler on cloud message
        using message_handler = std::function<void(const std::string&)>;
    
        class mqtt_client
        {
        public:
            mqtt_client();
            ~mqtt_client();
    
            void send(const std::string& message);
            void set_message_handler(message_handler cb);
    
        private:
            // static constexpr const char* BROKER_HOST = "localhost:1883";        //本地测试:mosquitto
            static constexpr const char* BROKER_HOST = "broker.emqx.io:1883";       //公共mqtt broker:MQTTX
            // static constexpr const char* BROKER_HOST = "124.XXX.XXX.XXX:1883";   //云端测试:mosquitto
    
        private:
            mqtt::async_client cli_;
            mqtt::topic        topic_;
        };
    }  // namespace cloud
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    mqtt_client.cpp

    #include 
    #include 
    
    #include "mqtt_client.hpp"
    
    using namespace std;
    
    namespace cloud {
        // mqtt_client类构造函数实现
        mqtt_client::mqtt_client()
            // 1.The server URI string  2.The client ID string that we provided to the server   3.The MQTT protocol version we're connected at
            : cli_(BROKER_HOST, "client", mqtt::create_options(MQTTVERSION_5))
            // 1.The client to which this topic is connected	2.The topic name(pub & sub)   3.The default QoS
            , topic_(cli_,"haojuhu", 1)  
        {
            //! Handler on connection lost, do reconnect here
            cli_.set_connection_lost_handler([this](const string& info) {
                std::cout<<"mqtt connection lost <" << info << ">, reconnting"<<std::endl;
                cli_.reconnect();
            });
    
            //! Handler on connected, it'll subscribe the topic and publish online info
            cli_.set_connected_handler([this](const string& info) {
                std::cout << "mqtt connected <" << info << ">"<<std::endl;
                topic_.subscribe(mqtt::subscribe_options(true));   // client订阅topic[haojuhu]
                topic_.publish("online");                          // client发布消息"online"至topic[haojuhu]
            });
        }
        //2.mqtt_client类析构函数实现
        mqtt_client::~mqtt_client()
        {
            cli_.disconnect();
            cli_.disable_callbacks();
        }
    
        /**
         * @brief       Publish message to topic,发布消息给topic
         * @param[in]   message The message payload
         */
        void mqtt_client::send(const string& message) 
        { 
            topic_.publish(message); 
        }
    
        /**
         * @brief       Set mqtt message handler,设置mqtt消息处理句柄(也就是函数对象cb)
         * @note        The mqtt connection will established here
         * @param[in]   cb  The message handler
         */
        void mqtt_client::set_message_handler(message_handler cb)
        {
            //! Set message callback here
            cli_.set_message_callback([cb](mqtt::const_message_ptr message) 
            {
                cb(message->get_payload_str());   //执行函数cb
            });
    
            //! Set connect options and do connect
            auto opts = mqtt::connect_options_builder()
                            .mqtt_version(MQTTVERSION_5)
                            .clean_start(true)
                            .finalize();
            cli_.connect(opts);
        }
    }  // namespace cloud
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    main.cpp

    #include 
    #include 
    #include 
    
    #include "mqtt_client.hpp"
    
    using namespace std;
    
    /**
     * @brief               Handler on message from cloud
     * @param[in]   data    The message payload from cloud
     */
    void on_cloud_message(const string& data)
    {
        std::cout<<"received data is: "<<data<<std::endl;
    }
    
    int main(int argc,char **argv)
    {
        cloud::mqtt_client g_client;                         //定义一个mqtt客户端
        std::cout << "[CLOUD] listen starting"<<std::endl;
        g_client.set_message_handler(on_cloud_message);      //开启mqtt clinet监听消息,消息处理函数为on_cloud_message
    
        while(1)
        {
            std::cout << "运行中..."<<std::endl;
            this_thread::sleep_for(10s);
            g_client.send("online...");  // 确保与mqtt broker server建立连接之后再publish!!!
        }
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.3 编译 & 执行

    g++ main.cpp mqtt_client.cpp -lpaho-mqttpp3 -lpaho-mqtt3a
    ./a.out

    3.4 执行结果

    在这里插入图片描述

    四、总结

      上面主要介绍了以下三个部分的内容:

    1. MQTT协议基础知识;
    2. MQTT在线测试工具;
    3. 基于库实现mqtt客户端程序。
  • 相关阅读:
    读书笔记《Spring Boot+Vue全栈开发实战》(下)
    基于核心素养初中数学概念课设计策略研究课题开题报告
    *算法训练(leetcode)第二十五天 | 134. 加油站、135. 分发糖果、860. 柠檬水找零、406. 根据身高重建队列
    【Solidity】智能合约案例——②供应链金融合约
    纯前端模拟后端接口异步获取数据
    Windows Server 2012 R2 安装 .NET Framework 4.6.1
    java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis
    安利几个小妙招教你如何快速翻译PDF文件
    B_QuRT_User_Guide(31)
    static关键字详解~
  • 原文地址:https://blog.csdn.net/weixin_42700740/article/details/126385010