• 【EMQX】 Spring Cloud 集成MQTT并异步入库(mongodb)


    1. 需求:

            1. Spring boot项目,集成mqtt,订阅物联网设备数据上报topic1

            2. 处理topic1,并异步入库到mongodb

            3. 订阅MQTT客户端上下线主题

            4. mongodb 根据设备id分集合

    2. 代码实现

    准备工作:需要在EMQX 5.0允许订阅:$SYS/brokers/+/clients/# (客户端上下线主题)

    第一步:打开授权文件

    第二步:添加允许后,更新即可

    {allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.

    2.1 引入pom

    • 子项目(本项目)
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
    <dependency>
        <groupId>com.spring4all</groupId>
        <artifactId>mongodb-plus-spring-boot-starter</artifactId>
    </dependency>
    • 父-引入pom
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR12</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    2.2 启动类引入注解 @EnableMongoPlus

    2.3  application.yml mongodb配置

    spring:
      profiles:
        active: prod
      data:
        mongodb:
          uri: mongodb://root:${mongo-password}@${host}:${mongodbport}/hzd
          option:
            # 最小连接数
            min-connection-per-host: 10
            # 最大连接数
            max-connection-per-host: 200
            # 允许阻塞连接乘数的线程
            threads-allowed-to-block-for-connection-multiplier: 5
            # 服务器超时
            server-selection-timeout: 30000
            # 最长等待时间
            max-wait-time: 120000
            # 最大连接空闲时间
            max-connection-idle-time: 0
            # 最大连接生存时间
            max-connection-life-time: 0
            # 连接超时
            connect-timeout: 10000
            # 套接字超时10000
            socket-timeout: 0
            socket-keep-alive: false
            ssl-enabled: false
            ssl-invalid-host-name-allowed: false
            always-use-m-beans: false
            heartbeat-socket-timeout: 20000
            heartbeat-connect-timeout: 20000
            min-heartbeat-frequency: 500
            heartbeat-frequency: 10000
            local-threshold: 15

    2.4 代码

    1. EmqClient.java                  连接emqx

    • init()  使用@PostConstruct注解,开机自动连接EMQX + 订阅
    • reConnect()  断线重连后,订阅主题
    1. package com.hzd.mqtt.client;
    2. import com.hzd.mqtt.enums.QosEnum;
    3. import com.hzd.mqtt.properties.MqttProperties;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.eclipse.paho.client.mqttv3.*;
    6. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.stereotype.Component;
    9. import javax.annotation.PostConstruct;
    10. import javax.annotation.PreDestroy;
    11. /**
    12. * Created by
    13. */
    14. @Component
    15. @Slf4j
    16. public class EmqClient {
    17. private IMqttClient client;
    18. @Autowired
    19. private MqttProperties mqttProperties;
    20. @Autowired
    21. private MqttCallback mqttCallback;
    22. @Autowired
    23. private MqttProperties properties;
    24. private static final String topic_hzd = "/hzd/pub/#";
    25. private static final String topic_device_connect = "$SYS/brokers/+/clients/#";
    26. @PostConstruct
    27. public void init() {
    28. MqttClientPersistence mempersitence = new MemoryPersistence();
    29. try {
    30. if (client == null) {
    31. client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
    32. }
    33. } catch (MqttException e) {
    34. log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
    35. }
    36. //连接服务端
    37. connect(properties.getUsername(), properties.getPassword());
    38. //订阅一个主题
    39. subscribe(topic_hzd, QosEnum.QoS2);
    40. subscribe(topic_device_connect, QosEnum.QoS2);
    41. log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
    42. }
    43. /**
    44. * 连接broker
    45. *
    46. * @param username
    47. * @param password
    48. */
    49. public void connect(String username, String password) {
    50. MqttConnectOptions options = new MqttConnectOptions();
    51. options.setAutomaticReconnect(true);
    52. options.setUserName(username);
    53. options.setPassword(password.toCharArray());
    54. options.setCleanSession(true);
    55. client.setCallback(mqttCallback);
    56. try {
    57. client.connect(options);
    58. } catch (MqttException e) {
    59. log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
    60. }
    61. }
    62. /**
    63. * 断开连接
    64. */
    65. @PreDestroy
    66. public void disConnect() {
    67. try {
    68. client.disconnect();
    69. } catch (MqttException e) {
    70. log.error("断开连接产生异常,异常信息{}", e.getMessage());
    71. }
    72. }
    73. /**
    74. * 重连
    75. */
    76. public void reConnect() {
    77. try {
    78. client.reconnect();
    79. while (true){
    80. if (client.isConnected()){
    81. subscribe(topic_hzd, QosEnum.QoS2);
    82. subscribe(topic_device_connect, QosEnum.QoS2);
    83. break;
    84. }else{
    85. Thread.sleep(100);
    86. log.info("重连中,请稍后......");
    87. }
    88. }
    89. log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
    90. } catch (MqttException | InterruptedException e) {
    91. log.error("重连失败,失败原因{}", e.getMessage());
    92. }
    93. }
    94. /**
    95. * 发布消息
    96. *
    97. * @param topic
    98. * @param msg
    99. * @param qos
    100. * @param retain
    101. */
    102. public void publish(String topic, String msg, QosEnum qos, boolean retain) {
    103. MqttMessage mqttMessage = new MqttMessage();
    104. mqttMessage.setPayload(msg.getBytes());
    105. mqttMessage.setQos(qos.value());
    106. mqttMessage.setRetained(retain);
    107. try {
    108. client.publish(topic, mqttMessage);
    109. } catch (MqttException e) {
    110. log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
    111. }
    112. }
    113. /**
    114. * 订阅
    115. *
    116. * @param topicFilter
    117. * @param qos
    118. */
    119. public void subscribe(String topicFilter, QosEnum qos) {
    120. try {
    121. client.subscribe(topicFilter, qos.value());
    122. } catch (MqttException e) {
    123. log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
    124. }
    125. }
    126. /**
    127. * 取消订阅
    128. *
    129. * @param topicFilter
    130. */
    131. public void unSubscribe(String topicFilter) {
    132. try {
    133. client.unsubscribe(topicFilter);
    134. } catch (MqttException e) {
    135. log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
    136. }
    137. }
    138. }

    2. MessageCallback.java         处理订阅消息

    • init() 加载异步入库处理类DataStorage
    • connectionLost(Throwable cause) 断线重连
    • messageArrived(String topic, MqttMessage message)  处理订阅消息
    1. package com.hzd.mqtt.client;
    2. import com.hzd.mqtt.async.DataStorage;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    5. import org.eclipse.paho.client.mqttv3.MqttCallback;
    6. import org.eclipse.paho.client.mqttv3.MqttMessage;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.stereotype.Component;
    9. import javax.annotation.PostConstruct;
    10. /**
    11. *
    12. */
    13. @Component
    14. @Slf4j
    15. public class MessageCallback implements MqttCallback {
    16. @Autowired
    17. private DataStorage dataStorage;
    18. @Autowired
    19. private EmqClient emqClient;
    20. public static MessageCallback MessageCallback;
    21. @PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
    22. public void init() {
    23. log.info("初始化");
    24. MessageCallback = this;
    25. // MessageCallback.dataPageRepository = this.dataPageRepository;
    26. MessageCallback.dataStorage = this.dataStorage;
    27. }
    28. /**
    29. * 丢失了对服务端的连接后触发的回调
    30. *
    31. * @param cause
    32. */
    33. @Override
    34. public void connectionLost(Throwable cause) {
    35. // 资源的清理 重连
    36. log.info("断开emqx连接,开始重连......");
    37. emqClient.reConnect();
    38. log.info("emqx连接成功......");
    39. }
    40. /**
    41. * 应用收到消息后触发的回调
    42. *
    43. * @param topic
    44. * @param message
    45. * @throws Exception
    46. */
    47. @Override
    48. public void messageArrived(String topic, MqttMessage message) throws Exception {
    49. dataStorage.saveDataToMongo(topic, message);
    50. }
    51. /**
    52. * 消息发布者消息发布完成产生的回调
    53. *
    54. * @param token
    55. */
    56. @Override
    57. public void deliveryComplete(IMqttDeliveryToken token) {
    58. int messageId = token.getMessageId();
    59. String[] topics = token.getTopics();
    60. log.info("消息发布完成,messageid={},topics={}", messageId, topics);
    61. }
    62. }


    3. DataStorage.java          异步入库处理类

    1. package com.hzd.mqtt.async;
    2. import com.alibaba.fastjson2.JSON;
    3. import com.hzd.entity.po.ControlLog;
    4. import com.hzd.entity.po.DataExtend;
    5. import com.hzd.entity.po.MqttClientLog;
    6. import com.hzd.entity.vo.MqttPayload;
    7. import com.hzd.entity.vo.MqttTopic;
    8. import com.hzd.mqtt.enums.TopicType;
    9. import lombok.extern.slf4j.Slf4j;
    10. import org.eclipse.paho.client.mqttv3.MqttMessage;
    11. import org.springframework.data.mongodb.UncategorizedMongoDbException;
    12. import org.springframework.data.mongodb.core.MongoTemplate;
    13. import org.springframework.data.mongodb.core.query.Criteria;
    14. import org.springframework.data.mongodb.core.query.Query;
    15. import org.springframework.data.mongodb.core.query.Update;
    16. import org.springframework.scheduling.annotation.Async;
    17. import org.springframework.stereotype.Component;
    18. import javax.annotation.Resource;
    19. import java.util.Date;
    20. /**
    21. * Coding by 李炯 on 2022/10/28 15:12
    22. */
    23. @Component
    24. @Slf4j
    25. public class DataStorage {
    26. @Resource
    27. private MongoTemplate mongoTemplate;
    28. // @Resource
    29. // private DataPageRepository dataPageRepository;
    30. @Async
    31. public void saveDataToMongo(String topic, MqttMessage message) {
    32. MqttTopic t = MqttTopic.init(topic);
    33. String payload = new String(message.getPayload());
    34. if (t == null) return;
    35. if (payload.contains("\"type\":\"data\"")) {
    36. DataExtend dataExtend = DataExtend.init(payload);
    37. mongoTemplate.insert(dataExtend, t.getFac_id());
    38. log.info("设备数据上报,topic={},fac_id={},messageid={},qos={},payload={}",
    39. topic, t.getFac_id(), message.getId(), message.getQos(), payload);
    40. } else if (payload.contains("\"type\":\"config\"")) {
    41. ControlLog controlLog = ControlLog.init(payload);
    42. Query query = new Query(Criteria.where("_id").is(controlLog.get_id()));
    43. Update update = Update.update("status", controlLog.getStatus());
    44. update.set("status", controlLog.getStatus());
    45. update.set("msg", controlLog.getMsg());
    46. if (null != controlLog.getRelayIndexLst()) {
    47. update.set("relayIndexLst", controlLog.getRelayIndexLst());
    48. }
    49. if (null != controlLog.getRelayNumLst()) {
    50. update.set("relayNumLst", controlLog.getRelayNumLst());
    51. }
    52. if (null != controlLog.getReadInterval()) {
    53. update.set("readInterval", controlLog.getReadInterval());
    54. }
    55. if (null != controlLog.getRelayphoto()) {
    56. update.set("relayphoto", controlLog.getRelayphoto());
    57. }
    58. if (null != controlLog.getLongitude()) {
    59. update.set("longitude", controlLog.getLongitude());
    60. }
    61. if (null != controlLog.getLatitude()) {
    62. update.set("latitude", controlLog.getLatitude());
    63. }
    64. if (null != controlLog.getControlStatus()) {
    65. update.set("controlStatus", controlLog.getControlStatus());
    66. }
    67. if (null != controlLog.getReserve()) {
    68. update.set("reserve", controlLog.getReserve());
    69. }
    70. update.set("updateTime", new Date());
    71. mongoTemplate.upsert(query, update, "log_control");
    72. log.info("{},topic={},messageid={},qos={},payload={}",
    73. "配置返回", topic, message.getId(), message.getQos(), payload);
    74. } else if (t.getTopicType() == TopicType.connect) {
    75. MqttPayload pl = MqttPayload.init(payload);
    76. MqttClientLog clientLog = MqttClientLog.init(pl);
    77. updateConnectStatus(topic, clientLog);
    78. log.info("{},topic={},messageid={},qos={},payload={}",
    79. clientLog.getStatus(), topic, message.getId(), message.getQos(), payload);
    80. }
    81. }
    82. @Async
    83. public void updateConnectStatus(String topic, MqttClientLog clientLog) {
    84. try{
    85. MqttTopic t = MqttTopic.init(topic);
    86. if (t == null) return;
    87. mongoTemplate.insert(clientLog, "log_mqtt_client");
    88. Query query = new Query(Criteria.where("username").is(clientLog.getUserName())
    89. .and("deviceid").is(clientLog.getDeviceId()));
    90. Update update = Update.update("connectStatus", clientLog.getConnectStatus());
    91. update.set("status", clientLog.getStatus());
    92. update.set("updateTime", clientLog.getUpdateTime());
    93. mongoTemplate.upsert(query, update, "status_mqtt_client");
    94. }catch (Exception e){
    95. log.error("mongoDB 错误:MqttClientLog={},错误消息为{}",JSON.toJSONString(clientLog),e.getMessage());
    96. }
    97. }
    98. }

  • 相关阅读:
    笔记整体梳理
    tf.metrics
    微信支付 APP端 第三弹 申请退款
    2、NewObject的三种原型(来自GPT)
    Mysql索引学习笔记
    SSM+网上书店管理系统 毕业设计-附源码082255
    论文解读(gCooL)《Graph Communal Contrastive Learning》
    Matlab:基于Matlab实现人工智能算法应用的简介(SVM支撑向量机&GA遗传算法&PSO粒子群优化算法)、案例应用之详细攻略
    020—pandas 根据历史高考分段推断当前位次的分数
    Spring MVC 的返回值有哪些
  • 原文地址:https://blog.csdn.net/ladymorgana/article/details/127890217