• Java 使用 EMQX 实现物联网 MQTT 通信



    前言

    EMQX 实现物联网 MQTT 通信。物联网的 MQ 消息通信方式。


    一、介绍

    1、MQTT

    MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

    特点:
    使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
    对负载内容屏蔽的消息传输;
    使用 TCP/IP 提供网络连接;
    有三种消息发布服务质量:
    小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
    使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、EMQX

    EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
    EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
    在这里插入图片描述

    优势:
    海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
    高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
    数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
    多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
    高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
    易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、Mria 集群架构​

    支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。

    在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。

    具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

    4、MQTTX

    MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

    在这里插入图片描述

    主要功能
    采用聊天界面设计,使得操作更加简单明了
    跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
    100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
    订阅的 MQTT 主题支持自定义颜色标签
    支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
    支持通过 WebSocket 连接 MQTT 服务器
    支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
    自定义脚本支持模拟 MQTT 发布/订阅测试
    提供完整的日志记录功能
    多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ??? ??? ??? ??? ???
    自由切换 Light、Dark、Night 三种主题模式

    二、SpringBoot 集成 EMQX

    1、yaml 配置

    # EMQX配置
    emqx:
      # EMQX服务地址,端口号默认18083
      url: http://127.0.0.1:18083
      # 认证用户名
      username: admin
      # 密码
      password: admin123456
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、Properties 配置类

    /**
     * EMQX配置
     */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "emqx")
    public class EmqxConfig {
    
    	private String url;
    
    	private String username;
    
    	private String password;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3、客户端连接实体 model

    客户端连接实现模型类。

    /**
     * EMQX客户端连接model
     */
    @Data
    public class EmqxClientModel {
    
    	Integer awaiting_rel_cnt;
    	Integer awaiting_rel_max;
    	Boolean clean_start;
    	
    	/**
    	 * 客户端id
    	 */
    	String clientid;
    	
    	/**
    	 * 连接状态
    	 */
    	Boolean connected;
    	Long connected_at;
    	Long created_at;
    	Long disconnected_at;
    	Integer expiry_interval;
    	Integer heap_size;
    	Integer inflight_cnt;
    	Integer inflight_max;
    	
    	/**
    	 * ip地址
    	 */
    	String ip_address;
    	Boolean is_bridge;
    	
    	/**
    	 * 心跳检测时间s
    	 */
    	Integer keepalive;
    	Integer mailbox_len;
    	Integer mqueue_dropped;
    	Integer mqueue_len;
    	
    	/**
    	 * 消息队列最大长度
    	 */
    	Integer mqueue_max;
    	String node;
    	Integer port;
    	String proto_name;
    	Integer proto_ver;
    	Integer recv_cnt;
    	Integer recv_msg;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    4、token 服务类

    提供获取 token 的方法。

    @Component
    @RequiredArgsConstructor
    public class EmqxTokenService {
    
    	private final EmqxConfig emqxConfig;
    
    	public String getToken(){
    		String authentication = emqxConfig.getUsername() + ":" + emqxConfig.getPassword();
    		return "Basic " + Base64.getEncoder().encodeToString(authentication.getBytes());
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    5、客户端 api

    提供对外调用的 api 服务。

    1. 查询客户端连接状态。
    2. 查询客户端连接信息。
    3. 删除客户端连接。
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class EmqxClientsApi {
    
    	private final EmqxConfig emqxConfig;
    
    	private final EmqxTokenService emqxTokenService;
    
    	/**
    	 * 根据客户端id查询连接状态
    	 *
    	 * @param clientId 客户端id
    	 * @return 连接状态
    	 */
    	public boolean getConnectedStatus(String clientId) {
    		EmqxClientModel client = getByClientId(clientId);
    		if (client == null) {
    			return false;
    		}
    		return client.getConnected();
    	}
    
    	/**
    	 * 根据客户端id查询客户端信息
    	 *
    	 * @param clientId 客户端id
    	 * @return 客户端信息
    	 */
    	public EmqxClientModel getByClientId(String clientId) {
    		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
    		HttpResponse httpResponse;
    		try {
    			httpResponse = HttpRequest.get(url)
    					.header("Authorization", emqxTokenService.getToken())
    					.execute();
    		} catch (Exception e) {
    			log.info("未查到emqx客户端:clientId=" + clientId + "[msg]:" + e.getMessage());
    			return null;
    		}
    		if (httpResponse != null && httpResponse.getStatus() == 200) {
    			return JSON.parseObject(httpResponse.body(), EmqxClientModel.class);
    		}
    		return null;
    	}
    
    	/**
    	 * 根据客户端id删除客户端
    	 *
    	 * @param clientId 客户端id
    	 * @return 客户端信息
    	 */
    	public void delete(String clientId) {
    		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
    		HttpResponse httpResponse;
    		try {
    			httpResponse = HttpRequest.delete(url)
    					.header("Authorization", emqxTokenService.getToken())
    					.execute();
    		} catch (IORuntimeException e) {
    			throw new ServiceException("删除emqx客户端请求超时");
    		} catch (Exception e) {
    			throw new ServiceException("删除emqx客户端请求异常:clientId=" + clientId + "[msg]:" + e.getMessage());
    		}
    		if (httpResponse == null || httpResponse.getStatus() != 204) {
    			throw new ServiceException("删除emqx客户端失败:clientId=" + clientId);
    		}
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    三、SpringBoot 集成 MQTT

    1、pom 依赖

    <dependency>
      <groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.2</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、yaml 配置

    spring:
      # MQTT配置
      mqtt:
        # MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
        host-url: tcp://127.0.0.1:1883
        # 用户名
        username: admin
        # 密码
        password: admin123456
        # 客户端id(不能重复)
        client-id: real-mqtt-client
        # MQTT默认的消息推送主题,实际可在调用接口时指定
        default-topic: topic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3、Properties 配置类

    @Configuration
    @ConfigurationProperties(prefix = "spring.mqtt")
    @Data
    public class MqttConfig {
    
    	private String username;
    
    	private String password;
    
    	private String hostUrl;
    
    	private String clientId;
    
    	private String defaultTopic;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4、连接工厂类

    执行 mq 初始化配置、连接、消息主题订阅。

    @Slf4j
    @Component
    public class MqttFactory {
    
    	public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();
    
    	@Autowired
    	private MqttConfig mqttConfig;
    
    	@Autowired
    	private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;
    
    	/**
    	 * 在bean初始化后连接到服务器
    	 */
    	@PostConstruct
    	public void init() {
    		String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);
    		if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {
    			// 初始化订阅主题
    			initSubscribeTopic(getInstance());
    		}
    	}
    
    	/**
    	 * 初始化客户端
    	 */
    	public MqttClient getInstance() {
    		MqttClient client = null;
    		if (clientMap.get(mqttConfig.getClientId()) == null) {
    			try {
    				client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());
    				// MQTT配置对象
    				MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    				// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
    				mqttConnectOptions.setAutomaticReconnect(true);
    				// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
    				// mqttConnectOptions.setCleanSession(true);
    				// 设置超时时间 单位为秒
    				mqttConnectOptions.setConnectionTimeout(10);
    				mqttConnectOptions.setUserName(mqttConfig.getUsername());
    				mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
    				// mqttConnectOptions.setServerURIs(new String[]{url});
    				// 设置会话心跳时间 单位为秒
    				mqttConnectOptions.setKeepAliveInterval(10);
    				// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    				// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);
    				if (!client.isConnected()) {
    					client.connect(mqttConnectOptions);
    				}
    				client.setCallback(new MqttCallBack());
    				log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));
    				clientMap.put(mqttConfig.getClientId(), client);
    			} catch (MqttException e) {
    				log.error("MQTT连接消息服务器[{}]失败", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());
    			}
    		} else {
    			client = clientMap.get(mqttConfig.getClientId());
    			log.info("MQTT从map里获取到client,clientId=" + mqttConfig.getClientId());
    			// TODO 已采用自动重连策略
    //			log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
    //			if (!client.isConnected()) {
    //				initSubscribeTopic(client);
    			// 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
    //				clientMap.remove(mqttConfig.getClientId());
    //				this.getInstance();
    //			}
    		}
    		return client;
    	}
    
    
    	/**
    	 * 初始化订阅主题
    	 * 

    * 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 */ public void initSubscribeTopic(MqttClient client) { // 订阅设备发布消息主题 List<String> upstreamTopics = new ArrayList<>(); List<Integer> upstreamQos = new ArrayList<>(); upstreamTopics.add("topic_1"); upstreamQos.add(1); upstreamTopics.add("topic_2"); upstreamQos.add(0); upstreamTopics.add("topic_3"); upstreamQos.add(1); try { client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray()); } catch (MqttException e) { e.printStackTrace(); } } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    5、MQTT 回调类

    连接断开回调,以及消息到达回调。根据消息主题进行消息分发。

    @Slf4j
    @Component
    public class MqttCallBack implements MqttCallback, MqttCallbackExtended {
    
    	/**
    	 * 客户端断开连接的回调
    	 */
    	@Override
    	public void connectionLost(Throwable throwable) {
    		log.info("客户端断开连接回调");
    	}
    
    	@Override
    	public void connectComplete(boolean reconnect, String serverURI) {
    		log.info("客户端断开连接重连");
    		// 重新订阅
    		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
    		client.initSubscribeTopic(client.getInstance());
    		log.info("重连成功");
    	}
    
    	/**
    	 * 消息到达的回调
    	 */
    	@Override
    	public void messageArrived(String topic, MqttMessage mqttMessage) {
    		// 日志输出
    		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
    		log.info("mqtt客户端ID : {}", client.getInstance().getClientId());
    		log.info("mqtt接收消息主题 : {}", topic);
    		log.info("mqtt接收消息Qos : {}", mqttMessage.getQos());
    		log.info("mqtt接收消息retained : {}", mqttMessage.isRetained());
    		log.info("mqtt接收消息内容 : {}", new String(mqttMessage.getPayload()));
    
    		String message = new String(mqttMessage.getPayload()); // 消息内容
    
    		// TODO 根据消息主题进行消息分发
    		
    	}
    
    	/**
    	 * 消息发布成功的回调
    	 */
    	@Override
    	public void deliveryComplete(IMqttDeliveryToken token) {
    		IMqttAsyncClient client = token.getClient();
    		log.info(client.getClientId() + " 发布消息成功!");
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    6、MQ 服务类

    提供消息发布等方法。

    这里对发送消息方法进行了自定义封装,增加了 redis 对发送的消息进行存储,在异步的响应消息返回时,利用 redis 中发送消息的消息id实现响应数据的异步绑定。响应消息存在丢失的情况。需要合理设置发送消息的过期时间,防止时间过短导致返回的响应丢失,防止时间过长占用 redis 资源。

    @Slf4j
    @Data
    @Configuration
    public class UMqttClientService {
    
    	private final MqttFactory mqttFactory;
    
    	private final StringRedisTemplate redisTemplate;
    
    	/**
    	 * 订阅主题
    	 */
    	public void subscribeTopic(String deviceNo) {
    		try {
    			// 订阅设备发布消息主题
    			List<String> upstreamTopics = new ArrayList<>();
    			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
    			upstreamTopics.add(UMqttCommonConstants.ONLINE);
    			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
    			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
    			int[] upstreamQos = {1, 1, 2, 0};
    			mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 取消订阅主题
    	 */
    	public void stopSubscribeTopic(String deviceNo) {
    		try {
    			// 取消订阅设备发布消息主题
    			List<String> upstreamTopics = new ArrayList<>();
    			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
    			upstreamTopics.add(UMqttCommonConstants.ONLINE);
    			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
    			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
    			mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 断开连接
    	 */
    	public void disConnect() {
    		try {
    			mqttFactory.getInstance().disconnect();
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 订阅主题
    	 */
    	public void subscribe(String topic, int qos) {
    		try {
    			mqttFactory.getInstance().subscribe(topic, qos);
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 发布请求设备消息
    	 *
    	 * @param deviceNo 设备编号
    	 * @param message  消息
    	 */
    	public void publish(String deviceNo, String message) {
    		publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);
    	}
    
    	/**
    	 * 发布请求设备消息
    	 */
    	public void publish(UMqttPublishDate publishDate) {
    		// 将消息id和方法名存到redis中:缓存3分钟
    		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
    				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
    		publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());
    	}
    
    	/**
    	 * 发布响应设备消息
    	 */
    	public void publishResponse(UMqttPublishDate publishDate) {
    		// 将消息id和方法名存到redis中:缓存3分钟
    		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
    				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
    		publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());
    	}
    
    	/**
    	 * 发布消息
    	 *
    	 * @param qos      qos
    	 * @param retained retained
    	 * @param topic    主题
    	 * @param message  消息
    	 */
    	public void publish(int qos, boolean retained, String topic, String message) {
    		log.info("发布消息topic:" + topic);
    		log.info("发布消息message:" + message);
    		MqttMessage mqttMessage = new MqttMessage();
    		mqttMessage.setQos(qos);
    		mqttMessage.setRetained(retained);
    		mqttMessage.setPayload(message.getBytes());
    		//主题的目的地,用于发布/订阅信息
    		MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);
    		//提供一种机制来跟踪消息的传递进度
    		//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
    		MqttDeliveryToken token;
    		try {
    			//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
    			//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
    			token = mqttTopic.publish(mqttMessage);
    			token.waitForCompletion();
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    四、MQTT 的重连策略

    1. 在 mqtt 工厂类中进行初始化连接时,设置自动重连状态为开启。
    // 设置自动重连
    mqttConnectOptions.setAutomaticReconnect(true);
    
    • 1
    • 2
    1. 在 mqtt 回调类中,设置重连处理业务。
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
    	log.info("客户端断开连接重连");
    	// 重新订阅
    	MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
    	client.initSubscribeTopic(client.getInstance());
    	log.info("重连成功");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    MQTT 连接失效时,会自动进行重连,执行自定义的重连策略,重连过程中 MQTT 消息服务停止,消息处理等业务会抛出异常。

    五、EMQX 的 Windows 部署启动方式

    EMQX 服务需要延时启动,因为部署服务器开机时 EMQX 服务需要等待完成一些初始化操作。保证 EMQX 服务在业务服务启动前启动。

    bat 脚本部署方案:

    1. 方式一:(每次停止服务后需重启电脑)
    @echo off
    start cmd /k "cd /d D:\Program Files\emqx-5.0.11-windows-amd64\bin && emqx start"
    
    echo start magic-demo-biz
    java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 方式二:(管理员权限cmd进入D:\Program Files\emqx-5.0.11-windows-amd64\bin,执行emqx install安装成服务,将emqx服务设置为手动启动)
    @echo off
    sc start emqx
    
    echo start magic-demo-biz
    java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5

    六、疑难解答

    1、避免消息发送速率过快

    当需要批量发送大量消息时,如果消息发送频率过快,会导致 EMQX 服务器会主动将当前发送消息的客户端连接断开,因此在发送消息时需要控制 MQTT 消息的发送频率。

    for (int i = 0; i < recordSize; i++) {
    	// 发送MQTT消息
    	mqttClientService.sendMessage("topic", "发送消息");
    
    	// 执行延时程序,控制消息发送速率。(速率过快会导致MQTT客户端连接掉线)
    	try {
    		TimeUnit.SECONDS.sleep(second);
    	} catch (Exception e) {
    		log.error("延时程序执行异常:" + e.getMessage());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2、判断 MQTT 客户端连接状态

    会有 MQTT 客户端连接存在但连接状态为已断开的情况。因此判断 MQTT 客户端连接状态时,需要获取 MQTT 客户端连接的实际连接状态,而不是仅判断 MQTT 客户端连接是否存在。

    /**
     * 根据客户端id查询连接状态
     *
     * @param clientId 客户端id
     * @return 连接状态
     */
    public boolean getConnectedStatus(String clientId) {
    	EmqxClientModel client = getByClientId(clientId);
    	if (client == null) {
    		return false;
    	}
    	return client.getConnected(); // 查询实际连接状态
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    总结

    MQTT 原理同 MQ 消息发送与接收,EMQX 的 MQTT 客服端连接即消息队列。

  • 相关阅读:
    golang singleflight资料整理
    78.C++ STL set/multiset容器
    8. SAP ABAP OData 服务如何支持创建(Create)操作
    Python语言程序设计 习题7
    Matlab通信仿真系列——信号处理函数
    【MIT6.824】lab2C-persistence, lab2D-log compaction 实现笔记
    【Android开发】Android服务和系统服务
    AI工人操作行为流程规范识别算法
    C++学习——C++运算符重载(含义、格式、示例、遵循的规则)
    redis常用命令
  • 原文地址:https://blog.csdn.net/demo_yo/article/details/132495705