• SpringBoot+RabbitMQ实现MQTT协议通讯


    一、简介

    MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。此处使用RabbitMQ
    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

    二、环境准备

    2.1 Erlang安装

    使用rabbitMQ首先需要安装Erlang环境,因为rabbitMQ是用Erlang语言编写的。

    2.1.1 下载安装

    官网下载:https://www.erlang.org/patches/otp-26.0 (比较慢,不推荐)
    在这里插入图片描述

    百度网盘下载:https://pan.baidu.com/s/1xU4syn14Bh7QR-skjm_hOg (推荐)
    提取码:az1t

    2.1.2 环境变量

    进入高级系统设置
    在这里插入图片描述
    在这里插入图片描述
    环境变量: 变量名-ERLANG_HOME 变量值-文件安装路径
    在这里插入图片描述
    配置path: 配置完上面的之后,找到系统变量中的path点击编辑,然后新建:%ERLANG_HOME%\bin
    在这里插入图片描述
    验证: 进入cmd,输入 erl -version 显示版本号就说明安装成功
    在这里插入图片描述

    2.2 RabbtiMQ安装

    2.2.1 下载安装

    官网下载:http://www.rabbitmq.com/download.html
    下载后一通傻瓜式安装即可。
    在这里插入图片描述

    2.2.2 环境变量

    变量名-RABBITMQ_SERVER 变量值-文件安装路径
    在这里插入图片描述
    编辑path,点击新建按钮,输入%RABBITMQ_SERVER%\sbin,点击确定
    在这里插入图片描述

    2.2.3 安装mqtt插件

    rabbitmq-plugins enable rabbitmq_mqtt
    
    • 1

    在这里插入图片描述

    2.2.4 管理控制台安装

    rabbitmq-plugins enable rabbitmq_management
    
    • 1

    在这里插入图片描述

    2.2.5 访问测试

    登录测试: 浏览器输入 http://localhost:15672 ,输入用户名:guest,密码:guest在这里插入图片描述

    三、代码实现

    3.1 引入依赖

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    
    <dependency>
        <groupId>org.springframework.integrationgroupId>
        <artifactId>spring-integration-mqttartifactId>
    dependency>
    <dependency>
        <groupId>org.eclipse.pahogroupId>
        <artifactId>org.eclipse.paho.client.mqttv3artifactId>
        <version>1.2.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.2 yml配置

    # mqtt配置
    mqtt:
      url: ***********
      username: ***********
      password: ***********
      # 间隔时间
      keep-alive-interval: 60
      # 超时时间
      completion-timeout: 30000
      # 会话保持,默认为false
      clean-session: false
      # 自动连接,默认为true
      automatic-reconnect: true
      # 生产者配置
      producer:
        # 很重要
        client-id: producer1
        topic: demo-topic
        # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
        qos: 1
      # 消费者配置
      subscriber:
        # 很重要
        client-id: subscriber1
        topic: demo-topic
        # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
        qos: 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.3 配置属性类

    package com.qiangesoft.mqtt.config;
    
    import lombok.Data;
    import org.apache.commons.lang3.StringUtils;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * mqtt配置
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    @Data
    @Component
    @Configuration
    @ConfigurationProperties(prefix = "mqtt")
    public class MqttProperty {
    
        /**
         * 服务地址
         */
        private String url;
    
        /**
         * 账号
         */
        private String username;
    
        /**
         * 密码
         */
        private String password;
    
        /**
         * 间隔时间
         */
        private int keepAliveInterval;
    
        /**
         * 超时时间
         */
        private int completionTimeout;
    
        /**
         * 会话保持,默认为false
         */
        private boolean cleanSession;
    
        /**
         * 自动连接,默认为true
         */
        private boolean automaticReconnect = true;
    
        /**
         * 生产者
         */
        private Client producer = new Client();
    
        /**
         * 消费者
         */
        private Client subscriber = new Client();
    
        @Data
        public class Client {
            /**
             * 客户端id
             */
            private String clientId;
    
            /**
             * 默认主题
             */
            private String topic;
    
            /**
             * 传输质量
             * QoS 0:最多分发一次
             * QoS 1:至少分发一次(默认)
             * QoS 2:只分发一次
             */
            private int qos = 1;
        }
    
        @Bean
        public MqttPahoClientFactory mqttPahoClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(new String[]{url});
            if (StringUtils.isNotBlank(this.username)) {
                options.setUserName(this.username);
            }
            if (StringUtils.isNotBlank(this.password)) {
                options.setPassword(this.password.toCharArray());
            }
            // 心跳时间
            options.setKeepAliveInterval(this.keepAliveInterval);
            // 断开是否自动重联
            options.setAutomaticReconnect(this.automaticReconnect);
            // 保持session,客户端上线后会接受到它离线的这段时间的消息
            options.setCleanSession(this.cleanSession);
            // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    //        options.setWill("willTopic", WILL_DATA, 2, false);
            factory.setConnectionOptions(options);
            return factory;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    package com.qiangesoft.mqtt.constant;
    
    /**
     * mqtt通用常量信息
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    public class MqttConstant {
    
        /**
         * 生产者管道
         */
        public static final String OUTBOUND_CHANNEL = "outboundChannel";
    
        /**
         * 消费者管道
         */
        public static final String INBOUND_CHANNEL = "inboundChannel";
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.4 生产者

    package com.qiangesoft.mqtt.producer;
    
    import com.qiangesoft.mqtt.config.MqttProperty;
    import com.qiangesoft.mqtt.constant.MqttConstant;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    /**
     * mqtt生产者
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    @Configuration
    public class MqttProducerConfig {
    
        @Autowired
        private MqttProperty mqttProperty;
        @Autowired
        private MqttPahoClientFactory mqttPahoClientFactory;
    
        /**
         * 消息生产通道
         *
         * @return
         */
        @Bean(name = MqttConstant.OUTBOUND_CHANNEL)
        public MessageChannel outboundChannel() {
            return new DirectChannel();
        }
    
        /**
         * 消息发布
         *
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = MqttConstant.OUTBOUND_CHANNEL)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperty.getProducer().getClientId(), mqttPahoClientFactory);
            messageHandler.setAsync(false);
            messageHandler.setDefaultQos(mqttProperty.getProducer().getQos());
            messageHandler.setDefaultTopic(mqttProperty.getProducer().getTopic());
            return messageHandler;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    3.5 消费者

    package com.qiangesoft.mqtt.subscriber;
    
    import com.qiangesoft.mqtt.config.MqttProperty;
    import com.qiangesoft.mqtt.constant.MqttConstant;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import java.util.Objects;
    
    /**
     * mqtt消费者
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    @Slf4j
    @Configuration
    public class MqttSubscriberConfig {
    
        @Autowired
        private MqttProperty mqttProperty;
        @Autowired
        private MqttPahoClientFactory mqttPahoClientFactory;
    
        /**
         * 消息订阅通道
         *
         * @return
         */
        @Bean(name = MqttConstant.INBOUND_CHANNEL)
        public MessageChannel inboundChannel() {
            return new DirectChannel();
        }
    
        /**
         * 消息订阅通道绑定
         *
         * @return
         */
        @Bean
        public MessageProducer mqttInbound() {
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperty.getSubscriber().getClientId(),
                    mqttPahoClientFactory, mqttProperty.getSubscriber().getTopic());
            adapter.setCompletionTimeout(mqttProperty.getCompletionTimeout());
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(mqttProperty.getSubscriber().getQos());
            adapter.setOutputChannel(inboundChannel());
            return adapter;
        }
    
        /**
         * 消息订阅
         *
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = MqttConstant.INBOUND_CHANNEL)
        public MessageHandler messageHandler() {
            return message -> {
                try {
                    String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
                    log.info("订阅主题为: {}", topic);
                    String payload = message.getPayload().toString();
                    log.info("订阅接收到消息:{}", payload);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    3.6 消息发送网关

    package com.qiangesoft.mqtt.service;
    
    import com.qiangesoft.mqtt.constant.MqttConstant;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息发送网关
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    @Component
    @MessagingGateway(defaultRequestChannel = MqttConstant.OUTBOUND_CHANNEL)
    public interface MqttGateway {
    
        /**
         * 发送到mqtt
         *
         * @param payload 消息内容
         */
        void sendToMqtt(String payload);
    
        /**
         * 发送到mqtt
         *
         * @param topic   主题
         * @param payload 消息内容
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
        /**
         * 发送到mqtt
         *
         * @param topic   主题
         * @param qos     qos
         * @param payload 消息内容
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    3.7 发送测试

    package com.qiangesoft.mqtt.controller;
    
    import com.qiangesoft.mqtt.service.MqttGateway;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 控制器
     *
     * @author qiangesoft
     * @date 2024-04-24
     */
    @RestController
    @RequestMapping("/mqtt")
    public class MqttController {
    
        @Autowired
        private MqttGateway mqttGateway;
    
        @GetMapping("/send")
        public String send(String message) {
            mqttGateway.sendToMqtt(message);
            return "success";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    HomeAssistant——Intrgration开发
    这 6 款在线 PDF 转换工具,得试
    【微信测试号实战——02】编写你独有的微信消息模板
    Openstack部署
    GitHub 报告发布:TypeScript 取代 Java 成为第三受欢迎语言
    java计算机毕业设计框架的企业机械设备智能管理系统的设计与实现源码+数据库+系统+lw文档+mybatis+运行部署
    NSSCTF CRYPTO 题解(四)
    面向对象编程·下
    ssm 基于springboot的车辆故障管理系统Java
    Flink CDC
  • 原文地址:https://blog.csdn.net/weixin_39311781/article/details/138183141