• EMQX Cloud全托管的 MQTT 消息云服务


    从前,我向往远方,喜欢到处旅游🚶🚶🚶;后来,我追忆往事,最爱故地重游。每到一处,勾起故人往事,总能被人生的沧桑感所震撼。


    完整代码已上传Gitee

    前言

    MQTT协议是一种消息列队传输协议,采用订阅、发布机制,订阅者只接收自己已经订阅的数据,非订阅数据则不接收,既保证了必要的数据的交换,又避免了无效数据造成的储存与处理。因此在工业物联网中得到广泛的应用。

    EMQX Cloud 是 EMQ 公司推出的一款面向物联网领域的 MQTT 消息中间件产品。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQX Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。在万物互联的时代,EMQX Cloud 可以帮助您快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。

    在这里插入图片描述

    1. 物联网消息收发模型

    1.1 双向通信

    EMQX Cloud 支持海量设备及应用端连接,为应用程序及物联网设备提供安全可靠的双向通信能力:

    在这里插入图片描述

    在该模型中,EMQX Cloud 提供的 MQTT 服务将海量设备与应用连接起来,支持应用与设备间的双向通信,也支持设备与设备间的双向通信。该模型适用于有类即时通讯需求的物联网应用,比较典型的如:智能家居场景中,手机 APP 获取智能设备的状态信息,并且用户可以通过 APP 向智能设备发送控制指令。又如在工业场景中,AGV 机器人之间通过 MQTT 协议来进行即时通信,实现多机协作。EMQX Cloud 提供的 MQTT 服务不仅支持标准 MQTT 协议,也支持 MQTT over WebSocket,以及 CoAP、 MQTT-SN、LwM2M、JT/T808等协议,只需一个消息中间件即可满足多类终端同时接入的需求。

    1.2 数据采集

    EMQX Cloud 支持设备数据上云,通过海量 Topic 及数据集成的支持,低代码即可实现数据的采集、过滤、转换、计算及持久化。

    在这里插入图片描述

    在该模型中,EMQX Cloud 提供的 MQTT 服务可以实现数据的采集、计算和持久化。该模型适用于有数据采集和持久化需求的物联网应用,比较典型的如:在工业场景中,各个物联网传感器将实时采集的数据汇集到边缘网关,通过边缘网关将数据上传到 MQTT 服务器上,再由数据集成触发数据的过滤、转换和简单计算,并将最终结果转发至其他服务或持久化至目标数据库中。EMQX Cloud 提供了多种接入方案,涵盖了不同的网络条件、各种类型终端设备和边缘网关设备,支持70多种工业协议接入。

    1.3 混合模型

    EMQX Cloud 提供的 MQTT 服务支持双向通信和数据采集模型的混合应用。通过共享订阅、数据集成等能力,实现数据在物与物、物与应用间流转的同时进行持久化。

    在这里插入图片描述

    在该模型中,EMQX Cloud 提供的 MQTT 服务不仅为设备与设备、设备与应用间架起桥梁,同时可将需要的数据进行持久化,以便非实时应用在后续对获取的数据加以利用。比较典型的如一些人工智能应用,终端获取的数据需要发送至云端,通过云端运行的计算模型经过计算后即时反馈给终端,如物品或人脸识别应用。同时数据的副本需要持久化到数据库中,以便于后续离线训练和改进人工智能计算模型。

    2. 服务部署

    2.1 新建部署

    首先在平台注册账号,新用户可以试用14天

    在这里插入图片描述

    新建部署:

    在这里插入图片描述

    在这里插入图片描述

    新建项目,将部署服务以组的形式进行管理

    在这里插入图片描述

    新建项目完成后,可以将上边部署服务移动到所属项目下,方便管理。

    2.2 添加认证

    在这里插入图片描述

    添加账号密码,连接认证的时候需要用到。

    管理控制台:

    在这里插入图片描述

    3. 编码实践

    案例采用Java整合

    3.1 编写代码

    pom

    <dependencies>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
         
        <dependency>
            <groupId>org.eclipse.pahogroupId>
            <artifactId>org.eclipse.paho.client.mqttv3artifactId>
            <version>1.2.5version>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    启动类:

    /**
     * @description: EMQX CLOUD-DEMO
     * @author: yh
     * @date: 2022/9/10
     */
    @SpringBootApplication
    public class ExampleApplication {
        public static void main(String[] args) {
            SpringApplication.run(ExampleApplication.class,args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    发布订阅:

    /**
     * CMQX CLOUD 发布订阅
     *
     * @author: yh
     * @date: 2022/9/10
     */
    @RequestMapping(value = "/MqttClient")
    @RestController
    public class MqttSample {
        MqttClient client = null;
        // 发布、订阅主题
        String topic = "test/topic";
        // 消息内容
        String content = "Hello World EMQ";
        // qos消息的服务质量可选值:0 1 2
        int qos = 2;
        // EMQ 部署控制台的连接地址
        String broker = "tcp://q6dec0f4.cn-shenzhen.emqx.cloud:11578";
        String clientId = MqttClient.generateClientId();
    
        public MqttSample() {
            //  持久化
            MemoryPersistence persistence = new MemoryPersistence();
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // 设置认证信息,配置的账号 密码
            connOpts.setUserName("exqcloud");
            connOpts.setPassword("hello".toCharArray());
            try {
                client = new MqttClient(broker, clientId, persistence);
                // 设置回调
                client.setCallback(new SampleCallback());
                // 建立连接
                System.out.println("Connecting to broker: " + broker);
                client.connect(connOpts);
                System.out.println("Connected to broker: " + broker);
                // 订阅 topic
                client.subscribe(topic, qos);
                System.out.println("Subscribed to topic: " + topic);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 消息发布
         *
         * @author: yh
         * @date: 2022/9/10
         */
        @RequestMapping(value = "/send")
        public void send() {
            try {
                // 发布消息
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qos);
                //向服务器上的topic发布消息
                client.publish(topic, message);
                System.out.println("Message published");
                // 断开连接
                //client.disconnect();
                //System.out.println("Disconnected");
                // 关闭客户端
                //client.close();
                //System.exit(0);
            } catch (MqttException me) {
                System.out.println("reason " + me.getReasonCode());
                System.out.println("msg " + me.getMessage());
                System.out.println("loc " + me.getLocalizedMessage());
                System.out.println("cause " + me.getCause());
                System.out.println("excep " + me);
                me.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    回调类:

    /**
     * 回调类
     * @author:  yh
     * @date:  2022/9/10
     */
    public class SampleCallback implements MqttCallback {
        /**
         * 连接丢失
         * @author:  yh
         * @date:  2022/9/10
         */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("连接断开:" + cause.getMessage());
        }
    
        /**
         * 收到消息
         * @author:  yh
         * @date:  2022/9/10
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            System.out.println("接收到消息-- topic:" + topic + ",Qos:" + message.getQos() + ", 内容:" + new String(message.getPayload()));
        }
    
        /**
         * 消息传递成功
         * @author:  yh
         * @date:  2022/9/10
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("消息发送成功!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3.2 测试

    启动程序

    在这里插入图片描述

    在这里插入图片描述

    控制台连接数有点延迟,没出来的话多等几秒看看。

    接下来就开始:编程第一步 Hello World,访问 http://127.0.0.1:8080/MqttClient/send

    在这里插入图片描述

    通过控制台可以看到,消息被成功发布、消费,多发几次消息看看

    在这里插入图片描述

    稍等一段时间后,就可以在 部署→指标 中看到相关的数据统计

    在这里插入图片描述

    我这里连续着发送了10条消息

    在这里插入图片描述

    4. 在线调试

    有时候消息的发布不是由我们自己来发布,我们只负责消费。这种场景下在开发阶段模拟一个发布者是非常必要的,通过控制台的在线调试功能就可以直接发布消息方便调试。

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    发送内容:

    {"key":"hello","value":"你好"}
    
    • 1

    在这里插入图片描述

    多试几次:

    在这里插入图片描述

    在这里插入图片描述


    推荐一个项目 Spring整合常用组件

    到此,本章内容就介绍完啦,如果有帮助到你 欢迎点个赞👍👍👍吧!!您的鼓励是博主的最大动力! 有问题评论区交流。

  • 相关阅读:
    Retrofit解密:接口请求是如何适配suspend协程?
    B端产品实战课读书笔记:第六章产品设计
    SpringSecurity - 自定义用户认证
    [Error]在Swift项目Build Settings的Preprocessor Macros中定义的宏无效的问题
    优优嗨聚集团:旅游经济繁荣,助力当地外卖市场崛起
    [LeetCode周赛复盘] 第 306 场周赛20220814
    Tomcat 源码分析 (Tomcat 类加载器之为何违背双亲委派模型) (九)
    项目踩坑—跨域问题
    阿里 P7 前端高级工程师,都需要掌握哪些技术栈?
    Java多线程笔记
  • 原文地址:https://blog.csdn.net/weixin_43847283/article/details/126797068