MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service):
QoS0,At most once,至多一次;
QoS1,At least once,至少一次;
QoS2,Exactly once,确保只有一次;
QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:
QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
注意:
QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。
二、QoS0
QoS0等级下,Sender和Receiver之间一次消息的传递流程如下:
Sender向Receiver发送一个包含消息数据的PUBLISH包,然后不管结果如何,丢掉已发送的PUBLISH包,一条消息的发送完成。
三、QoS1
QoS1要保证消息至少到达一次,所以有一个应答的机制。Sender和Receiver的一次消息的传递流程如下:
Sender向Receiver发送一个带有数据的PUBLISH包,并在本地保存这个PUBLISH包;
Receiver收到PUBLISH包以后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体(Payload),在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH包中的Packet Identifier一致。
Sender收到PUBACK之后,根据PUBACK包中的Packet Identifier找到本地保存的PUBLISH包,然后丢弃掉,一次消息的发送完成。
但是消息传递流程中可能会出现问题:
如果Sender在一段时间内没有收到PUBLISH包对应的PUBACK,它将该PUBLISH包的DUP标识设为1(代表是重新发送的PUBLISH包),然后重新发送该PUBLISH包。
Receiver可能会重复收到消息,需自行去重。
四、QoS2
相比QoS0和QoS1,QoS2不仅要确保Receiver能收到Sender发送的消息,还需要确保消息不重复。它的重传和应答机制就要复杂一些,同时开销也是最大的。QoS2下,一次消息的传递流程如下所示:
Sender发送QoS为2的PUBLISH数据包,数据包 Packet Identifier 为 P,并在本地保存该PUBLISH包;
Receiver收到PUBLISH数据包后,在本地保存PUBLISH包的Packet Identifier P,并回复Sender一个PUBREC数据包,PUBREC数据包可变头中的Packet Identifier为P,没有消息体(Payload);
当Sender收到PUBREC,它就可以安全的丢弃掉初始Packet Identifier为P的PUBLISH数据包。同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包,PUBREL数据包可变头中的Packet Identifier为P,没有消息体;
当Receiver收到PUBREL数据包,它可以丢掉保存的PUBLISH包的Packet Identifier P,并回复Sender一个可变头中 Packet Identifier 为 P,没有消息体(Payload)的PUBCOMP数据包;
当Sender收到PUBCOMP包,那么认为传输已完成,则丢掉对应的PUBREC数据包;
上面是一次完整无误的传输过程,然而传输过程中可能会出现以下情况:
情况1:Sender发送PUBLISH数据包给Receiver的时候,发送失败;
情况2:Sender已经成功发送PUBLISH数据包给Receiver了,但是Receiver发送PUBREC数据包失败;
情况3:Sender已经成功收到了PUBREC数据包,但是PUBREL数据包发送失败;
情况4:Receiver已经收到了PUBREL数据包,但是发送PUBCOMP数据包时发送失败
针对上述的问题,较为详细的处理方法如下:
不管是情况1还是情况2,因为Sender在一定时间内没有收到PUBREC,那么它会把PUBLISH包的DUP标识设为1,重新发送该PUBLISH数据包;
不管是情况3还是情况4,因为Sender在一定时间内没有收到PUBCOMP包,那么它会重新发送PUBREL数据包;
针对情况2,Receiver可能会收到多个重复的PUBLISH包,更加完善的处理如下:
Receiver在收到PUBLISH数据包之后,马上回复一个PUBREC数据包。并会在本地保存PUBLISH包的Packet Identifier P,不管之后因为重传多少次这个Packet Identifier 为P的数据包,Receiver都认为是重复的,丢弃。同时Receiver接收到QoS为2的PUBLISH数据包后,并不马上投递给上层,而是在本地做持久化,将消息保存起来(这里需要是持久化而不是保存在内存)。
针对情况4,更加完善的处理如下:
Receiver收到PUBREL数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P,并发送PUBCOMP数据包,销毁之前的持久化消息。之后不管接收到多少个PUBREL数据包,因为没有Packet Identifier P,直接回复PUBCOMP数据包即可。
ClientId:ClientId 的长度可以是 1-23 个字符,在一个服务器上 ClientId 不能重复。如果超过 23 个字符,则服务器返回 CONNACK 消息中的返回码为 Identifier Rejected。在 MQTT 3.1.1 中,如果您不需要代理持有状态,您可以发送一个空的 ClientId。空的 ClientId 导致连接没有任何状态。在这种情况下,clean session 标志必须设置为 true,否则代理将拒绝连接。
Clean Session:Clean Session 标志告诉代理客户端是否要建立持久会话。在持久会话 (CleanSession = false) 中,代理存储客户端的所有订阅以及以服务质量(QoS)级别 1 或 2 订阅的客户端的所有丢失消息。 如果会话不是持久的 (CleanSession = true ),代理不为客户端存储任何内容,并清除任何先前持久会话中的所有信息。
Username/Password:MQTT 可以发送用户名和密码进行客户端认证和授权。但是,如果此信息未加密或散列,则密码将以纯文本形式发送。我们强烈建议将用户名和密码与安全传输一起使用。像 HiveMQ 这样的代理可以使用 SSL 证书对客户端进行身份验证,因此不需要用户名和密码。
Will Message:LastWillxxx 表示的是遗愿,client 在连接 broker 的时候将会设立一个遗愿,这个遗愿会保存在 broker 中,当 client 因为非正常原因断开与 broker 的连接时,broker 会将遗愿发送给订阅了这个 topic(订阅遗愿的 topic)的 client。
KeepAlive:keepAlive 是 client 在连接建立时与 broker 通信的时间间隔,通常以秒为单位。这个时间指的是 client 与 broker 在不发送消息下所能承受的最大时长。
QOS:此数字表示消息的服务质量 (QoS)。有三个级别:0、1 和 2。服务级别决定了消息到达预期接收者(客户端或代理)的保证类型。
Payload:这个是每条消息的实际内容。MQTT 是数据无关性的。可以发送任何文本、图像、加密数据以及二进制数据。
timeout:MQTT会尝试接收数据,直到timeout时间到后才会退出。
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-integrationartifactId>
dependency>
<dependency>
<groupId>org.springframework.integrationgroupId>
<artifactId>spring-integration-streamartifactId>
dependency>
<dependency>
<groupId>org.springframework.integrationgroupId>
<artifactId>spring-integration-mqttartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-configuration-processorartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.47version>
dependency>
dependencies>
在springBoot配置文件application.yml中配置,添加如下:
#mqtt配置
com:
mqtt:
url: tcp://192.168.16.9:21883
clientId: mqtt_test66
topics: topic01,topic02
username: hdxadmin
password: 1qaz@WSX
timeout: 10
# completionTimeout: 3000
keepalive: 20
#指定服务端口
server:
port: 8080 #一般没改过tomcat服务器的端口不用修改
package com.example.mqtt.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Author dell
* @create 2022/11/12
* 读取yml
*/
@Component
@ConfigurationProperties(prefix = "com.mqtt") //对应yml文件中的com下的mqtt文件配置
public class MqttConfiguration {
private String url;
private String clientId;
private String topics;
private String username;
private String password;
private String timeout;
private String keepalive;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
public String getKeepalive() {
return keepalive;
}
public void setKeepalive(String keepalive) {
this.keepalive = keepalive;
}
}
package com.example.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT生产端
* @Author dell
* @create 2022/11/12
*/
@Configuration
public class MqttOutboundConfiguration {
@Autowired
private MqttConfiguration mqttProperties;
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
String[] array = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(array);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
// 接受离线消息
options.setCleanSession(false); //告诉代理客户端是否要建立持久会话 false为建立持久会话
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
String clientId = mqttProperties.getClientId() + "outbound" + System.currentTimeMillis();
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( clientId , mqttClientFactory());
messageHandler.setAsync(true);
//设置生产者的消息级别,0-最多一条,1-最少一条,2-只能一条
messageHandler.setDefaultQos(1);
return messageHandler;
}
}
实现了对inboundtopic中的主题监听,当有消息推送到inboundtopic主题上时可以接受
package com.example.mqtt.config;
/**
* @Author dell
* @create 2022/11/12
*/
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* MQTT消费端
*
*/
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
@Autowired
private MqttConfiguration mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactoryIn() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
String[] array = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(array);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setKeepAliveInterval(2);
//接受离线消息
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] inboundTopics = mqttProperties.getTopics().split(",");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttProperties.getClientId()+"_inbound",mqttClientFactoryIn(), inboundTopics); //对inboundTopics主题进行监听
adapter.setCompletionTimeout(5000);
//可以限制到publish的QOS值,一定小于或等于此设置的QOS值
adapter.setQos(1);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// System.out.println("message:"+message);
System.out.println("----------------------");
//一个通道可以监听很多主机,通过是否往MQTT推送的主机名字执行以下命令
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
System.out.println("topic:"+topic);
System.out.println("message:"+message.getPayload());
System.out.println("PacketId:"+message.getHeaders().getId());
//获取到publish的QOS,一定小于或等于received的QOS值
System.out.println("Qos:"+message.getHeaders().get("mqtt_receivedQos"));
}
};
}
}
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class}),不然会报错,原因是导入了jdbc的依赖,使用@Configuration注解向spring注入了dataSource bean。
但是因为工程中没有关于dataSource相关的配置信息,当spring创建dataSource bean因缺少相关的信息就会报错。
package com.example.mqtt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class, args);
}
}
package com.example.mqtt.sevice;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* @Author dell
* @create 2022/11/12
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String payload);
// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}