MQTT 协议 是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。
Mqtt 协议是建立在TCP连接之上的应用层协议,是为了解决复杂网络环境下的通信的解决方案。
官方网站:Getting started
mqtt 的主要核心在于发布订阅模式,所以常规的理解发布订阅存在三个角色
publisher : 消息的发布者,一般是设备端
subscriber :消息的订阅者,一般是设备端
Broker : 消息的中专者,在这里也叫broker,实现MQTT协议的服务器程序。

MQTT 协议将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节。
MQTT 的消息格式分三部分:
| 固定长度头部,2 个字节,所有消息类型里都有 |
| 可变长度头部,只有某些消息类型里有 |
| Payload,只有某些消息类型里有 |
MQTT 的主要消息类型下面这些,也就是说只有这么几条协议。

上面做了分组,基本上都是请求响应的模式。
可以简单的记一下:
rec -> record 记录,保存消息
rel ->release 释放消息,也就是删除消息
com ->complete 操作完成。
订阅的单位是topic,但是这里面有一些规则类似restful ,也就是路径的匹配。
1.斜杠(“/” )用于分割主题的每个层级,为主题名提供一个分层结构
2.多层通配符 ,井字符号(“#”)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。只能放在尾部
3.单层通配符,加号 (“+”) 是只能用于单个主题层级匹配的通配符。
4.以 $ 开头的主题,客户端禁止使用,服务端实现可以将 $ 开头的主题名用作其他目的,也就是特殊用处

sport/tennis/player1/# 可以匹配 sport/tennis/player1 ,sport/tennis/player1/ranking,也就是只要以topic开头的都可以
sport/t+/player1 可以匹配 sport/a/player1 sport/b/player1 等等,+只能代表一组字符
QoS实际是客户端和Broker之间的一个服务可靠等级。这个可靠等级最终计算也是取决于订阅和发布双方。
MQTT 设计了 3 个 QoS 等级。
下面看下QoS 2 下消息的发送方式:
可以只看publisher和broker之间的消息往来。
1.publisher发布消息,broker接受之后,回复publisher pubrec,publisher 就删除本地的消息
2. publisher删除之后,通知broker 删除成功pubrel ,
3.broker 发布完成之后也删除消息,并且回复publisherpubcom

以下情况下可以选择 QoS 0
以下情况下可以选择 QoS 1
以下情况下可以选择 QoS 2
Mqtt 和 http 都属于应用层协议,并且http应用更广泛。为什么iot还是选择了Mqtt协议,我理解的是mqtt 做了物联网的一些逻辑,但是http 没有,并且不利于推送
Mqtt 和Tcp 协议在不同的层级,MQTT的底层实现依赖于TCP,你可以自己实现mqtt叫其他的协议名字也可以,但是现在mqtt已经是广泛接受的一种协议。
这里选择了一个免费的broker,tcp://broker-cn.emqx.io,端口1883
添加maven引用
- <dependency>
- <groupId>org.eclipse.pahogroupId>
- <artifactId>org.eclipse.paho.client.mqttv3artifactId>
- <version>1.2.5version>
- dependency>
MqttSample.java
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-
- public class MqttSample {
- public static void main(String[] args) {
- String topic = "xiangcai/topic";
- String content = "Hello World";
- int qos = 2;
- String broker = "tcp://broker.emqx.io:1883";
- String clientId = MqttClient.generateClientId();
- // 持久化
- MemoryPersistence persistence = new MemoryPersistence();
- // MQTT 连接选项
- MqttConnectOptions connOpts = new MqttConnectOptions();
-
- try {
- MqttClient client = new MqttClient(broker, clientId, persistence);
- // 设置回调
- client.setCallback(new SampleCallback());
- // 建立连接
- System.out.println("Connecting to broker: " + broker);
- client.connect(connOpts);
- System.out.println("Connected to broker: " + broker);
- // 订阅 topic
- client.subscribe(topic, qos);
- System.out.println("Subscribed to topic: " + topic);
- // 发布消息
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(qos);
- client.publish(topic, message);
- System.out.println("Message published");
- client.disconnect();
- System.out.println("Disconnected");
- client.close();
- System.exit(0);
- } catch (MqttException me) {
- System.out.println("reason " + me.getReasonCode());
- System.out.println("msg " + me.getMessage());
- System.out.println("loc " + me.getLocalizedMessage());
- System.out.println("cause " + me.getCause());
- System.out.println("excep " + me);
- me.printStackTrace();
- }
- }
- }
回调代码SampleCallback.java
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
-
- public class SampleCallback implements MqttCallback {
- // 连接丢失
- public void connectionLost(Throwable cause) {
- System.out.println("connection lost:" + cause.getMessage());
- }
-
- // 收到消息
- public void messageArrived(String topic, MqttMessage message) {
- System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
- }
-
- // 消息传递成功
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete");
- }
-
-
- }
其他语言的连接实例
mqtt broker的实现
到目前为止,比较流行的开源 MQTT 服务器有几个:
使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:Eclipse Paho | The Eclipse Foundation
使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。

客户端模拟工具可以理解为postman,就是为了测试,客户端工具比较多,当前使用的是
mqttx,可以看下界面,看起来还是比较清爽的,消息收发类似聊天的方式

在刚开始看的时候对这个协议还是感觉很神秘的,等到现在回头看可以理解为一个通用的服务器,
只不过定义好了消息通信的格式,我们要做的只是理解,知道这个服务器是怎么回事,知道怎么通信就可以了