MQTT是一个极其轻量级的发布/订阅消息传输协议,适用于网络带宽较低的场合.
它通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息
硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。
MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。
MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景,比如:
由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

首先看一下HTTP请求:

再来看一下MQTT:
在 IoT 领域,大量设备以及很可能不可靠或高延迟的网络使得同步通信成为问题。异步消息协议更适合 IoT 应用程序。传感器发送读数,让网络确定将其传送到目标设备和服务的最佳路线和时间。在 IoT 应用程序中,设备或传感器通常是客户端,这意味着它们无法被动地接收来自网络的命令。

MQTT 协议在网络中定义了两种实体类型:一个消息代理和一些客户端。
代理是一个服务器,它从客户端接收所有消息,然后将这些消息路由到相关的目标客户端。
客户端是能够与代理交互来发送和接收消息的任何事物。客户端可以是现场的 IoT 传感器,或者是数据中心内处理 IoT 数据的应用程序。
同时,MQTT 是轻量级的。它有一个用来指定消息类型的简单标头,有一个基于文本的主题,还有一个任意的二进制有效负载。应用程序可对有效负载采用任何数据格式,比如 JSON、XML、加密二进制或 Base64,只要目标客户端能够解析该有效负载。
1.通过包管理工具 Maven导入依赖
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.2</version>
- </dependency>
2.编写订阅方的代码,并启动。
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-
- /**
- * 订阅方
- */
- public class SubscribeSample {
- public static void main(String[] args) {
- //EMQ X 默认端口 1883
- String broker = "tcp://broker.emqx.io:1883";
- String TOPIC = "test";
- int qos = 1;
- String clientid = "subClient";
- String userName = "admin";
- String passWord = "password";
- try {
- // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
- MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
- // MQTT的连接设置
- MqttConnectOptions options = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(true);
- // 设置连接的用户名
- options.setUserName(userName);
- // 设置连接的密码
- options.setPassword(passWord.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- // 设置回调函数
- client.setCallback(new MqttCallback() {
-
- public void connectionLost(Throwable cause) {
- System.out.println("connectionLost");
- }
-
- public void messageArrived(String topic, MqttMessage message) {
- System.out.println("======监听到来自[" + topic + "]的消息======");
- System.out.println("message content:"+new String(message.getPayload()));
- System.out.println("============");
- }
-
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------"+ token.isComplete());
- }
-
- });
-
- // 建立连接
- System.out.println("连接到 broker: " + broker);
- client.connect(options);
-
- System.out.println("连接成功.");
- //订阅消息
- client.subscribe(TOPIC, qos);
- System.out.println("开始监听" + TOPIC);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
启动订阅方运行结果如下:

3.编写发布方的代码并启动
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 发布方 */ public class PublishSample { public static void main(String[] args) { String topic = "test"; String content = "你好,我给你发了条消息呀!!!!!!!!!!!"; int qos = 1; String broker = "tcp://broker.emqx.io:1883"; String userName = "admin"; String password = "password"; String clientId = "pubClient"; // 内存存储 MemoryPersistence persistence = new MemoryPersistence(); try { // 创建客户端 MqttClient sampleClient = new MqttClient(broker, clientId, persistence); // 创建链接参数 MqttConnectOptions connOpts = new MqttConnectOptions(); // 在重新启动和重新连接时记住状态 connOpts.setCleanSession(false); // 设置连接的用户名 connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); // 建立连接 System.out.println("连接到 broker: " + broker); sampleClient.connect(connOpts); System.out.println("连接成功."); // 创建消息 MqttMessage message = new MqttMessage(content.getBytes()); // 设置消息的服务质量 message.setQos(qos); // 发布消息 System.out.println("向" + topic + "发送消息:" + message); sampleClient.publish(topic, message); // 断开连接 sampleClient.disconnect(); // 关闭客户端 sampleClient.close(); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
启动发布方运行结果如下:

4.最后查看订阅方的控制台,订阅方收到消息
