1. Spring boot项目,集成mqtt,订阅物联网设备数据上报topic1
2. 处理topic1,并异步入库到mongodb
3.订阅MQTT客户端上下线主题
4. mongodb 根据设备id分集合
准备工作:需要在EMQX 5.0允许订阅:$SYS/brokers/+/clients/# (客户端上下线主题)
第一步:打开授权文件

第二步:添加允许后,更新即可
{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.

-
- <dependency>
- <groupId>org.eclipse.pahogroupId>
- <artifactId>org.eclipse.paho.client.mqttv3artifactId>
- <version>1.2.2version>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-data-mongodbartifactId>
- dependency>
- <dependency>
- <groupId>com.spring4allgroupId>
- <artifactId>mongodb-plus-spring-boot-starterartifactId>
- dependency>
- <parent>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-parentartifactId>
- <version>2.3.5.RELEASEversion>
- <relativePath/>
- parent>
- <dependencyManagement>
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-dependenciesartifactId>
- <version>Hoxton.SR12version>
- <type>pomtype>
- <scope>importscope>
- dependency>
- dependencies>
- spring:
- profiles:
- active: prod
- data:
- mongodb:
- uri: mongodb://root:${mongo-password}@${host}:${mongodbport}/hzd
- option:
- # 最小连接数
- min-connection-per-host: 10
- # 最大连接数
- max-connection-per-host: 200
- #允许阻塞连接乘数的线程
- threads-allowed-to-block-for-connection-multiplier: 5
- #服务器超时
- server-selection-timeout: 30000
- #最长等待时间
- max-wait-time: 120000
- #最大连接空闲时间
- max-connection-idle-time: 0
- #最大连接生存时间
- max-connection-life-time: 0
- #连接超时
- connect-timeout: 10000
- #套接字超时10000
- socket-timeout: 0
- socket-keep-alive: false
- ssl-enabled: false
- ssl-invalid-host-name-allowed: false
- always-use-m-beans: false
- heartbeat-socket-timeout: 20000
- heartbeat-connect-timeout: 20000
- min-heartbeat-frequency: 500
- heartbeat-frequency: 10000
- local-threshold: 15
- package com.hzd.mqtt.client;
-
-
- import com.hzd.mqtt.enums.QosEnum;
- import com.hzd.mqtt.properties.MqttProperties;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
-
- /**
- * Created by
- */
- @Component
- @Slf4j
- public class EmqClient {
-
- private IMqttClient client;
-
- @Autowired
- private MqttProperties mqttProperties;
-
- @Autowired
- private MqttCallback mqttCallback;
-
- @Autowired
- private MqttProperties properties;
-
- private static final String topic_hzd = "/hzd/pub/#";
- private static final String topic_device_connect = "$SYS/brokers/+/clients/#";
-
- @PostConstruct
- public void init() {
- MqttClientPersistence mempersitence = new MemoryPersistence();
- try {
- if (client == null) {
- client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
- }
- } catch (MqttException e) {
- log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
- }
- //连接服务端
- connect(properties.getUsername(), properties.getPassword());
- //订阅一个主题
-
- subscribe(topic_hzd, QosEnum.QoS2);
- subscribe(topic_device_connect, QosEnum.QoS2);
- log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
-
- }
-
-
- /**
- * 连接broker
- *
- * @param username
- * @param password
- */
- public void connect(String username, String password) {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setAutomaticReconnect(true);
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- options.setCleanSession(true);
- client.setCallback(mqttCallback);
-
- try {
- client.connect(options);
- } catch (MqttException e) {
- log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
- }
- }
-
- /**
- * 断开连接
- */
- @PreDestroy
- public void disConnect() {
- try {
- client.disconnect();
- } catch (MqttException e) {
- log.error("断开连接产生异常,异常信息{}", e.getMessage());
- }
- }
-
- /**
- * 重连
- */
- public void reConnect() {
- try {
- client.reconnect();
- while (true){
- if (client.isConnected()){
- subscribe(topic_hzd, QosEnum.QoS2);
- subscribe(topic_device_connect, QosEnum.QoS2);
- break;
- }else{
- Thread.sleep(100);
- log.info("重连中,请稍后......");
- }
- }
- log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
- } catch (MqttException | InterruptedException e) {
- log.error("重连失败,失败原因{}", e.getMessage());
- }
- }
-
- /**
- * 发布消息
- *
- * @param topic
- * @param msg
- * @param qos
- * @param retain
- */
- public void publish(String topic, String msg, QosEnum qos, boolean retain) {
-
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(msg.getBytes());
- mqttMessage.setQos(qos.value());
- mqttMessage.setRetained(retain);
- try {
- client.publish(topic, mqttMessage);
- } catch (MqttException e) {
- log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
- }
-
- }
-
- /**
- * 订阅
- *
- * @param topicFilter
- * @param qos
- */
- public void subscribe(String topicFilter, QosEnum qos) {
- try {
- client.subscribe(topicFilter, qos.value());
- } catch (MqttException e) {
- log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
- }
-
- }
-
- /**
- * 取消订阅
- *
- * @param topicFilter
- */
- public void unSubscribe(String topicFilter) {
- try {
- client.unsubscribe(topicFilter);
- } catch (MqttException e) {
- log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
- }
- }
-
- }
- package com.hzd.mqtt.client;
-
- import com.hzd.mqtt.async.DataStorage;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- /**
- *
- */
- @Component
- @Slf4j
- public class MessageCallback implements MqttCallback {
-
-
- @Autowired
- private DataStorage dataStorage;
-
- @Autowired
- private EmqClient emqClient;
-
- public static MessageCallback MessageCallback;
-
-
- @PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
- public void init() {
- log.info("初始化");
- MessageCallback = this;
- // MessageCallback.dataPageRepository = this.dataPageRepository;
- MessageCallback.dataStorage = this.dataStorage;
- }
-
-
- /**
- * 丢失了对服务端的连接后触发的回调
- *
- * @param cause
- */
- @Override
- public void connectionLost(Throwable cause) {
- // 资源的清理 重连
- log.info("断开emqx连接,开始重连......");
- emqClient.reConnect();
- log.info("emqx连接成功......");
- }
-
- /**
- * 应用收到消息后触发的回调
- *
- * @param topic
- * @param message
- * @throws Exception
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- dataStorage.saveDataToMongo(topic, message);
- }
-
- /**
- * 消息发布者消息发布完成产生的回调
- *
- * @param token
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- int messageId = token.getMessageId();
- String[] topics = token.getTopics();
- log.info("消息发布完成,messageid={},topics={}", messageId, topics);
- }
- }
- package com.hzd.mqtt.async;
-
- import com.alibaba.fastjson2.JSON;
- import com.hzd.entity.po.ControlLog;
- import com.hzd.entity.po.DataExtend;
- import com.hzd.entity.po.MqttClientLog;
- import com.hzd.entity.vo.MqttPayload;
- import com.hzd.entity.vo.MqttTopic;
- import com.hzd.mqtt.enums.TopicType;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.data.mongodb.UncategorizedMongoDbException;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.data.mongodb.core.query.Criteria;
- import org.springframework.data.mongodb.core.query.Query;
- import org.springframework.data.mongodb.core.query.Update;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Date;
-
-
- /**
- * Coding by 李炯 on 2022/10/28 15:12
- */
- @Component
- @Slf4j
- public class DataStorage {
-
- @Resource
- private MongoTemplate mongoTemplate;
-
- // @Resource
- // private DataPageRepository dataPageRepository;
-
- @Async
- public void saveDataToMongo(String topic, MqttMessage message) {
- MqttTopic t = MqttTopic.init(topic);
- String payload = new String(message.getPayload());
- if (t == null) return;
- if (payload.contains("\"type\":\"data\"")) {
- DataExtend dataExtend = DataExtend.init(payload);
- mongoTemplate.insert(dataExtend, t.getFac_id());
- log.info("设备数据上报,topic={},fac_id={},messageid={},qos={},payload={}",
- topic, t.getFac_id(), message.getId(), message.getQos(), payload);
- } else if (payload.contains("\"type\":\"config\"")) {
- ControlLog controlLog = ControlLog.init(payload);
- Query query = new Query(Criteria.where("_id").is(controlLog.get_id()));
- Update update = Update.update("status", controlLog.getStatus());
- update.set("status", controlLog.getStatus());
- update.set("msg", controlLog.getMsg());
- if (null != controlLog.getRelayIndexLst()) {
- update.set("relayIndexLst", controlLog.getRelayIndexLst());
- }
- if (null != controlLog.getRelayNumLst()) {
- update.set("relayNumLst", controlLog.getRelayNumLst());
- }
- if (null != controlLog.getReadInterval()) {
- update.set("readInterval", controlLog.getReadInterval());
- }
- if (null != controlLog.getRelayphoto()) {
- update.set("relayphoto", controlLog.getRelayphoto());
- }
- if (null != controlLog.getLongitude()) {
- update.set("longitude", controlLog.getLongitude());
- }
- if (null != controlLog.getLatitude()) {
- update.set("latitude", controlLog.getLatitude());
- }
- if (null != controlLog.getControlStatus()) {
- update.set("controlStatus", controlLog.getControlStatus());
- }
- if (null != controlLog.getReserve()) {
- update.set("reserve", controlLog.getReserve());
- }
- update.set("updateTime", new Date());
- mongoTemplate.upsert(query, update, "log_control");
- log.info("{},topic={},messageid={},qos={},payload={}",
- "配置返回", topic, message.getId(), message.getQos(), payload);
- } else if (t.getTopicType() == TopicType.connect) {
- MqttPayload pl = MqttPayload.init(payload);
- MqttClientLog clientLog = MqttClientLog.init(pl);
- updateConnectStatus(topic, clientLog);
- log.info("{},topic={},messageid={},qos={},payload={}",
- clientLog.getStatus(), topic, message.getId(), message.getQos(), payload);
- }
- }
-
- @Async
- public void updateConnectStatus(String topic, MqttClientLog clientLog) {
- try{
- MqttTopic t = MqttTopic.init(topic);
- if (t == null) return;
- mongoTemplate.insert(clientLog, "log_mqtt_client");
- Query query = new Query(Criteria.where("username").is(clientLog.getUserName())
- .and("deviceid").is(clientLog.getDeviceId()));
- Update update = Update.update("connectStatus", clientLog.getConnectStatus());
- update.set("status", clientLog.getStatus());
- update.set("updateTime", clientLog.getUpdateTime());
- mongoTemplate.upsert(query, update, "status_mqtt_client");
- }catch (Exception e){
- log.error("mongoDB 错误:MqttClientLog={},错误消息为{}",JSON.toJSONString(clientLog),e.getMessage());
- }
-
-
- }
- }