• springboot集成mqtt


    文章目录


    一、MQTT说明

    1.1、mqtt文档

    官网:https://mqtt.org/
    仅供参考:https://www.emqx.com/zh/mqtt

    1.2、MQTT消息服务质量

    MQTT规定了3种消息等级

    1. QoS 0:
      消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。

    2. QoS 1:
      a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息
      b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。

    3. QoS
      a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
      b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
      c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收

    1.1.1、归纳

    1. QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
    2. QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
    3. QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

    二、MQTT环境搭建

    有2种方式
    1、原生mqtt
    2、rabbitmq的mqtt插件


    第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097

    mqtt客户端下载
    我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html

    mqttx下载:https://mqttx.app/zh

    第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
    注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇

    docker安装rabbitmq
    https://blog.csdn.net/qq_44413835/article/details/123648048

    进入docker-rabbitmq容器

    docker exec  -it rabbitmq  /bin/bash
    
    • 1

    安装后开启mqq插件

    # 打开rabbitmq_mqtt
    rabbitmq-plugins enable rabbitmq_mqtt
    
    
    #打开rabbitmq_web_mqtt
    rabbitmq-plugins enable rabbitmq_web_mqtt
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如图:
    在这里插入图片描述
    在这里插入图片描述

    三、boot集成原生mqtt

    1.1、项目结构

    版本boot:2.3.6.RELEASE、web工程
    在这里插入图片描述

    1.2、依赖

     
    
        org.springframework.integration
        spring-integration-mqtt
    
    
    
        org.springframework.integration
        spring-integration-stream
    
    
    
    
        com.google.code.gson
        gson
    
    
    
    
        org.projectlombok
        lombok
        true
    
    
    
    
    
        io.springfox
        springfox-swagger2
        2.7.0
    
    
        io.springfox
        springfox-swagger-ui
        2.7.0
    
    
    
    
    
        org.springframework.boot
        spring-boot-starter-actuator
    
    
    
    
        org.springframework.boot
        spring-boot-configuration-processor
        true
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    1.3、application.properties配置

    spring.application.name=mqtt_demo
    server.port=8080
    # --------------mqtt配置-----------------------------
    # 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
    mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/#
    # 默认发送消息的主题
    mqtt.sender.defaultTopic=test_send
    # mqtt发送者的id
    mqtt.sender.clientId=mqttProducer
    # mqtt接收者的id-随机id来拼串
    mqtt.receiver.clientId=${random.value}
    # 地址和用户名密码
    mqtt.url=tcp://服务器ip地址:1883
    mqtt.username=用户名
    mqtt.password=密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.4、实体类

    IotData

    package sqy.bean;
    
    import com.google.gson.annotations.SerializedName;
    import org.springframework.stereotype.Component;
    
    import java.io.Serializable;
    import java.util.Date;
    
    @Component
    public class IotData implements Serializable {
    
      @SerializedName("deviceid")
      String deviceid;//设备id
    
      @SerializedName("sensorid")
      String sensorid;//数据id
    
      @SerializedName("types")
      String types;//设备来源
    
      @SerializedName("loraid")
      String loraid;//loraid硬件的id
    
      @SerializedName("createtime")
      Date createtime;//创建时间
    
      @SerializedName("temp")
      float temp;//温度
    
      @SerializedName("humi")
      float humi;//湿度
    
      @SerializedName("light")
      float light;//光敏
    
      //get/set/tostring省略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    api响应的实体类

    package sqy.rvo;
    
    /**
     * @author 
     * @Date 2022/4/15
     * 通用返回对象
     */
    public class ApiResult {
    
        private long code;
        private String message;
        private T data;
    
    
        private final static long SUCCESS_CODE=1000;
        private final static long FAIL_CODE=2000;
    
    
        protected ApiResult() {
        }
    
        protected ApiResult(long code, String message, T data) {
            this.code = code;
            this.message = message;
            this.data = data;
        }
    
    
        /**
         * 成功返回结果
         */
        public static  ApiResult success(T data, String message) {
            return new ApiResult(SUCCESS_CODE, message, data);
        }
    
    
        /**
         * 失败返回结果
         */
        public static  ApiResult failed(String message) {
            return new ApiResult(FAIL_CODE, message, null);
        }
    
    	//get/set/tostring省略...
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    1.5、mqtt配置类

    MqttConfig

    package sqy.config.mqtt;
    
    
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.util.StringUtils;
    import sqy.service.mqtt.MqttCaseServiceImpl;
    
    /**
     * @author 
     * @Date 2022/4/15
     * mqtt的配置类
     */
    @Configuration
    public class MqttConfig {
    
        /**
         * 发布的bean名称
         */
        public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
        public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    
        // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
        private static final byte[] WILL_DATA;
    
        static {
            WILL_DATA = "offline".getBytes();
        }
    
        @Value("${mqtt.username}")
        private String username;
    
        @Value("${mqtt.password}")
        private String password;
    
        @Value("${mqtt.url}")
        private String url;
    
        @Value("${mqtt.sender.clientId}")
        private String clientsId;
    
        @Value("${mqtt.sender.defaultTopic}")
        private String defaultsTopic;
    
        @Value("${mqtt.receiver.clientId}")
        private String clientcId;
    
        @Value("${mqtt.receiver.defaultTopic}")
        private String defaultcTopic;
    
    
        /**
         * MQTT连接器选项
         */
        @Bean
        public MqttConnectOptions getSenderMqttConnectOptions() {
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置连接的用户名
            System.out.println(username);
            if (!username.trim().equals("")) {
                options.setUserName(username);
            }
            // 设置连接的密码
            options.setPassword(password.toCharArray());
            // 设置连接的地址
            options.setServerURIs(new String[]{url});
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(100);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            // 但这个方法并没有重连的机制
            options.setKeepAliveInterval(30);
            // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
            options.setWill("willTopic", WILL_DATA, 2, false);
            return options;
        }
    
        /**
         * MQTT客户端
         */
        @Bean
        public MqttPahoClientFactory senderMqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getSenderMqttConnectOptions());
            return factory;
        }
    
    
        /**
         * MQTT消息处理器(生产者)
         */
        @Bean
        @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientsId, senderMqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(defaultsTopic);
            return messageHandler;
        }
    
    
        /**
         * MQTT信息通道(生产者)
         */
        @Bean(name = CHANNEL_NAME_OUT)
        public MessageChannel mqttOutboundChannel() {
            DirectChannel channel = new DirectChannel();
            return channel;
        }
    
        /**
         * MQTT信息通道(消费者)
         */
        @Bean(name = CHANNEL_NAME_IN)
        public MessageChannel mqttInboundChannel() {
            DirectChannel channel = new DirectChannel();
            return channel;
        }
    
    
        /**
         * MQTT消息订阅绑定(消费者)
         */
        @Bean
        public MessageProducer inbound() {
            String[] receiverTopics = StringUtils.split(defaultcTopic, ",");
            // 可以同时消费(订阅)多个Topic
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(
                            "re" + clientcId, senderMqttClientFactory(),
                            receiverTopics);
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            // 设置订阅通道
            adapter.setOutputChannel(mqttInboundChannel());
            return adapter;
        }
    
        /**
         * MQTT消息处理器(消费者)
         */
        @Bean
        @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
        public MessageHandler handler() {
            MqttCaseServiceImpl service = new MqttCaseServiceImpl();
            return service;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160

    1.6、mqtt发布接口

    IMqttSender

    package sqy.mqtt;
    
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import sqy.config.mqtt.MqttConfig;    
    
    /**
     * MQTT生产者消息发送接口
     * 通过接口将数据传递到集成流
     */
    @Component
    @MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
    public interface IMqttSender {
    
        /**
         * 发送信息到MQTT服务器
         *
         * @param data 发送的文本
         */
        void sendToMqtt(String data);
    
        /**
         * 发送信息到MQTT服务器
         *
         * @param topic   主题
         * @param payload 消息主体
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
        /**
         * 发送信息到MQTT服务器
         *
         * @param topic   主题
         * @param qos     对消息处理的几种机制。
         *                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
         *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
         *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
         * @param payload 消息主体
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.7、mqtt接收消息

    MqttCaseServiceImpl

    package sqy.service.mqtt;
    
    import com.google.gson.Gson;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Service;
    import sqy.bean.IotData;
    
    
    /**
     * MQTT接收消息
     */
    @Service
    public class MqttCaseServiceImpl implements MessageHandler {
    
        /**
         * MessageHeaders:
         * public static final String PREFIX = "mqtt_";
         * public static final String QOS = "mqtt_qos";
         * public static final String ID = "mqtt_id";
         * public static final String RECEIVED_QOS = "mqtt_receivedQos";
         * public static final String DUPLICATE = "mqtt_duplicate";
         * public static final String RETAINED = "mqtt_retained";
         * public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";
         * public static final String TOPIC = "mqtt_topic";
         * public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
         * public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";
         * public static final String TOPIC_ALIAS = "mqtt_topicAlias";
         * public static final String RESPONSE_TOPIC = "mqtt_responseTopic";
         * public static final String CORRELATION_DATA = "mqtt_correlationData";
         */
    
        @Autowired
        Gson gson;
    
    
        @Override
        public void handleMessage(Message message) throws MessagingException {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            String payload = (String) message.getPayload();
            System.out.println("headers:" + topic + " 接收的数据:" + payload);
            if (topic.contains("receive_iot_topic")) {
                System.out.println("硬件的信息的主题");
                IotData entity = gson.fromJson(payload, IotData.class);
                if (entity!=null){
                    //不是心跳数据
                    if (!entity.getTypes().equals("heartbeat")) {
                        //判断硬件来源
                        if (entity.getTypes().equals("esp32")) {
                            System.out.println("来着esp32的数据");
                            //写入数据库 ...
                        }
                    }
                }else {
                    System.out.println("序列化失败");
                }
    
            }
            if (topic.contains("receive_chat_topic")) {
                System.out.println("对话的主题");
                //...构建聊天室.....
                //...相互订阅发送消息就可以了....
                //...逻辑代码...
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    1.8、集成Swagger2配置

    Swagger2Config

    package sqy.config.swagger;
    
    import io.swagger.annotations.Api;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import springfox.documentation.builders.ApiInfoBuilder;
    import springfox.documentation.builders.PathSelectors;
    import springfox.documentation.builders.RequestHandlerSelectors;
    import springfox.documentation.service.ApiInfo;
    import springfox.documentation.spi.DocumentationType;
    import springfox.documentation.spring.web.plugins.Docket;
    import springfox.documentation.swagger2.annotations.EnableSwagger2;
    
    /**
     * @author 
     * @Date 2022/4/15
     * Swagger2API文档的配置
     * http://localhost:8081/swagger-ui.html
     */
    @Configuration
    @EnableSwagger2
    public class Swagger2Config {
        @Bean
        public Docket createRestApi(){
            return new Docket(DocumentationType.SWAGGER_2)
                    .apiInfo(apiInfo())
                    .select()
                    //为当前包下controller生成API文档
                    .apis(RequestHandlerSelectors.basePackage("sqy.controller"))
                    //为有@Api注解的Controller生成API文档
                    .apis(RequestHandlerSelectors.withClassAnnotation(Api.class))
                    .paths(PathSelectors.any())
                    .build();
        }
    
        private ApiInfo apiInfo() {
            return new ApiInfoBuilder()
                    .title("SwaggerUI演示")
                    .description("mqtt-demo")
                    .contact("sqy")
                    .version("1.0")
                    .build();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.9、mqtt测试类

    MqttController

    package sqy.controller;
    
    
    import com.google.gson.Gson;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiOperation;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import sqy.bean.IotData;
    import sqy.mqtt.IMqttSender;
    import sqy.rvo.ApiResult;
    
    
    /**
     * @author 
     * @Date 2022/4/15
     * MQTT测试接口
     */
    @Api(tags = "MqttController", description = "MQTT测试接口")
    @RestController
    @RequestMapping("/mqtt")
    public class MqttController {
    
    
        @Autowired
        IMqttSender mqttSender;
    
        @Autowired
        Gson gson;
    
        //这个是外面配置文件里面的设置的接收主题之一
        private final static String SEND_TOPIC_PREFIX = "receive_iot_topic/";
    
    
        @ApiOperation("向指定主题发送消息")
        @PostMapping("/sendToTopic")
        public ApiResult sendToTopic(String topic, String payload) {
            /**
             * 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#
             */
            mqttSender.sendToMqtt(topic,payload);
            System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);
            return ApiResult.success(null, "发送成功");
        }
    
    
        /**
         * 127.0.0.1:8081/mqtt/control_command
         * post、json
         * {
         *   "createtime": "2022-04-17T07:02:23.707Z",
         *   "deviceid": "001设备",
         *   "humi": 30,
         *   "light": 55,
         *   "loraid": "r001",
         *   "sensorid": "123456789",
         *   "temp": 100,
         *   "types": "esp32"
         * }
         */
        @ApiOperation("模拟硬件发送的数据或控制指令")
        @PostMapping("/control_command")
        public ApiResult controlCommand(@RequestBody IotData iotData) {
            String deviceId = iotData.getDeviceid();
            // 前缀 + 设备号
            String topic = SEND_TOPIC_PREFIX + deviceId;
            String payload=gson.toJson(iotData);
            mqttSender.sendToMqtt( topic,payload);
            System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);
            return ApiResult.success(null, "发送成功");
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    1.10、测试效果

    1. 登入swagger测试:http://localhost:8081/swagger-ui.html#/
    2. 或用post测试

    或者使用postman测试:

    127.0.0.1:8081/mqtt/control_command

    {
      
      "deviceid": "001设备",
      "humi": 30,
      "light": 55,
      "loraid": "r001",
      "sensorid": "123456789",
      "temp": 100,
      "types": "esp32",
      "createtime": "2022-04-17T07:02:23.707Z"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    如图:

    postman测试图:
    在这里插入图片描述
    后台打印
    在这里插入图片描述
    mqttbox订阅接收
    在这里插入图片描述

    swagger测试gif图:
    在这里插入图片描述

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    批量操作数据、自定义分页器
    专利申请常见的几个误区!
    基于Spring Boot的医院预约挂号系统设计与实现(源码+lw+部署文档+讲解等)
    视频汇聚/视频云存储/视频监控管理平台EasyCVR分发rtsp流起播慢优化步骤详解
    odoo16 一个比较复杂的domain
    python处理kitti激光雷达数据
    Asp .Net Core 系列:详解鉴权(身份验证)以及实现 Cookie、JWT、自定义三种鉴权 (含源码解析)
    小程序制作(超详解!!!)第十三节 随机数求和
    Objective-C和C/C++混合编译
    【Leetcode】1775. Equal Sum Arrays With Minimum Number of Operations
  • 原文地址:https://blog.csdn.net/m0_67391518/article/details/126115697