• MQTT介绍和使用


    1:QoS(Quality of Service)等级

    MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service):

    QoS0,At most once,至多一次;
    QoS1,At least once,至少一次;
    QoS2,Exactly once,确保只有一次;
      QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:

    QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
    QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
    QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
    注意:
    QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

    二、QoS0
      QoS0等级下,Sender和Receiver之间一次消息的传递流程如下:

    Sender向Receiver发送一个包含消息数据的PUBLISH包,然后不管结果如何,丢掉已发送的PUBLISH包,一条消息的发送完成。

    三、QoS1
      QoS1要保证消息至少到达一次,所以有一个应答的机制。Sender和Receiver的一次消息的传递流程如下:

    Sender向Receiver发送一个带有数据的PUBLISH包,并在本地保存这个PUBLISH包;
    Receiver收到PUBLISH包以后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体(Payload),在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH包中的Packet Identifier一致。
    Sender收到PUBACK之后,根据PUBACK包中的Packet Identifier找到本地保存的PUBLISH包,然后丢弃掉,一次消息的发送完成。
    但是消息传递流程中可能会出现问题:

    如果Sender在一段时间内没有收到PUBLISH包对应的PUBACK,它将该PUBLISH包的DUP标识设为1(代表是重新发送的PUBLISH包),然后重新发送该PUBLISH包。
    Receiver可能会重复收到消息,需自行去重。
    四、QoS2
      相比QoS0和QoS1,QoS2不仅要确保Receiver能收到Sender发送的消息,还需要确保消息不重复。它的重传和应答机制就要复杂一些,同时开销也是最大的。QoS2下,一次消息的传递流程如下所示:

    Sender发送QoS为2的PUBLISH数据包,数据包 Packet Identifier 为 P,并在本地保存该PUBLISH包;
    Receiver收到PUBLISH数据包后,在本地保存PUBLISH包的Packet Identifier P,并回复Sender一个PUBREC数据包,PUBREC数据包可变头中的Packet Identifier为P,没有消息体(Payload);
    当Sender收到PUBREC,它就可以安全的丢弃掉初始Packet Identifier为P的PUBLISH数据包。同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包,PUBREL数据包可变头中的Packet Identifier为P,没有消息体;
    当Receiver收到PUBREL数据包,它可以丢掉保存的PUBLISH包的Packet Identifier P,并回复Sender一个可变头中 Packet Identifier 为 P,没有消息体(Payload)的PUBCOMP数据包;
    当Sender收到PUBCOMP包,那么认为传输已完成,则丢掉对应的PUBREC数据包;
    上面是一次完整无误的传输过程,然而传输过程中可能会出现以下情况:

    情况1:Sender发送PUBLISH数据包给Receiver的时候,发送失败;
    情况2:Sender已经成功发送PUBLISH数据包给Receiver了,但是Receiver发送PUBREC数据包失败;
    情况3:Sender已经成功收到了PUBREC数据包,但是PUBREL数据包发送失败;
    情况4:Receiver已经收到了PUBREL数据包,但是发送PUBCOMP数据包时发送失败
    针对上述的问题,较为详细的处理方法如下:

    不管是情况1还是情况2,因为Sender在一定时间内没有收到PUBREC,那么它会把PUBLISH包的DUP标识设为1,重新发送该PUBLISH数据包;
    不管是情况3还是情况4,因为Sender在一定时间内没有收到PUBCOMP包,那么它会重新发送PUBREL数据包;
    针对情况2,Receiver可能会收到多个重复的PUBLISH包,更加完善的处理如下:
    Receiver在收到PUBLISH数据包之后,马上回复一个PUBREC数据包。并会在本地保存PUBLISH包的Packet Identifier P,不管之后因为重传多少次这个Packet Identifier 为P的数据包,Receiver都认为是重复的,丢弃。同时Receiver接收到QoS为2的PUBLISH数据包后,并不马上投递给上层,而是在本地做持久化,将消息保存起来(这里需要是持久化而不是保存在内存)。
    针对情况4,更加完善的处理如下:
    Receiver收到PUBREL数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P,并发送PUBCOMP数据包,销毁之前的持久化消息。之后不管接收到多少个PUBREL数据包,因为没有Packet Identifier P,直接回复PUBCOMP数据包即可。

    2:可以下载MQTTX客户端链接测试MQTT服务器,可以直接windows软件包下载 EMQX

    3: MQTT主要参数

    ClientId:ClientId 的长度可以是 1-23 个字符,在一个服务器上 ClientId 不能重复。如果超过 23 个字符,则服务器返回 CONNACK 消息中的返回码为 Identifier Rejected。在 MQTT 3.1.1 中,如果您不需要代理持有状态,您可以发送一个空的 ClientId。空的 ClientId 导致连接没有任何状态。在这种情况下,clean session 标志必须设置为 true,否则代理将拒绝连接。

    Clean Session:Clean Session 标志告诉代理客户端是否要建立持久会话。在持久会话 (CleanSession = false) 中,代理存储客户端的所有订阅以及以服务质量(QoS)级别 1 或 2 订阅的客户端的所有丢失消息。 如果会话不是持久的 (CleanSession = true ),代理不为客户端存储任何内容,并清除任何先前持久会话中的所有信息。

    Username/Password:MQTT 可以发送用户名和密码进行客户端认证和授权。但是,如果此信息未加密或散列,则密码将以纯文本形式发送。我们强烈建议将用户名和密码与安全传输一起使用。像 HiveMQ 这样的代理可以使用 SSL 证书对客户端进行身份验证,因此不需要用户名和密码。

    Will Message:LastWillxxx 表示的是遗愿,client 在连接 broker 的时候将会设立一个遗愿,这个遗愿会保存在 broker 中,当 client 因为非正常原因断开与 broker 的连接时,broker 会将遗愿发送给订阅了这个 topic(订阅遗愿的 topic)的 client。

    KeepAlive:keepAlive 是 client 在连接建立时与 broker 通信的时间间隔,通常以秒为单位。这个时间指的是 client 与 broker 在不发送消息下所能承受的最大时长。

    QOS:此数字表示消息的服务质量 (QoS)。有三个级别:0、1 和 2。服务级别决定了消息到达预期接收者(客户端或代理)的保证类型。

    Payload:这个是每条消息的实际内容。MQTT 是数据无关性的。可以发送任何文本、图像、加密数据以及二进制数据。

    timeout:MQTT会尝试接收数据,直到timeout时间到后才会退出。

    4:MQTT项目例子1

    4-1:配置pom.xml

    <dependencies>
        
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-webartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-integrationartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.integrationgroupId>
    			<artifactId>spring-integration-streamartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.integrationgroupId>
    			<artifactId>spring-integration-mqttartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-configuration-processorartifactId>
    			<optional>trueoptional>
    		dependency>
    		<dependency>
    			<groupId>com.alibabagroupId>
    			<artifactId>fastjsonartifactId>
    			<version>1.2.47version>
    		dependency>
    		
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    4-2:配置MQTT服务器基本信息

    在springBoot配置文件application.yml中配置,添加如下:

    #mqtt配置
    com:
      mqtt:
        url: tcp://192.168.16.9:21883
        clientId: mqtt_test66
        topics: topic01,topic02
        username: hdxadmin
        password: 1qaz@WSX
        timeout: 10
        #    completionTimeout: 3000
        keepalive: 20
    
    #指定服务端口
    server:
      port: 8080   #一般没改过tomcat服务器的端口不用修改
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4-3:配置读取yml文件的类MqttConfiguration

    package com.example.mqtt.config;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author dell
     * @create 2022/11/12
     * 读取yml
     */
    @Component
    @ConfigurationProperties(prefix = "com.mqtt")     //对应yml文件中的com下的mqtt文件配置
    public class MqttConfiguration {
        private String url;
        private String clientId;
        private String topics;
        private String username;
        private String password;
        private String timeout;
        private String keepalive;
        public String getUrl() {
            return url;
        }
        public void setUrl(String url) {
            this.url = url;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public String getClientId() {
            return clientId;
        }
        public void setClientId(String clientId) {
            this.clientId = clientId;
        }
        public String getTopics() {
            return topics;
        }
        public void setTopics(String topics) {
            this.topics = topics;
        }
        public String getTimeout() {
            return timeout;
        }
        public void setTimeout(String timeout) {
            this.timeout = timeout;
        }
        public String getKeepalive() {
            return keepalive;
        }
        public void setKeepalive(String keepalive) {
            this.keepalive = keepalive;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    4-4:MQTT生产端的Handler处理

    package com.example.mqtt.config;
    
    
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    /**
     * MQTT生产端
     * @Author dell
     * @create 2022/11/12
     */
    @Configuration
    public class MqttOutboundConfiguration {
        @Autowired
        private MqttConfiguration mqttProperties;
    
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            String[] array = mqttProperties.getUrl().split(",");
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(array);
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            // 接受离线消息
            options.setCleanSession(false); //告诉代理客户端是否要建立持久会话   false为建立持久会话
    
            factory.setConnectionOptions(options);
            return factory;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler mqttOutbound() {
        	String clientId = mqttProperties.getClientId() + "outbound" + System.currentTimeMillis();
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( clientId , mqttClientFactory());
            messageHandler.setAsync(true);
    		//设置生产者的消息级别,0-最多一条,1-最少一条,2-只能一条
    		messageHandler.setDefaultQos(1);
            return messageHandler;
        }
    
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    4-5:MQTT消费端的Handler处理

    实现了对inboundtopic中的主题监听,当有消息推送到inboundtopic主题上时可以接受

    package com.example.mqtt.config;
    
    /**
     * @Author dell
     * @create 2022/11/12
     */
    
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    
    /**
     * MQTT消费端
     *
     */
    @Configuration
    @IntegrationComponentScan
    public class MqttInboundConfiguration {
    
        @Autowired
        private MqttConfiguration mqttProperties;
    
    
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MqttPahoClientFactory mqttClientFactoryIn() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            String[] array = mqttProperties.getUrl().split(",");
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(array);
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setKeepAliveInterval(2);
    
            //接受离线消息
            options.setCleanSession(false);
            factory.setConnectionOptions(options);
            return factory;
        }
    
        //配置client,监听的topic
        @Bean
        public MessageProducer inbound() {
            String[] inboundTopics = mqttProperties.getTopics().split(",");
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                    mqttProperties.getClientId()+"_inbound",mqttClientFactoryIn(), inboundTopics);  //对inboundTopics主题进行监听
            adapter.setCompletionTimeout(5000);
            //可以限制到publish的QOS值,一定小于或等于此设置的QOS值
            adapter.setQos(1);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        //通过通道获取数据
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")  //异步处理
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
    //				System.out.println("message:"+message);
                    System.out.println("----------------------");
                    //一个通道可以监听很多主机,通过是否往MQTT推送的主机名字执行以下命令
    				String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
                    System.out.println("topic:"+topic);
                    System.out.println("message:"+message.getPayload());
                    System.out.println("PacketId:"+message.getHeaders().getId());
                    //获取到publish的QOS,一定小于或等于received的QOS值
                    System.out.println("Qos:"+message.getHeaders().get("mqtt_receivedQos"));
    
                    
                }
            };
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    4-6:在SpringbootApplication.class里添加注解

    @EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class}),不然会报错,原因是导入了jdbc的依赖,使用@Configuration注解向spring注入了dataSource bean。
    但是因为工程中没有关于dataSource相关的配置信息,当spring创建dataSource bean因缺少相关的信息就会报错。

    package com.example.mqtt;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
    
    @SpringBootApplication
    @EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
    public class MqttApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(MqttApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4-7:MqttGateway 消息推送接口

    package com.example.mqtt.sevice;
    
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    
    /**
     * @Author dell
     * @create 2022/11/12
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        // 定义重载方法,用于消息发送
        void sendToMqtt(String payload);
    
        // 指定topic进行消息发送
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    JS 基础: 取消 Ajax 请求(fetch abort)
    Flutter IOS 前后台切换主题自动变化的问题
    牛客网刷题(三)
    登录验证的步骤
    Elasticsearch
    k8s /apis/batch/v1beta1 /apis/policy/v1beta1 接口作用
    通俗易懂玩QT:QT程序发布打包
    Spring 后置处理器【1】
    Unity入门教程(上)
    Webrtc丢包率的计算
  • 原文地址:https://blog.csdn.net/qq_19891197/article/details/127746346