之前写过一篇SpringBoot通过Netty实现TCP服务的文章,本篇与之前那篇实现的场景类似,都是服务器与客户端之间双向交互,但个人觉得MQTT的方式实现更好。
MQTT协议是通过MQTT服务器转发消息,MQTT服务器作为三方,为每个客户端转发消息。与TCP不同的是,TCP编码,java大部分场景是作为服务端,设备作为客户端,而MQTT是一台单独的服务器,java跟设备都作为客户端与之保持长连接。
下载EMQX,对应有windows、mac、linux的版本
下载网址:EMQX下载网址
解压,启动
bin文件夹下执行:(稍等五秒钟,出现两行英文时表示启动完成)
emqx start
启动成功。
账号密码:admin/public
若代码里连接MQTT服务时带上用户名密码需要如下操作:
创建客户端认证
-
-
org.springframework.integration -
spring-integration-mqtt -
- package com.dpkj.mqtt.mqtt;
-
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.dpkj.mqtt.service.DpDeviceService;
- import com.mdd.common.entity.DpDevice;
- import com.mdd.common.enums.CtrlTypeEnum;
- import com.mdd.common.enums.MqttTopicEnum;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.jetbrains.annotations.NotNull;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationListener;
- import org.springframework.context.annotation.Lazy;
- import org.springframework.context.event.ContextRefreshedEvent;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.List;
-
- /**
- * 项目启动 连接mqtt服务器 订阅指定主题
- * 此处为项目启动就订阅主题,这种情况是主题已经明确的情况,
- * 若项目启动时主题不明确,需要在代码里动态订阅,
- 可直接在类中注入MQTTConnect server,之后执行server.sub("xxxx")
- */
- @Slf4j
- @Component
- public class MQTTListener implements ApplicationListener
{ -
- private final MQTTConnect server;
-
- @Autowired
- public MQTTListener(MQTTConnect server) {
- this.server = server;
- }
-
- @Override
- public void onApplicationEvent(@NotNull ContextRefreshedEvent contextRefreshedEvent) {
- try {
- server.setMqttClient(new Callback());
-
- //订阅主题
- server.sub(“test/test/test”);
-
- } catch (MqttException e) {
- log.error(e.getMessage(), e);
- }
- }
- }
-
- package com.dpkj.mqtt.mqtt;
-
- import com.dpkj.mqtt.config.async.AsyncTaskService;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- import java.io.UnsupportedEncodingException;
-
- import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
-
- /**
- * WQ
- * 2023/2/14 17:10
- * MQTT工具类操作
- */
- @Slf4j
- @Component
- public class MQTTConnect {
-
- @Value("${mqtt.host}")
- private String mqttHost;
- @Value("${mqtt.port}")
- private String mqttPort;
- @Value("${mqtt.username}")
- private String mqttUsername;
- @Value("${mqtt.password}")
- private String mqttPassword;
-
- //唯一标识
- private final String clientId = "MqttClient" + (int) (Math.random() * 100000000);
-
- private MqttClient mqttClient;
-
- @Resource
- private AsyncTaskService asyncTaskService;
-
- /**
- * 客户端connect连接mqtt服务器
- **/
- public void setMqttClient(MqttCallback mqttCallback) {
- MqttConnectOptions options = mqttConnectOptions();
- if (mqttCallback == null) {
- mqttClient.setCallback(new Callback());
- } else {
- mqttClient.setCallback(mqttCallback);
- }
- try {
- mqttClient.connect(options);
- log.error("MQTT服务连接成功。tcp://{}:{},clientId:{},username:{},password:{}", mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
- } catch (MqttException e) {
- e.printStackTrace();
- log.error("MQTT服务连接失败,{}。tcp://{}:{},clientId:{},username:{},password:{}", e.getMessage(), mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
- }
- }
-
- /**
- * MQTT连接参数设置
- */
- public MqttConnectOptions mqttConnectOptions() {
- try {
- mqttClient = new MqttClient("tcp://" + mqttHost + ":" + mqttPort, clientId, new MemoryPersistence());
- } catch (MqttException e) {
- log.error("建立mqtt客户端出错。{}", e.getMessage());
- e.printStackTrace();
- }
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(mqttUsername);
- options.setPassword(mqttPassword.toCharArray());
- options.setConnectionTimeout(0);
- options.setAutomaticReconnect(true);
- options.setKeepAliveInterval(90);
- return options;
- }
-
- /**
- * 关闭MQTT连接
- */
- public void close() throws MqttException {
- mqttClient.disconnect();
- mqttClient.close();
- }
-
- /**
- * 向某个主题发布消息 默认qos:1
- *
- * @param sn:道品云泊控制卡设备的唯一标识
- * @param msg:发布的消息
- * @param msgId:发布的消息唯一标识
- */
- public void pub(String sn, String msg, String msgId) {
- MqttMessage mqttMessage = new MqttMessage();
- try {
- //GBK格式下发
- mqttMessage.setPayload(msg.getBytes("GBK"));
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- String topic = "/test/" + sn + "/demo";
- log.error("向主题[{}],发送消息 {}", topic, msg);
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
- MqttDeliveryToken token;
- try {
- token = mqttTopic.publish(mqttMessage);
- token.waitForCompletion();
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 订阅某一个主题 ,此方法默认的的Qos等级为:1
- *
- * @param topic 主题
- */
- public void sub(String topic) throws MqttException {
- log.error("主题订阅:{}", topic);
- mqttClient.subscribe(topic);
- }
-
- // /**
- // * 订阅某一个主题,可携带Qos
- // *
- // * @param topic 所要订阅的主题
- // * @param qos 消息质量:0、1、2
- // */
- // public void sub(String topic, int qos) throws MqttException {
- // mqttClient.subscribe(topic, qos);
- // }
- }
- mqtt:
- host: 127.0.0.1
- port: 1883
- username: xxx
- password: 'xxx'