• Springboot 结合 MQTT、Redis ,对接硬件以及做消息分发,最佳实践


    Springboot 结合 mqttredis对接硬件以及做消息分发,最佳实践

    一,认识

    需要了解EMQX 基本知识原理,不了解的可以查看我之间的博客,以及网上的资料,这里不在过多撰述。

    二,开发思路

    这里以对接雷达水位计为例:

    说一下思路, 这里场景各种设备连接 EMQX ,然后通过 EMQX 上报数据,和接收服务器下发的指令。

    我们需要部署一个 EMQX 服务器, 设备配置我们的服务器ip和端口连接到 EMQX 。

    那么我们开发EMQX 的思路应该是什么样子的。

    1. mqtt 客户端订阅相关主题;

    2. 数据库保存数据设备产品项目定义主题,存到redis;

    3. 通过主题做出相关数据分析;

    三,准备工作

    3.1 引入Springboot-mqtt依赖

    Springboot 依赖, MQTT依赖

    	<parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.1.3.RELEASEversion>
        parent>
    
    	<dependency>
            
            <dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-integrationartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.integrationgroupId>
    			<artifactId>spring-integration-streamartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.integrationgroupId>
    			<artifactId>spring-integration-mqttartifactId>
    		dependency>
        dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用

    四,编写代码

    4.1 编写MQTT配置类

    不在过多解释代码,每行都有注释

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.stereotype.Component;
    
    /**
     * 功能描述: 配置类
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    @Data
    @Component
    @Configuration
    @PropertySource("classpath:application.yml")
    @ConfigurationProperties(prefix = "mqtt")
    public class MqttProperties {
    
        /**服务器地址url*/
        private String host;
    
        /**客户端唯一ID*/
        private String clientId;
    
        /**用户名*/
        private String userName;
    
        /**密码*/
        private String passWord;
    
        /**超时时间*/
        private Integer timeOut;
    
        /**保活时间*/
        private Integer keepaLive;
    
        /**是否清除会话*/
        private boolean clearSession;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    appliction.yml

    mqtt:
        host: tcp://xx.xx.xx.xx:1883 #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
        clientId: ${random.int}  #MQTT-连接服务器默认客户端ID
        userName: admin   #MQTT-用户名
        passWord: admin #MQTT-密码
        default-topic: test #MQTT-默认的消息推送主题,实际可在调用接口时指定
        timeOut: 1000 #连接超时
        keepaLive: 30   #设置会话心跳时间
        clearSession: true  #清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.2 编写MQTT客户端,处理创建,连接,订阅,发布等功能

    import com.joygis.mqtt.MqttProperties;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.Arrays;
    
    /**
     * 功能描述: mqtt客户端
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    @Slf4j
    @Component
    public class MqttCustomerClient {
    
        @Resource
        private MqttCallback mqttCallback;
    
        @Resource
        private MqttProperties mqttProperties;
    
        /**
         * 连接配置
         */
        private MqttConnectOptions options;
    
        /**
         * MQTT异步客户端
         */
        public static MqttAsyncClient client;
    
        /**
         * 功能描述: 客户端连接
         *
         * @Author keLe
         * @Date 2022/10/31
         */
        public void connect() {
            if (mqttProperties == null) {
                log.error("【mqtt异常】:连接失败,配置文件缺失。");
                return;
            }
            //设置配置
            if (options == null) {
                setOptions();
            }
            //创建客户端
            if (client == null) {
                createClient();
            }
            while (!client.isConnected()) {
                try {
                    IMqttToken token = client.connect(options);
                    token.waitForCompletion();
                } catch (Exception e) {
                    log.error("【mqtt异常】:mqtt连接失败,message={}", e.getMessage());
                }
            }
        }
    
        /**
         * 功能描述: 创建客户端
         *
         * @Author keLe
         * @Date 2022/10/31
         */
        private void createClient() {
            if (client == null) {
                try {
                  /*host为主机名,clientId是连接MQTT的客户端ID,MemoryPersistence设置clientId的保存方式
                    默认是以内存方式保存*/
                    client = new MqttAsyncClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
                    //设置回调函数
                    client.setCallback(mqttCallback);
                    log.info("【mqtt】:mqtt客户端启动成功");
                } catch (MqttException e) {
                    log.error("【mqtt异常】:mqtt客户端连接失败,error={}", e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 功能描述: 设置连接属性
         *
         * @Author keLe
         * @Date 2022/10/31
         */
        private void setOptions() {
            if (options != null) {
                options = null;
            }
            if (mqttProperties == null) {
                log.error("【mqtt异常】连接失败,失败原因:配置文件缺失。");
                return;
            }
            options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(mqttProperties.getUserName());
            options.setPassword(mqttProperties.getPassWord().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeOut());
            options.setKeepAliveInterval(mqttProperties.getKeepaLive());
            //设置自动重新连接
            options.setAutomaticReconnect(true);
            options.setCleanSession(mqttProperties.isClearSession());
        }
    
        /**
         * 功能描述: 断开与mqtt的连接
         *
         * @Author keLe
         * @Date 2022/10/31
         */
        public synchronized void disconnect() {
            //判断客户端是否null 是否连接
            if (client != null && client.isConnected()) {
                try {
                    IMqttToken token = client.disconnect();
                    token.waitForCompletion();
                } catch (MqttException e) {
                    log.error("【mqtt异常】: 断开mqtt连接发生错误,message={}", e.getMessage());
                }
            }
            client = null;
        }
    
        /**
         * 功能描述: 重新连接MQTT
         *
         * @Author keLe
         * @Date 2022/10/31
         */
        public synchronized void refresh() {
            disconnect();
            setOptions();
            createClient();
            connect();
        }
    
        /**
         * 功能描述: 发布
         * @param qos         连接方式
         * @param retained    是否保留
         * @param topic       主题
         * @param pushMessage 消息体
         * @Author keLe
         * @Date 2022/10/31
         */
        public void publish(int qos, boolean retained, String topic, String pushMessage) {
            log.info("【mqtt】:发布主题" + topic);
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            message.setPayload(pushMessage.getBytes());
    
            try {
                IMqttDeliveryToken token = client.publish(topic,message);
                token.waitForCompletion();
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                log.error("【mqtt异常】: 发布主题时发生错误 topic={},message={}",topic,e.getMessage());
            }
        }
    
        /**
         * 功能描述: 订阅某个主题
         * @param topic 主题
         * @param qos   消息质量
         *              Qos1:消息发送一次,不确保
         *              Qos2:至少分发一次,服务器确保接收消息进行确认
         *              Qos3:只分发一次,确保消息送达和只传递一次
         * @Author keLe
         * @Date 2022/10/31
         */
        public void subscribe(String topic, int qos){
            log.info("【mqtt】:订阅了主题 topic={}",topic);
            try {
                IMqttToken token = client.subscribe(topic, qos);
                token.waitForCompletion();
            }catch (MqttException e){
                log.error("【mqtt异常】:订阅主题 topic={} 失败 message={}",topic,e.getMessage());
            }
        }
    
        /**
         * 功能描述: 订阅某些主题
         * @param topic 主题
         * @param qos   消息质量
         *              Qos1:消息发送一次,不确保
         *              Qos2:至少分发一次,服务器确保接收消息进行确认
         *              Qos3:只分发一次,确保消息送达和只传递一次
         * @Author keLe
         * @Date 2022/10/31
         */
        public void subscribe(String[] topic,int[] qos){
            log.info("【mqtt】:订阅了主题 topic={}", Arrays.toString(topic));
            try {
                IMqttToken token = client.subscribe(topic,qos);
                token.waitForCompletion();
            }catch (MqttException e){
                log.error("【mqtt异常】:订阅主题 topic={} 失败 message={}",topic,e.getMessage());
            }
        }
    
        /**是否处于连接状态*/
        public boolean isConnected(){
            return client != null && client.isConnected();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215

    4.3 编写MQTT 回调监听器

    /**
     * 功能描述: 消费监听
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    @Slf4j
    @Component
    public class MqttCallback implements MqttCallbackExtended {
    
        @Resource
        private MqttService mqttService;
    
        @Override
        public void connectionLost(Throwable throwable) {
            log.error("【mqtt异常】:断开连接....");
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            mqttService.subscribeCallback(topic,message);
        }
    
        /**
         * 功能描述: 发布消息后,到达MQTT服务器,服务器回调消息接收
         * @param token  Mqtt传递令牌
         * @Author keLe
         * @Date 2022/10/31
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            log.info("【mqtt】交付完成:{}",token.isComplete());
        }
    
        /**
         * 功能描述: 监听mqtt连接消息
         * @param reconnect 是否重连
         * @param serverUrl 服务地址
         * @Author keLe
         * @Date 2022/10/31
         */
        @Override
        public void connectComplete(boolean reconnect, String serverUrl) {
            log.info("mqtt已经连接!!");
            //连接后,可以在此做初始化事件,或订阅
            try {
                mqttService.subscribe(MqttCustomerClient.client);
            } catch (MqttException e) {
                log.error("======>>>>>订阅主题失败 error={}",e.getMessage());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    4.4 编写MQTT 业务接口,处理订阅发布

    import cn.hutool.core.collection.CollectionUtil;
    import com.alibaba.fastjson.JSON;
    import com.joygis.common.constant.Constants;
    import com.joygis.common.core.redis.RedisCache;
    import com.joygis.mqtt.client.MqttCustomerClient;
    import com.joygis.mqtt.domian.SubscribeConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    @Slf4j
    @Service
    public class MqttService {
    
        @Resource
        private MqttCustomerClient mqttCustomerClient;
    
        @Autowired
        private RedisCache redisCache;
    
        /**
         * 功能描述: 订阅主题
         * @param client MQTT异步客户端
         * @Author keLe
         * @Date 2022/10/31
         */
        public void subscribe(MqttAsyncClient client) throws MqttException {
            //获取主题
            List<String> cacheList = redisCache.getCacheList(Constants.SUB_CONFIG_KEY + "topic");
            if(CollectionUtil.isEmpty(cacheList)){
                log.error("【mqtt异常】:redis缓存中,无法获取主题相关信息!");
                return;
            }
            String[] topicFilters = cacheList.toArray(new String[cacheList.size()]);
            int[] qos = new int[cacheList.size()];
            for(int i = 0 ; i<cacheList.size() ; i++){
                qos[i] = 1;
            }
            // 订阅
            client.subscribe(topicFilters, qos);
            log.info("mqtt订阅了设备信息和物模型主题");
        }
    
        /**
         * 功能描述: 消息回调方法
         * @param topic  主题
         * @param mqttMessage 消息体
         * @Author keLe
         * @Date 2022/10/31
         */
        @Async
        public void subscribeCallback(String topic, MqttMessage mqttMessage) throws InterruptedException {
            /**测试线程池使用*/
            log.info("====>>>>线程名--{}",Thread.currentThread().getName());
            /**模拟耗时操作*/
            // Thread.sleep(1000);
            // subscribe后得到的消息会执行到这里面
            String message = new String(mqttMessage.getPayload());
            log.info("接收消息主题 : " + topic);
            log.info("接收消息Qos : " + mqttMessage.getQos());
            log.info("接收消息内容 : " + message);
            String key = Constants.SUB_CONFIG_KEY+topic;
            SubscribeConfig subscribeConfig = redisCache.getCacheObject(key);
            //TODO 这里使用通过数据取到相应的bean 动态去调用接口解析数据
        }
    
        /**
         * 功能描述: 发布设备状态
         * @Author keLe
         * @Date 2022/10/31
         * @param  productId 产品id
         * @param  deviceNum 设备编号
         * @param  deviceStatus 设备状态
         * @param  isShadow 影子模式
         * @param  rssi 编号
         */
        public void publishStatus(Long productId, String deviceNum, int deviceStatus, int isShadow,int rssi) {
            String message = "{\"status\":" + deviceStatus + ",\"isShadow\":" + isShadow + ",\"rssi\":" + rssi + "}";
            mqttCustomerClient.publish(1, false, "/" + productId + "/" + deviceNum + "", message);
        }
    
        /**
         * 功能描述: 发布设备状态
         * @Author keLe
         * @Date 2022/10/31
         * @param  productId 产品id
         * @param  deviceNum 设备编号
         */
        public void publishInfo(Long productId, String deviceNum) {
            mqttCustomerClient.publish(1, false, "/" + productId + "/" + deviceNum + "", "");
        }
    
        /**
         * 功能描述: 发布设备状态
         * @Author keLe
         * @Date 2022/10/31
         * @param  productId 产品id
         * @param  deviceNum 设备编号
         */
        public void publishFunction(Long productId, String deviceNum, List<String> thingsList) {
            if (thingsList == null) {
                mqttCustomerClient.publish(1, true, "/" + productId + "/" + deviceNum + "", "");
            } else {
                mqttCustomerClient.publish(1, true, "/" + productId + "/" + deviceNum + "", JSON.toJSONString(thingsList));
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116

    4.5 Redis初始化,加载数据库 topic

    @Service
    public class SubscribeConfigServiceImpl implements ISubscribeConfigService {
        @Autowired
        private RedisCache redisCache;
    
        @Resource
        private SubscribeConfigMapper subscribeConfigMapper;
    
        @PostConstruct
        public void init() {
            loadingConfigCache();
        }
        
        @Override
        public void loadingConfigCache() {
            List<SubscribeConfig> configsList = subscribeConfigMapper.selectSubscribeConfigList(new SubscribeConfig());
            if(CollectionUtil.isNotEmpty(configsList)){
                for (SubscribeConfig config : configsList) {
                    redisCache.setCacheObject(getCacheKey(config.getTopic()), config);
                }
                List<String> topicList = configsList.stream().map(SubscribeConfig::getTopic).collect(Collectors.toList());
                redisCache.deleteObject(getCacheKey("topic"));
                redisCache.setCacheList(getCacheKey("topic"), topicList);
            }
        }
    
        /**
         * 设置cache key
         *
         * @param configKey 参数键
         * @return 缓存键key
         */
        private String getCacheKey(String configKey) {
            return Constants.SUB_CONFIG_KEY + configKey;
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    4.6 Springboot 启动,MQTT也自启动,任务定时器池也启动

    package com.joygis.iot.config;
    import com.joygis.mqtt.client.MqttCustomerClient;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * 功能描述: spring 容器创建完成之后,开始创建mqtt客户端
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    @Order(value = 1 )
    @Component
    public class MqttStart implements ApplicationRunner {
    
        @Resource
        private MqttCustomerClient mqttCustomerClient;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            mqttCustomerClient.connect();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    五 ,测试

    启动服务

    在这里插入图片描述

  • 相关阅读:
    【深度学习目标检测】十九、基于深度学习的芒果计数分割系统-含数据集、GUI和源码(python,yolov8)
    beanstalkd 启动跟停止【经常使用 nohup 和 & 配合来启动程序,如: nohup ./test &同时免疫SIGINT和SIGHUP信号】
    (pytorch进阶之路)交叉熵、信息熵、二分类交叉熵、负对数似然、KL散度、余弦相似度
    网络学习:邻居发现协议NDP
    金融德语翻译,常用词汇有哪些
    UnitAuto——机器学习单元测试平台 (三)
    BIOS < UEFI
    Img标签的src地址自动拼接本地域名(localhost:8080)导致图片不显示问题
    JavaSE - 包(package)
    【InfoQ】博睿数据CTO孟曦东访谈实录:可观测性技术是未来发展方向
  • 原文地址:https://blog.csdn.net/Crazy_Cw/article/details/127892651