• 物联网消息队列客户端-MQTT-基本功能实现


    1. 主要实现功能

    • 自动配置
    • 消息自动解析
    • 消息分组共享订阅
    • 消息不分组共享订阅
    • 消息排它订阅
    • 延时发布
    • 多数据源

    4. 快速开始

    4.1 引入依赖

        <dependency>
                <groupId>org.eclipse.pahogroupId>
                <artifactId>org.eclipse.paho.client.mqttv3artifactId>
                <version>1.2.5version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.2 配置

    spring:
      mqtt:
        emq:
          client:
            # 多数据源客户端名称,默认default
            default:
              # broker地址
              host: 127.0.0.1
              # 端口
              port: 31883
              # 用户名
              username: admin
              # 密码
              password: 123456
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    更多配置如下

    spring:
      mqtt:
        emq:
          client:
            # 多数据源客户端名称,默认default
            default:
              # broker地址
              host: 127.0.0.1
              # 端口
              port: 31883
              # 用户名
              username: admin
              # 密码
              password: 123456
              # 客户端标识,需保持全局唯一
              client-id: parking_server
              # 是否清除session
              clean-session: false
              # 连接超时时间,单位秒
              connection-timeout: 10
              # 心跳间隔时间,单位秒
              keep-alive-interval: 10
              # 全局消息质量
              global-qos: 1
              # 重新连接之间等待的最长时间
              maxReconnect-delay: 128000
              # 是否自动重新连接
              automatic-reconnect: true
              # 最大消息并发数量,超过此数量并发时可能会丢消息
              maxInflight: 1000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    4.3 开启自动配置

    在启动类上增加@EnableRabbitMqAutoConfiguration注解

    import com.demo.mqttclient.anno.EnableEmqAutoConfiguration;
    
    
    @SpringBootApplication
    @EnableEmqAutoConfiguration
    public class TestApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(TestApplication.class, args);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.4 发布消息

    在生产者的业务程序中,注入MQTTClient

    import com.demo.mqttclient.MQTTClient;
    
    @Resource
    private MQTTClient defaultMQTTClient;
    
    • 1
    • 2
    • 3
    • 4

    为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。

    4.4.1 第三方消息发送

    public String publishHeartbeatReply() {
        HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage();
        heartbeatReplyMessage.setCmd(32896);
        heartbeatReplyMessage.setExpire(1605252875L);
        heartbeatReplyMessage.setDevid("095437323930030130523933");
        heartbeatReplyMessage.setServer_time("1605252875");
    
        defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage);
    
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.4.2 内部消息发送

    消息实体实现Message接口

    package com.example.test.message;
    
    import com.demo.mqttclient.MQTTMessage;
    import lombok.Data;
    
    import java.util.UUID;
    
    @Data
    public class demoMessage implements MQTTMessage {
        
        private String msgId = UUID.randomUUID().toString();
        
        private String name;
        
        private String gender;
        
        @Override
        public String getMsgId() {
            return this.msgId;
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    然后直接调用该类的publish方法发送即可

    @GetMapping("demo/publish")
    public String demoPublish() {
        demoMessage demoMessage = new demoMessage();
        demoMessage.setName("点都");
        demoMessage.setGender("xx");
    
        defaultMQTTClient.publish("demo/topic", demoMessage);
    
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中存在多个重载的方法。

    package com.demo.mqttclient;
    
    import com.demo.mqttclient.enums.ShareModelEnum;
    import com.demo.plugin.core.lang.json.JSONUtil;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    /**
     * mqtt客户端
     *
     * @author zhangliuyang
     * @date 2022/07/18
     * @since 1.0.0
     */
    public interface MQTTClient {
    
        /**
         * 启动客户端
         */
        void start();
    
        /**
         * 关闭客户端
         */
        void close();
    
        /**
         * 发布
         *
         * @param topic   主题
         * @param message 消息
         */
        default <T extends MQTTMessage> void publish(String topic, T message) {
            this.publish(topic, message, 1);
        }
    
        /**
         * 发布
         *
         * @param topic   主题
         * @param message 消息
         * @param qos     消息质量
         */
        default <T extends MQTTMessage> void publish(String topic, T message, int qos) {
            this.publish(topic, message, qos, 0);
        }
    
        /**
         * 发布
         *
         * @param topic   主题
         * @param message 消息
         * @param qos     消息质量
         * @param delay   延迟时间[unit:s, max:4294967s, condition: > 0]
         */
        default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay) {
            this.publish(topic, message, qos, delay, false);
        }
    
        /**
         * 发布
         *
         * @param topic    主题
         * @param message  消息
         * @param qos      消息质量
         * @param delay    延迟时间[unit:s, max:4294967s, condition: > 0]
         * @param retained 保留消息
         */
        default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay, boolean retained) {
            MQTTMessageContext mqttMessageContext = new MQTTMessageContext();
            mqttMessageContext.setId(message.getMsgId());
            mqttMessageContext.setPayload(JSONUtil.write(message));
            mqttMessageContext.setQos(qos);
            mqttMessageContext.setDelay(delay);
            mqttMessageContext.setRetained(retained);
            mqttMessageContext.setTimestamp(System.currentTimeMillis());
    
            this.publish(topic, mqttMessageContext);
        }
    
        /**
         * 发布
         *
         * @param topic          主题
         * @param messageContext 消息上下文
         */
        void publish(String topic, MQTTMessageContext messageContext);
    
        /**
         * 发送到第三方
         *
         * @param topic   主题
         * @param message 消息
         */
        default void publish2ThirdParty(String topic, Object message) {
            this.publish2ThirdParty(topic, 1, message);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic   主题
         * @param qos     消息质量
         * @param message 消息
         */
        default void publish2ThirdParty(String topic, int qos, Object message) {
            this.publish2ThirdParty(topic, qos, message, Constant.DEFAULT_CHARSET.name());
        }
    
        /**
         * 发送到第三方
         *
         * @param topic       主题
         * @param qos         消息质量
         * @param message     消息
         * @param charsetName 字符集名称
         */
        default void publish2ThirdParty(String topic, int qos, Object message, String charsetName) {
            this.publish2ThirdParty(topic, qos, message, charsetName, 0);
        }
    
        /**
         * publish2第三方
         *
         * @param topic       主题
         * @param qos         qos
         * @param message     消息
         * @param charsetName 字符集名称
         * @param delay       延迟时间
         */
        default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay) {
            this.publish2ThirdParty(topic, qos, message, charsetName, delay, false);
        }
    
        /**
         * publish2第三方
         *
         * @param topic       主题
         * @param qos         qos
         * @param message     消息
         * @param charsetName 字符集名称
         * @param delay       延迟时间
         * @param retained    是否保留消息
         */
        default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay, boolean retained) {
            this.publish2ThirdParty(topic, qos, JSONUtil.toString(message).getBytes(charsetName), delay, retained);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic   主题
         * @param payload 有效载荷
         */
        default void publish2ThirdParty(String topic, byte[] payload) {
            this.publish2ThirdParty(topic, 1, payload, 0);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic   主题
         * @param qos     消息质量
         * @param payload 有效载荷
         */
        default void publish2ThirdParty(String topic, int qos, byte[] payload) {
            this.publish2ThirdParty(topic, qos, payload, 0);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic   主题
         * @param qos     消息质量
         * @param payload 有效载荷
         * @param delay   延迟时间
         */
        default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay) {
            this.publish2ThirdParty(topic, qos, payload, delay, false);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic    主题
         * @param qos      消息质量
         * @param payload  有效载荷
         * @param delay    延迟时间
         * @param retained 是否保留消息
         */
        default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay, boolean retained) {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(payload);
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(retained);
    
            this.publish2ThirdParty(topic, delay, mqttMessage);
        }
    
        /**
         * 发送到第三方
         *
         * @param topic       主题
         * @param mqttMessage mqtt消息
         * @param delay       延迟时间
         */
        void publish2ThirdParty(String topic, long delay, MqttMessage mqttMessage);
    
        /**
         * 订阅
         *
         * @param topic          主题
         * @param qos            消息质量
         * @param shareModel     共享模型
         * @param groupName      分组名称
         * @param exclusive      排它
         * @param messageHandler 消息处理程序
         */
        void subscribe(String topic, int qos, ShareModelEnum shareModel, String groupName, boolean exclusive, MessageHandler messageHandler);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220

    4.5 接收消息

    消费者需要在消息处理类上添加@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})注解,指定要监听topics和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMQTTClient,使用qos执行订阅消息质量。
    当消息处理类中有多个public方法时,需要@MQTTConsumerMethod标记具体消费方法

    package com.example.test.handler;
    
    import com.demo.mqttclient.MessageHandler;
    import com.demo.mqttclient.anno.MQTTConsumerMethod;
    import com.demo.mqttclient.anno.MQTTSubscriber;
    import com.demo.mqttclient.enums.ShareModelEnum;
    import com.demo.plugin.core.lang.json.JSONUtil;
    import com.example.test.message.DeviceStartMessage;
    import com.example.test.message.PlateRecognitionReportMessage;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.util.Map;
    
    
    @Slf4j
    @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE)
    public class ParkingMessageHandler {
    
        public static final String TOPIC = "npt/park/type1/server/10010";
    
        public static final String CMD_KEY = "cmd";
    
        @MQTTConsumerMethod
        public void handle(Map<String, Object> message) {
            log.info("handle:{}", message);
            if (message.containsKey(CMD_KEY)) {
                Integer cmd = (Integer) message.get(CMD_KEY);
    
                switch (cmd) {
                    case 129:
                        handleDeviceStartMessage(message);
                        break;
                    case 140:
                        handlePlateRecognitionReportMessage(message);
                        break;
                    default:
                        log.warn("不支持此cmd:[{}]", cmd);
                        break;
                }
            } else {
                log.warn("消息消费异常");
            }
        }
    
        private void handleDeviceStartMessage(Map<String, Object> message) {
            DeviceStartMessage deviceStartMessage = JSONUtil.toObject(JSONUtil.toString(message), DeviceStartMessage.class);
            log.info("接收到设备启动消息:{}", deviceStartMessage);
        }
    
        private void handlePlateRecognitionReportMessage(Map<String, Object> message) {
            PlateRecognitionReportMessage plateRecognitionReportMessage = JSONUtil.toObject(JSONUtil.toString(message), PlateRecognitionReportMessage.class);
            log.info("接收到车牌上报识别消息:{}", plateRecognitionReportMessage);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    4.6 发送延迟消息

    要发送延迟消息,需要先开启emq延迟发布配置。
    发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为,最大为4294967

    //发送一个延时时长为10s的消息
    defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);
    
    • 1
    • 2

    4.7 多数据源

    多数据源与单数据源配置属性相同,在配置文件中声明即可

    spring:
      mqtt:
        emq:
          client:
            # 多数据源客户端名称,默认default
            default:
              # broker地址
              host: 127.0.0.1
              # 端口
              port: 31883
              # 用户名
              username: admin
              # 密码
              password: 123456
            # 多数据源客户端名称
            parking:
              # broker地址
              host: 127.0.0.1
              # 端口
              port: 31883
              # 用户名
              username: admin
              # 密码
              password: 123456
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    4.7.1 发布消息

    首先注入MQTTClient,与单数据源的唯一区别就是bean的名称。默认向Spring容器中添加的实现类名称为“${数据源名称}MQTTClient”
    以上面的配置文件为例,默认的bean名称为 defaultMQTTClientbillMQTTClient

    import com.demo.mqttclient.MQTTClient;
    
    @Resource
    private MQTTClient defaultMQTTClient;
    
    @Resource
    private MQTTClient billMQTTClient;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其他操作同单数据源

    4.7.2 接收消息

    接收消息与单数据源基本一致,唯一的区别是在@MQTTSubscriber中指定clientName属性,指定当前从哪个数据源进行消费。

    import com.demo.mqttclient.anno.MQTTSubscriber;
    
    @MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking")
    public class ParkingMessageHandler {}
    
    • 1
    • 2
    • 3
    • 4

    4.8 分组共享订阅

    系统默认使用spring.application.name作为分组名称,用户可在消息消费类上指定@MQTTSubscriber属性中groupName = "group_name"即可

    import com.demo.mqttclient.anno.MQTTSubscriber;
    
    @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name")
    public class ParkingMessageHandler {}
    
    • 1
    • 2
    • 3
    • 4

    4.9 不分组共享订阅

    只需要在消息消费类上指定@MQTTSubscriber属性中share = ShareModelEnum.UN_GROUP_SHARE即可。

    import com.demo.mqttclient.anno.MQTTSubscriber;
    
    @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE)
    public class ParkingMessageHandler {}
    
    • 1
    • 2
    • 3
    • 4

    4.10 排它订阅

    只需要在消息消费类上指定@MQTTSubscriber属性中exclusive = true即可,开启排它订阅时,默认关闭共享订阅。

    import com.demo.mqttclient.anno.MQTTSubscriber;
    
    @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true)
    public class ParkingMessageHandler {}
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    UVA-122 树的层次遍历 题解答案代码 算法竞赛入门经典第二版
    高数有多难?AI高数考试正确率81%
    虾皮选品免费工具:如何用知虾进行虾皮市场分析选品
    synchronized关键字的作用
    【PID优化】基于萤火虫算法PID控制器优化设计含Matlab源码
    WebDriver库:实现对音频文件的自动下载与保存
    sqoop笔记(安装、配置及使用)
    网络爬虫技术及应用
    Vue中的常用指令v-html / v-show / v-if / v-else / v-on / v-bind / v-for / v-model
    AcWing.第121场周赛
  • 原文地址:https://blog.csdn.net/MacWx/article/details/126250325