• Java实践-物联网loT入门-MQTT传输协议


    前言

    MQTT是一个极其轻量级发布/订阅消息传输协议,适用于网络带宽较低的场合.

    通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息

    业务场景

    硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。

    MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。

    什么是MQTT

    MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景,比如:

    • 遥感数据
    • 汽车
    • 智能家居
    • 智慧城市
    • 医疗医护

    由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

    1. 精简,不添加可有可无的功能。
    2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递。
    3. 允许用户动态创建主题,零运维成本。
    4. 把传输量降到最低以提高传输效率。
    5. 把低带宽、高延迟、不稳定的网络等因素考虑在内。
    6. 支持连续的会话控制。
    7. 理解客户端计算能力可能很低。
    8. 提供服务质量管理。
    9. 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

    MQTT与HTTP的区别

    首先看一下HTTP请求:

    • HTTP 是一种同步协议。客户端需要等待服务器响应。Web 浏览器具有这样的要求,但它的代价是牺牲了可伸缩性。
    • HTTP 是单向的。客户端必须发起连接。
    • HTTP 是一种 1-1 协议。客户端发出请求,服务器进行响应。将消息传送到网络上的所有设备上,不但很困难,而且成本很高。
    • HTTP 是一种有许多标头和规则的重量级协议。它不适合受限的网络。

    再来看一下MQTT: 

            在 IoT 领域,大量设备以及很可能不可靠或高延迟的网络使得同步通信成为问题。异步消息协议更适合 IoT 应用程序。传感器发送读数,让网络确定将其传送到目标设备和服务的最佳路线和时间。在 IoT 应用程序中,设备或传感器通常是客户端,这意味着它们无法被动地接收来自网络的命令。

    MQTT的核心: 发布和订阅模型 

    MQTT 协议在网络中定义了两种实体类型:一个消息代理和一些客户端。

    代理是一个服务器,它从客户端接收所有消息,然后将这些消息路由到相关的目标客户端。

    客户端是能够与代理交互来发送和接收消息的任何事物。客户端可以是现场的 IoT 传感器,或者是数据中心内处理 IoT 数据的应用程序。

    1. 客户端连接到代理。它可以订阅代理中的任何消息“主题”。此连接可以是简单的 TCP/IP 连接,也可以是用于发送敏感消息的加密 TLS 连接。
    2. 客户端通过将消息和主题发送给代理,发布某个主题范围内的消息。
    3. 代理然后将消息转发给所有订阅该主题的客户端。

            同时,MQTT 是轻量级的。它有一个用来指定消息类型的简单标头,有一个基于文本的主题,还有一个任意的二进制有效负载。应用程序可对有效负载采用任何数据格式,比如 JSON、XML、加密二进制或 Base64,只要目标客户端能够解析该有效负载。

    Java实例

    1.通过包管理工具 Maven导入依赖

    1. <dependency>
    2. <groupId>org.eclipse.paho</groupId>
    3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    4. <version>1.2.2</version>
    5. </dependency>

     2.编写订阅方的代码,并启动。

    1. import org.eclipse.paho.client.mqttv3.*;
    2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    3. /**
    4. * 订阅方
    5. */
    6. public class SubscribeSample {
    7. public static void main(String[] args) {
    8. //EMQ X 默认端口 1883
    9. String broker = "tcp://broker.emqx.io:1883";
    10. String TOPIC = "test";
    11. int qos = 1;
    12. String clientid = "subClient";
    13. String userName = "admin";
    14. String passWord = "password";
    15. try {
    16. // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
    17. MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
    18. // MQTT的连接设置
    19. MqttConnectOptions options = new MqttConnectOptions();
    20. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
    21. options.setCleanSession(true);
    22. // 设置连接的用户名
    23. options.setUserName(userName);
    24. // 设置连接的密码
    25. options.setPassword(passWord.toCharArray());
    26. // 设置超时时间 单位为秒
    27. options.setConnectionTimeout(10);
    28. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
    29. options.setKeepAliveInterval(20);
    30. // 设置回调函数
    31. client.setCallback(new MqttCallback() {
    32. public void connectionLost(Throwable cause) {
    33. System.out.println("connectionLost");
    34. }
    35. public void messageArrived(String topic, MqttMessage message) {
    36. System.out.println("======监听到来自[" + topic + "]的消息======");
    37. System.out.println("message content:"+new String(message.getPayload()));
    38. System.out.println("============");
    39. }
    40. public void deliveryComplete(IMqttDeliveryToken token) {
    41. System.out.println("deliveryComplete---------"+ token.isComplete());
    42. }
    43. });
    44. // 建立连接
    45. System.out.println("连接到 broker: " + broker);
    46. client.connect(options);
    47. System.out.println("连接成功.");
    48. //订阅消息
    49. client.subscribe(TOPIC, qos);
    50. System.out.println("开始监听" + TOPIC);
    51. } catch (Exception e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. }

    启动订阅方运行结果如下:

     

    3.编写发布方的代码并启动

    1. import org.eclipse.paho.client.mqttv3.MqttClient;
    2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    3. import org.eclipse.paho.client.mqttv3.MqttException;
    4. import org.eclipse.paho.client.mqttv3.MqttMessage;
    5. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    6. /**
    7. * 发布方
    8. */
    9. public class PublishSample {
    10. public static void main(String[] args) {
    11. String topic = "test";
    12. String content = "你好,我给你发了条消息呀!!!!!!!!!!!";
    13. int qos = 1;
    14. String broker = "tcp://broker.emqx.io:1883";
    15. String userName = "admin";
    16. String password = "password";
    17. String clientId = "pubClient";
    18. // 内存存储
    19. MemoryPersistence persistence = new MemoryPersistence();
    20. try {
    21. // 创建客户端
    22. MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    23. // 创建链接参数
    24. MqttConnectOptions connOpts = new MqttConnectOptions();
    25. // 在重新启动和重新连接时记住状态
    26. connOpts.setCleanSession(false);
    27. // 设置连接的用户名
    28. connOpts.setUserName(userName);
    29. connOpts.setPassword(password.toCharArray());
    30. // 建立连接
    31. System.out.println("连接到 broker: " + broker);
    32. sampleClient.connect(connOpts);
    33. System.out.println("连接成功.");
    34. // 创建消息
    35. MqttMessage message = new MqttMessage(content.getBytes());
    36. // 设置消息的服务质量
    37. message.setQos(qos);
    38. // 发布消息
    39. System.out.println("向" + topic + "发送消息:" + message);
    40. sampleClient.publish(topic, message);
    41. // 断开连接
    42. sampleClient.disconnect();
    43. // 关闭客户端
    44. sampleClient.close();
    45. } catch (MqttException me) {
    46. System.out.println("reason " + me.getReasonCode());
    47. System.out.println("msg " + me.getMessage());
    48. System.out.println("loc " + me.getLocalizedMessage());
    49. System.out.println("cause " + me.getCause());
    50. System.out.println("excep " + me);
    51. me.printStackTrace();
    52. }
    53. }
    54. }

     

    启动发布方运行结果如下:

    4.最后查看订阅方的控制台,订阅方收到消息

     

  • 相关阅读:
    【测试代码 & 基于Pytorch】的卷积神经网络(CNN) || 【基于Pytorch】的深度卷积神经网络(DCNN)
    Android 9 导航栏添加截图按钮
    正则表达式对字符串处理
    深入UDP收发数据(上)
    chatgpt赋能python:Python数据处理中如何选取指定范围的数据
    03RabbitMQ工作队列
    Mybatis ResultType处理返回类型
    Web服务(03)——HTTP协议
    企业架构LNMP学习笔记40
    振南技术干货集:振南当年入门C语言和单片机的那些事儿(5)
  • 原文地址:https://blog.csdn.net/qq_46940224/article/details/132740600