• SpringBoot中使用EMQX实现MQTT通讯


    简述

    之前写过一篇SpringBoot通过Netty实现TCP服务的文章,本篇与之前那篇实现的场景类似,都是服务器与客户端之间双向交互,但个人觉得MQTT的方式实现更好。

    基础

    MQTT协议是通过MQTT服务器转发消息,MQTT服务器作为三方,为每个客户端转发消息。与TCP不同的是,TCP编码,java大部分场景是作为服务端,设备作为客户端,而MQTT是一台单独的服务器,java跟设备都作为客户端与之保持长连接。

    准备

    下载EMQX,对应有windows、mac、linux的版本

    下载网址:EMQX下载网址

    解压,启动

    bin文件夹下执行:(稍等五秒钟,出现两行英文时表示启动完成)

    emqx start

    浏览器打开

    http://localhost:18083/

    启动成功。

    账号密码:admin/public

    若代码里连接MQTT服务时带上用户名密码需要如下操作:

    创建客户端认证

    编码

    maven依赖

    1. org.springframework.integration
    2. spring-integration-mqtt

    监听器

    1. package com.dpkj.mqtt.mqtt;
    2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    3. import com.dpkj.mqtt.service.DpDeviceService;
    4. import com.mdd.common.entity.DpDevice;
    5. import com.mdd.common.enums.CtrlTypeEnum;
    6. import com.mdd.common.enums.MqttTopicEnum;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.eclipse.paho.client.mqttv3.MqttException;
    9. import org.jetbrains.annotations.NotNull;
    10. import org.springframework.beans.factory.annotation.Autowired;
    11. import org.springframework.context.ApplicationListener;
    12. import org.springframework.context.annotation.Lazy;
    13. import org.springframework.context.event.ContextRefreshedEvent;
    14. import org.springframework.stereotype.Component;
    15. import javax.annotation.Resource;
    16. import java.util.List;
    17. /**
    18. * 项目启动 连接mqtt服务器 订阅指定主题
    19. * 此处为项目启动就订阅主题,这种情况是主题已经明确的情况,
    20. * 若项目启动时主题不明确,需要在代码里动态订阅,
    21. 可直接在类中注入MQTTConnect server,之后执行server.sub("xxxx")
    22. */
    23. @Slf4j
    24. @Component
    25. public class MQTTListener implements ApplicationListener {
    26. private final MQTTConnect server;
    27. @Autowired
    28. public MQTTListener(MQTTConnect server) {
    29. this.server = server;
    30. }
    31. @Override
    32. public void onApplicationEvent(@NotNull ContextRefreshedEvent contextRefreshedEvent) {
    33. try {
    34. server.setMqttClient(new Callback());
    35. //订阅主题
    36. server.sub(“test/test/test”);
    37. } catch (MqttException e) {
    38. log.error(e.getMessage(), e);
    39. }
    40. }
    41. }

    发布及订阅主题

    1. package com.dpkj.mqtt.mqtt;
    2. import com.dpkj.mqtt.config.async.AsyncTaskService;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.eclipse.paho.client.mqttv3.*;
    5. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    6. import org.springframework.beans.factory.annotation.Value;
    7. import org.springframework.stereotype.Component;
    8. import javax.annotation.Resource;
    9. import java.io.UnsupportedEncodingException;
    10. import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
    11. /**
    12. * WQ
    13. * 2023/2/14 17:10
    14. * MQTT工具类操作
    15. */
    16. @Slf4j
    17. @Component
    18. public class MQTTConnect {
    19. @Value("${mqtt.host}")
    20. private String mqttHost;
    21. @Value("${mqtt.port}")
    22. private String mqttPort;
    23. @Value("${mqtt.username}")
    24. private String mqttUsername;
    25. @Value("${mqtt.password}")
    26. private String mqttPassword;
    27. //唯一标识
    28. private final String clientId = "MqttClient" + (int) (Math.random() * 100000000);
    29. private MqttClient mqttClient;
    30. @Resource
    31. private AsyncTaskService asyncTaskService;
    32. /**
    33. * 客户端connect连接mqtt服务器
    34. **/
    35. public void setMqttClient(MqttCallback mqttCallback) {
    36. MqttConnectOptions options = mqttConnectOptions();
    37. if (mqttCallback == null) {
    38. mqttClient.setCallback(new Callback());
    39. } else {
    40. mqttClient.setCallback(mqttCallback);
    41. }
    42. try {
    43. mqttClient.connect(options);
    44. log.error("MQTT服务连接成功。tcp://{}:{},clientId:{},username:{},password:{}", mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
    45. } catch (MqttException e) {
    46. e.printStackTrace();
    47. log.error("MQTT服务连接失败,{}。tcp://{}:{},clientId:{},username:{},password:{}", e.getMessage(), mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
    48. }
    49. }
    50. /**
    51. * MQTT连接参数设置
    52. */
    53. public MqttConnectOptions mqttConnectOptions() {
    54. try {
    55. mqttClient = new MqttClient("tcp://" + mqttHost + ":" + mqttPort, clientId, new MemoryPersistence());
    56. } catch (MqttException e) {
    57. log.error("建立mqtt客户端出错。{}", e.getMessage());
    58. e.printStackTrace();
    59. }
    60. MqttConnectOptions options = new MqttConnectOptions();
    61. options.setUserName(mqttUsername);
    62. options.setPassword(mqttPassword.toCharArray());
    63. options.setConnectionTimeout(0);
    64. options.setAutomaticReconnect(true);
    65. options.setKeepAliveInterval(90);
    66. return options;
    67. }
    68. /**
    69. * 关闭MQTT连接
    70. */
    71. public void close() throws MqttException {
    72. mqttClient.disconnect();
    73. mqttClient.close();
    74. }
    75. /**
    76. * 向某个主题发布消息 默认qos:1
    77. *
    78. * @param sn:道品云泊控制卡设备的唯一标识
    79. * @param msg:发布的消息
    80. * @param msgId:发布的消息唯一标识
    81. */
    82. public void pub(String sn, String msg, String msgId) {
    83. MqttMessage mqttMessage = new MqttMessage();
    84. try {
    85. //GBK格式下发
    86. mqttMessage.setPayload(msg.getBytes("GBK"));
    87. } catch (UnsupportedEncodingException e) {
    88. e.printStackTrace();
    89. }
    90. String topic = "/test/" + sn + "/demo";
    91. log.error("向主题[{}],发送消息 {}", topic, msg);
    92. MqttTopic mqttTopic = mqttClient.getTopic(topic);
    93. MqttDeliveryToken token;
    94. try {
    95. token = mqttTopic.publish(mqttMessage);
    96. token.waitForCompletion();
    97. } catch (MqttException e) {
    98. e.printStackTrace();
    99. }
    100. }
    101. /**
    102. * 订阅某一个主题 ,此方法默认的的Qos等级为:1
    103. *
    104. * @param topic 主题
    105. */
    106. public void sub(String topic) throws MqttException {
    107. log.error("主题订阅:{}", topic);
    108. mqttClient.subscribe(topic);
    109. }
    110. // /**
    111. // * 订阅某一个主题,可携带Qos
    112. // *
    113. // * @param topic 所要订阅的主题
    114. // * @param qos 消息质量:0、1、2
    115. // */
    116. // public void sub(String topic, int qos) throws MqttException {
    117. // mqttClient.subscribe(topic, qos);
    118. // }
    119. }

    yml

    1. mqtt:
    2. host: 127.0.0.1
    3. port: 1883
    4. username: xxx
    5. password: 'xxx'

  • 相关阅读:
    聊聊室内导航在应用方面
    js 数组移除指定元素【函数封装】(含对象数组移除指定元素)
    kepler.gl笔记:添加数据
    【网络安全 --- kali2023安装】超详细的kali2023安装教程(提供镜像资源)
    什么是腾讯云关系型数据库(MySQL/SQL Server/MariaDB/PostgreSQL详解)
    【Linux】基础IO —— 缓冲区深度剖析
    BSV 上的信息不完整游戏
    微信小程序在线预览PDF文件
    嵌入式Linux应用开发-第七章-RK3288和 RK3399的 LED驱动程序
    2023-09-12力扣每日一题
  • 原文地址:https://blog.csdn.net/weiqiang915/article/details/132759452