• 一文搞懂Qt-MQTT开发


    消息队列

    消息队列

    “消息队列(MQ)”是在消息的传输过程中保存消息的容器。

    消息队列正如同一种先进先出的队列结构,它将发送方的消息推入队列中,并依序推送给接收方。消息队列相关的通信协议都属于应用层协议,位于OSI模型第七层,是基于TCP/IP的通信协议。

    与TCP、UDP或是HTTP协议不同,MQ相关协议没有服务端和客户端的概念。原本的客户端和服务端,现在都通过一个中间件服务器(broker)交互,消息的发送方称为生产者,消息的接收方成为消费者,生产者和消费者都可以视同broker的客户端。

    通过这种设计,所有消息都被存放于一个中间服务器中,通信的双方不再需要创建服务。这样做带来了几个好处:解耦,异步调用,削峰。
    解耦:通过中间件,各个系统之间可以独立运行,不会因为其中一个系统的崩溃影响其他系统,且整个系统的可拓展性也大大加强。
    异步:发送方的消息推入了中间件,这条消息可以被所有相关的接收方看到,因此它们可以同时开始处理,这种串联的结构的时间消耗比其他的串行结构小得多。
    削峰:在高并发环境下,短时间的大量请求会导致系统和数据库发生很多问题,所以需要对流量进行控制,通过消息队列设置每秒向消费者投递的消息数量,可以控制并发环境下的系统稳定性。

    但是,消息队列同样有它的不足。如降低系统可用性,增加系统的复杂性和一致性问题等。因此,是否使用消息队列也必须根据实际应用来决定。

    基于消息队列的通信协议有很多,常见的有RabbitMQ,Kafka,还有本文介绍的MQTT。

    MQTT

    MQTT
    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

    实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

    MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

    MQTT中的几个重要概念:

    • 订阅(Subscription)
      订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

    • 会话(Session)
      每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    • 主题名(Topic Name)
      连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

    • 主题筛选器(Topic Filter)
      一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

    • 负载(Payload)
      消息订阅者所具体接收的内容。

    配置Qt-MQTT环境

    默认的Qt环境是不能使用MQTT的,但Qt官方提供了基于MQTT的封装,需要通过源码进行编译。可以在下面的链接中获取到。
    https://github.com/qt/qtmqtt
    在dev分支中可以选择MQTT版本,选择最新的下载到本地。

    下载下来的是一个Qt项目,在Qt Creator中打开.pro文件,然后使用Release模式,用你所需要的编译器(VS,MinGW…),开始编译。

    如果你的系统没有安装过Perl,需要先安装Perl,并加入到系统环境变量中。
    https://www.perl.org/get.html

    完成编译后,可以在你的编译路径的/bin目录中得到所需的动态链接库文件Qt5Mqtt.dllQt5Mqttd.dll。前者是release版库,后者是debug版。

    为了实现一次配置,所有项目可用的目的,我们可以直接将MQTT配置到系统的Qt环境中去。C++的编译机制是通过头文件和静态链接库编译出动态链接库,再通过头文件和动态链接库运行程序。所以这里我们要将前面编译出的静态链接库和动态链接库都复制到Qt环境中去。

    首先,将qtmqtt源码目录下(qtmqtt/src/mqtt)的所有.h头文件拷贝,在Qt安装目录下的include文件夹中创建一个mqtt目录,将拷贝的文件粘贴进去。
    导入头文件

    然后,将源码编译生成目录下的静态链接库相关文件拷贝到Qt安装目录的/lib下,
    依次为Qt5Mqtt.lib(.a) Qt5Mqtt.prl Qt5mqttd.lib(.a) Qt5Mqttd.prl

    再将编译生成的两个动态链接库拷贝到Qt安装目录的/bin下,
    依次为Qt5Mqtt.dll Qt5Mqttd.dll

    最后再拷贝模块配置文件到Qt安装目录中。
    模块配置文件
    这样MQTT就已经配置到我们本地的Qt环境中了。后续所有使用此Qt环境的项目都可以直接使用MQTT了。

    使用MQTT时,首先要在.pro中添加模块:

    QT += mqtt
    
    • 1

    在使用前引入包:

    #include 
    
    • 1

    编写代码可以参考Qt官方的MQTT说明文档:
    https://doc.qt.io/qt-6/qtmqtt-index.html

    搭建EMQ X服务器

    为了调试程序,我们需要一台MQTT服务器。EMQ公司官方提供了测试的MQTT服务器,但由于连接数众多,不太稳定,我们需要自己搭建一台MQTT服务器。

    EMQ X提供了开源版的EMQ X服务器安装包,支持Windows,Ubuntu等多种使用环境。
    https://www.emqx.com/zh/try?product=broker

    安装后,Windows用户使用管理员权限命令行进入安装路径下,进入/emqx/bin/,依次执行命令

    #先运行该命令
    emqx install
    #成功后界面上会ChangeServiceConfig 成功
    #再运行
    emqx console
    #运行成功后会显示emqx is started!
    #然后会跳出一个界面,打开emqx运行所需要的各个端口
    #最后运行
    emqx start
    #没有报错就执行成功了
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Linux用户在安装路径下执行下述命令即可

    emqx start
    
    • 1

    这样本地就开启了MQTT服务,这里有两个重要的端口号要记住:1883(暴露给外部的MQTT服务端口),18083(服务器控制面板端口)。在本地浏览器输入http://127.0.0.1:18083/,打开服务器控制面板。输入初始用户名admin和用户密码public,即可进入控制面板,并进行MQTT服务器相关配置。

    调试软件MQTT X

    为了调试程序,我们通常需要一个调试软件来模拟消息的收发,这里推荐使用MQTT X软件进行调试。
    MQTT X下载连接:https://mqttx.app/zh
    MQTT X使用文档:https://mqttx.app/zh/docs

    安装完成后,点击+图标可以添加连接。
    编辑连接
    这里的Name和Client ID随意,Host填写我们本地配置的MQTT服务器地址127.0.0.1,端口号填1883。点击Connect即可连接到本地。

    连接后,点击New Subscription创建topic,然后就可以在该topic下收发消息。

    Qt-MQTT编程

    这里给出一个Qt-MQTT的程序样例,包含了基础的连接,收,发,断开等功能,读者可以在此基础上二次开发。
    .h

    #ifndef MY_MQTT_CLIENT_H
    #define MY_MQTT_CLIENT_H
    
    #include 
    #include 
    #include 
    
    namespace Ui {
    class MyMQTTClient;
    }
    
    using namespace std;
    
    class MyMQTTClient : public QObject
    {
        Q_OBJECT
    public:
        explicit MyMQTTClient(QObject *parent = nullptr);
        ~MyMQTTClient(){
        };
    
        QMqttClient *m_client = nullptr;
    
        void MyMQTTSubscribe(QString);
        void MyMQTTSendMessage(const QString, const QString);
    
    signals:
    
    public slots:
        void brokerConnected();
        void updateLogStateChange();
        void brokerDisconnected();
        void receiveMess(const QByteArray &, const QMqttTopicName &);
    
    private:
    
    };
    
    #endif // MY_MQTT_CLIENT_H
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    .cpp

    #include "my_mqtt_client.h"
    
    MyMQTTClient::MyMQTTClient(QObject *parent) : QObject(parent)
    {
        m_client = new QMqttClient(this);
        m_client->setHostname("127.0.0.1");
        m_client->setPort(1883);
        m_client->connectToHost();
    
        connect(m_client, &QMqttClient::connected, this, &MyMQTTClient::brokerConnected);
        connect(m_client, &QMqttClient::stateChanged, this, &MyMQTTClient::updateLogStateChange);
        connect(m_client, &QMqttClient::disconnected, this, &MyMQTTClient::brokerDisconnected);
    
        connect(m_client, &QMqttClient::pingResponseReceived, this, [this]() {
            const QString content = QDateTime::currentDateTime().toString()
                        + QLatin1String(" PingResponse")
                        + QLatin1Char('\n');
            qDebug() << content;
        });
    }
    
    void MyMQTTClient::MyMQTTSubscribe(QString str)
    {
        auto subscription = m_client->subscribe(str, 0);
        if (!subscription) {
            qDebug() << "Could not subscribe. Is there a valid connection?";
            return;
        }
    }
    
    void MyMQTTClient::updateLogStateChange()
    {
        const QString content = QDateTime::currentDateTime().toString()
                        + QLatin1String(": State Change")
                        + QString::number(m_client->state())
                        + QLatin1Char('\n');
        qDebug() << content;
    }
    
    void MyMQTTClient::brokerConnected()
    {
        qDebug() << "Connected!";
        if(m_client->state() == QMqttClient::Connected){
            m_client->subscribe(QString(MQTT_AUTO_TOPIC), 0);
            connect(m_client, SIGNAL(messageReceived(QByteArray,QMqttTopicName)), this, SLOT(receiveMess(QByteArray,QMqttTopicName)));
        }
    }
    
    void MyMQTTClient::brokerDisconnected()
    {
        qDebug() << "Disconnected!";
    }
    
    void MyMQTTClient::receiveMess(const QByteArray &message, const QMqttTopicName &topic)
    {
       QString content;
       content = QDateTime::currentDateTime().toString() + QLatin1Char('\n');
       content += QLatin1String(" Received Topic: ") + topic.name() + QLatin1Char('\n');
       content += QLatin1String(" Message: ") + message + QLatin1Char('\n');
       qDebug() << content;
    }
    
    void MyMQTTClient::MyMQTTSendMessage(const QString topic, const QString message)
    {
        if (m_client->publish(topic, message.toUtf8()) == -1){
            qDebug() << "Could not publish message";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
  • 相关阅读:
    影刀sqlite的插入方法
    Linux的基本命令
    java 面试题
    【函数模板】
    C 语言进阶
    11.FreeRTOS_事件组
    NonlinearFactorGraph.h/NonlinearFactorGraph.cpp
    Nginx几种负载均衡方式介绍
    分页对象使用
    杭州动环监控系统供应商,动环监控设备
  • 原文地址:https://blog.csdn.net/nchu_zhangyiqing/article/details/126781994