• 【nacos】5.3 nacos 更新mqtt配置,自动加载连接EMQX


    返回首页 > 【笔记】Spring Cloud Alibaba Nacos

    上一节问题:

    1. 问题

    1.1 如果修改的是一些需要预加载的配置呢?如下场景是否不用重启服务就能立即生效?

    • 修改连接Mqtt服务器,并订阅
    • 修改连接TCP服务器
    • 修改TCP客户端端口
    • 修改mysql等数据库地址,端口,账号,密码

        单独使用 @RefreshScope + @Value 时,以上4种场景均不能生效(需要重启 Spring Boot 项目,才能重新连接 MQTT 服务器等)

    1.2 如何才能生效?

    当我们修改 Nacos 中的配置值时,会发现控制台打印出一条 "Refresh keys changed: ..." 日志,其中列出了发生变化的 key

     查看 RefreshEventListener.class 代码,发现当 Nacos 配置变化时 handle 方法会被调用,并打印发生变化的 key

    1. public void handle(RefreshEvent event) { // handles a RefreshEvent
    2. if (this.ready.get()) { // do nothing until the ready flag has been set
    3. log.debug("Event received " + event.getEventDesc()); // debug-log the event description
    4. Set keys = this.refresh.refresh(); // run the refresh; the returned set is logged below as the changed keys
    5. log.info("Refresh keys changed: " + keys); // report the changed keys at INFO level
    6. }
    7. }

    所以,只需要参考 RefreshEventListener.class 新建一个监听类 MqttConfigListener.java即可

    2. 实战

    案例:修改 Nacos 配置后,实时重连 EMQX 服务器(MQTT)

    说明:关键看

    • MqttConfigListener.handle(RefreshEvent event)
    • emqClient.init();//重新连接EMQX

    • MqttConfigListener.java

    1. package com.hzd.listener;
    2. import com.hzd.mqtt.client.EmqClient;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.apache.commons.logging.Log;
    5. import org.apache.commons.logging.LogFactory;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.boot.context.event.ApplicationReadyEvent;
    8. import org.springframework.cloud.context.refresh.ContextRefresher;
    9. import org.springframework.cloud.endpoint.event.RefreshEvent;
    10. import org.springframework.cloud.endpoint.event.RefreshEventListener;
    11. import org.springframework.context.ApplicationEvent;
    12. import org.springframework.context.event.SmartApplicationListener;
    13. import org.springframework.stereotype.Component;
    14. import java.util.Set;
    15. import java.util.concurrent.atomic.AtomicBoolean;
    16. /**
    17. * Coding by 李炯 on 2022/11/11 11:24
    18. */
    19. @Component
    20. @Slf4j
    21. public class MqttConfigListener implements SmartApplicationListener {
    22. @Autowired
    23. private EmqClient emqClient;
    24. private ContextRefresher refresh;
    25. private AtomicBoolean ready = new AtomicBoolean(false);
    26. public MqttConfigListener(ContextRefresher refresh) {
    27. this.refresh = refresh;
    28. }
    29. public boolean supportsEventType(Class eventType) {
    30. return ApplicationReadyEvent.class.isAssignableFrom(eventType) || RefreshEvent.class.isAssignableFrom(eventType);
    31. }
    32. public void onApplicationEvent(ApplicationEvent event) {
    33. if (event instanceof ApplicationReadyEvent) {
    34. this.handle((ApplicationReadyEvent)event);
    35. } else if (event instanceof RefreshEvent) {
    36. this.handle((RefreshEvent)event);
    37. }
    38. }
    39. public void handle(ApplicationReadyEvent event) {
    40. this.ready.compareAndSet(false, true);
    41. }
    42. public void handle(RefreshEvent event) {
    43. if (this.ready.get()) {
    44. Set keys = this.refresh.refresh();
    45. log.info("Refresh keys changed>>>>>>>>: " + keys.toString());
    46. if (keys.toString().contains("mqtt")){
    47. emqClient.disConnect();//断开之前的连接
    48. emqClient.init();//重新连接EMQX
    49. }
    50. }
    51. }
    52. }
    • EmqClient.java
    1. package com.hzd.mqtt.client;
    2. import com.hzd.mqtt.enums.QosEnum;
    3. import com.hzd.mqtt.properties.MqttProperties;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.eclipse.paho.client.mqttv3.*;
    6. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    7. import org.slf4j.Logger;
    8. import org.slf4j.LoggerFactory;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.cloud.context.config.annotation.RefreshScope;
    11. import org.springframework.context.annotation.Bean;
    12. import org.springframework.context.annotation.Primary;
    13. import org.springframework.stereotype.Component;
    14. import javax.annotation.PostConstruct;
    15. import javax.annotation.PreDestroy;
    16. /**
    17. * Created by
    18. */
    19. @Component
    20. @Slf4j
    21. public class EmqClient {
    22. private IMqttClient mqttClient;
    23. @Autowired
    24. private MqttProperties mqttProperties;
    25. @Autowired
    26. private MqttCallback mqttCallback;
    27. @Autowired
    28. private MqttProperties properties;
    29. @PostConstruct
    30. public void init() {
    31. MqttClientPersistence mempersitence = new MemoryPersistence();
    32. try {
    33. mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
    34. } catch (MqttException e) {
    35. log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
    36. }
    37. //连接服务端
    38. connect(properties.getUsername(), properties.getPassword());
    39. //订阅一个主题
    40. String topic_hzd = "/hzd/pub/#";
    41. String topic_device_connect = "/device/connect/#";
    42. subscribe(topic_hzd, QosEnum.QoS2);
    43. subscribe(topic_device_connect, QosEnum.QoS2);
    44. log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
    45. }
    46. /**
    47. * 连接broker
    48. *
    49. * @param username
    50. * @param password
    51. */
    52. public void connect(String username, String password) {
    53. MqttConnectOptions options = new MqttConnectOptions();
    54. options.setAutomaticReconnect(true);
    55. options.setUserName(username);
    56. options.setPassword(password.toCharArray());
    57. options.setCleanSession(true);
    58. mqttClient.setCallback(mqttCallback);
    59. try {
    60. mqttClient.connect(options);
    61. } catch (MqttException e) {
    62. log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
    63. }
    64. }
    65. /**
    66. * 断开连接
    67. */
    68. @PreDestroy
    69. public void disConnect() {
    70. try {
    71. mqttClient.disconnect();
    72. } catch (MqttException e) {
    73. log.error("断开连接产生异常,异常信息{}", e.getMessage());
    74. }
    75. }
    76. /**
    77. * 重连
    78. */
    79. public void reConnect() {
    80. try {
    81. mqttClient.reconnect();
    82. } catch (MqttException e) {
    83. log.error("重连失败,失败原因{}", e.getMessage());
    84. }
    85. }
    86. /**
    87. * 发布消息
    88. *
    89. * @param topic
    90. * @param msg
    91. * @param qos
    92. * @param retain
    93. */
    94. public void publish(String topic, String msg, QosEnum qos, boolean retain) {
    95. MqttMessage mqttMessage = new MqttMessage();
    96. mqttMessage.setPayload(msg.getBytes());
    97. mqttMessage.setQos(qos.value());
    98. mqttMessage.setRetained(retain);
    99. try {
    100. mqttClient.publish(topic, mqttMessage);
    101. } catch (MqttException e) {
    102. log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
    103. }
    104. }
    105. /**
    106. * 订阅
    107. *
    108. * @param topicFilter
    109. * @param qos
    110. */
    111. public void subscribe(String topicFilter, QosEnum qos) {
    112. try {
    113. mqttClient.subscribe(topicFilter, qos.value());
    114. } catch (MqttException e) {
    115. log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
    116. }
    117. }
    118. /**
    119. * 取消订阅
    120. *
    121. * @param topicFilter
    122. */
    123. public void unSubscribe(String topicFilter) {
    124. try {
    125. mqttClient.unsubscribe(topicFilter);
    126. } catch (MqttException e) {
    127. log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
    128. }
    129. }
    130. }

  • 相关阅读:
    瑛字取名寓意及含义
    开源日报 0822 | 语音识别与推理
    什么是代码风格?
    Database之SQL:SQL在线编程、工作中常用SQL代码实践(以语法为导向的增、删、改、查,已基本涵盖所有语法案例)之详细攻略
    Rsync已过时?替代文件同步方式了解一下
    postman自动化运行接口测试用例
    Bee2.1.8支持Spring Boot 3.0.11,active命令行选择多环境,多表查改增删(bee-spring-boot发布,更新maven)
    微擎模块 砍价宝小程序 6.4.0全开源版安装更新一体包 砍价抢购拼团模式商家入驻
    【ESD专题】案例 :结构设计导致的ESD问题
    面试题 02.07. 链表相交-双指针法
  • 原文地址:https://blog.csdn.net/ladymorgana/article/details/127810860