• springboot集成mqtt


    文章目录

    • 前言
    • 一、MQTT是什么?
    • 二、继承步骤
      • 1.安装MQTT
      • 2.创建项目,引入依赖
      • 3. 对应步骤2的代码
      • 3 测试
    • 总结
      • mqtt 启动后访问地址


    前言

    随着物联网的火热,MQTT的应用逐渐增多

    曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;


    一、MQTT是什么?

    可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。
    MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。
    MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。


    个人简单总结:

    1. 每个客户端可以订阅一个或者多个主题(发消息,收消息)
    2. 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息)
    3. 客户端A发送消息给客户端B流程为:
    客户端A>>>Broker>>>客户端B
    --- 
    前置条件:
    a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
    b: 客户端B 订阅主题
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、继承步骤

    1.安装MQTT

    这里直接采用windows版本,解压版,比较快

    • 下载地址 MQTT-windows版本
    • 解压后,在bin文件下执行运行命令 .\emqx console
    • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/public
      MQTT管理页面

    2.创建项目,引入依赖

    大致分为如下步骤:

    • yml配置 主题 用户名 密码
    • 根据配置创建客户端实例,实例订阅主题
    • 实现 MqttCallback 接口
      1. 重连处理 connectionLost
      2. 消息接受处理 messageArrived
      3. 消息发生成功处理 deliveryComplete
      
      • 1
      • 2
      • 3
    • 根据客户端信息发送某个主题的消息

    3. 对应步骤2的代码

    1. yml配置
    server:
      port: 8081
    # 下面这里要看你自己的需求
    customer:
      mqtt:
        broker: tcp://127.0.0.1:1883
        clientList:
          #发布客户端ID
          - clientId: nxys_service
            #监听主题 同时订阅多个主题使用 - 分割开
            subscribeTopic: mqtt/publish
            #用户名
            userName: admin
            #密码
            password: public
          #接受客户端ID
          - clientId: receive_service
            #监听主题 同时订阅多个主题使用 - 分割开
            subscribeTopic: mqtt/receive
            #用户名
            userName: admin
            #密码
            password: public
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 实例信息获取
    /**
     * Mqtt配置类
     */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "customer.mqtt")
    public class MqttConfig {
        /**
         * mqtt broker地址
         */
        String broker;
        /**
         * 需要创建的MQTT客户端
         */
        List<MqttClient> clientList;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    /**
     * MQTT客户端
     */
    @Data
    public class MqttClient {
        /**
         * 客户端ID
         */
        private String clientId;
        /**
         * 监听主题
         */
        private String subscribeTopic;
        /**
         * 用户名
         */
        private String userName;
        /**
         * 密码
         */
        private String password;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 根据信息创建实例,订阅主题
    /**
     * MQTT客户端创建
     */
    @Component
    @Slf4j
    public class MqttClientCreate {
        @Resource
        private MqttClientManager mqttClientManager;
        @Autowired
        private MqttConfig mqttConfig;
    
        /**
         * 创建MQTT客户端
         */
        @PostConstruct
        public void createMqttClient() {
            List<MqttClient> mqttClientList = mqttConfig.getClientList();
    
            for (MqttClient mqttClient : mqttClientList) {
                log.info("{}", mqttClient);
                //创建客户端,客户端ID:demo,回调类跟客户端ID一致
                mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    /**
     * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
     */
    @Slf4j
    @Component
    public class MqttClientManager {
        @Value("${customer.mqtt.broker}")
        private String mqttBroker;
        @Resource
        private MqttCallBackContext mqttCallBackContext;
        /**
         * 存储MQTT客户端
         */
        public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
    
        public static MqttClient getMqttClientById(String clientId) {
            return MQTT_CLIENT_MAP.get(clientId);
        }
    
        /**
         * 创建mqtt客户端
         *
         * @param clientId       客户端ID
         * @param subscribeTopic 订阅主题,可为空
         * @param userName       用户名,可为空
         * @param password       密码,可为空
         * @return mqtt客户端
         */
        public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
            MemoryPersistence persistence = new MemoryPersistence();
    
            try {
                MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                if (null != userName && !"".equals(userName)) {
                    connOpts.setUserName(userName);
                }
    
                if (null != password && !"".equals(password)) {
                    connOpts.setPassword(password.toCharArray());
                }
    
                connOpts.setCleanSession(true);
    
                if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                    AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
    
                    if (null == callBack) {
                        callBack = mqttCallBackContext.getCallBack("default");
                    }
    
                    callBack.setClientId(clientId);
                    callBack.setConnectOptions(connOpts);
                    client.setCallback(callBack);
                }
    
                //连接mqtt服务端broker
                client.connect(connOpts);
                // 订阅主题
                if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                    if (subscribeTopic.contains("-"))
                        client.subscribe(subscribeTopic.split("-"));
                    else
    //                    if (!subscribeTopic.equals("mqtt/receive"))
                    {
                        client.subscribe(subscribeTopic);
                    }
                }
    
                MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
            } catch (MqttException e) {
                log.error("Create mqttClient failed!", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    1. 实现 MqttCallback 接口
    /**
     * MQTT回调抽象类
     */
    @Slf4j
    public abstract class AbsMqttCallBack implements MqttCallback {
        private String clientId;
    
        private MqttConnectOptions connectOptions;
     
        public String getClientId() {
            return clientId;
        }
     
        public void setClientId(String clientId) {
            this.clientId = clientId;
        }
     
        public MqttConnectOptions getConnectOptions() {
            return connectOptions;
        }
     
        public void setConnectOptions(MqttConnectOptions connectOptions) {
            this.connectOptions = connectOptions;
        }
     
        /**
         * 失去连接操作,进行重连
         *
         * @param throwable 异常
         */
        @Override
        public void connectionLost(Throwable throwable) {
            try {
                if (null != clientId) {
                    if (null != dconnectOptions) {
                        MqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                    } else {
                        MqttClientManager.getMqttClientById(clientId).connect();
                    }
                }
     
            } catch (Exception e) {
                log.error("{} reconnect failed!", e);
            }
        }
     
        /**
         * 接收订阅消息
         * @param topic    主题
         * @param mqttMessage 接收消息
         * @throws Exception 异常
         */
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    		String content = new String(mqttMessage.getPayload());
         	handleReceiveMessage(topic, content);
        }
     
        /**
         * 消息发送成功
         *
         * @param iMqttDeliveryToken toke
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            log.info("消息发送成功");
        }
     
     
        /**
         * 处理接收的消息
         * @param topic   主题
         * @param message 消息内容
         */
        protected abstract void handleReceiveMessage(String topic, String message);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    /**
     * 默认回调
     */
    @Slf4j
    @Component("default")
    public class DefaultMqttCallBack extends AbsMqttCallBack {
    
        /**
         * @param topic   主题
         * @param message 消息内容
         */
        @Override
        protected void handleReceiveMessage(String topic, String message) {
            log.info("接收到主题---{}", topic);
            log.info("接收到消息---{}", message);
            // 你自己的消息处理业务
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    /**
     * MQTT订阅回调环境类
     */
    @Component
    @Slf4j
    public class MqttCallBackContext {
        private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
    
        /**
         * 默认构造函数
         *
         * @param callBackMap 回调集合
         */
        public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
            this.callBackMap.clear();
            this.callBackMap.putAll(callBackMap);
        }
    
        /**
         * 获取MQTT回调类
         *
         * @param clientId 客户端ID
         * @return MQTT回调类
         */
        public AbsMqttCallBack getCallBack(String clientId) {
            return this.callBackMap.get(clientId);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. 发送消息
    @RestController
    public class SendController {
        @Resource
        MqttClientManager mqttClientManager;
    
        @RequestMapping("/sendMessage")
        public String sendMessage(String topic){
            try {
                MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
                mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);
                return "发送成功";
            } catch (Exception e) {
                e.printStackTrace();
                return "发送失败";
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3 测试

    1. 启动订阅,查看MQTT 管理页面
      两个实例
    2. 测试发送消息,查看发送情况,接受情况
      http://localhost:8081/sendMessage?topic=mqtt/receive
      发送成功,并接受到消息

    总结

    文中涉及的所有代码: MQTT-Demo

    mqtt 启动后访问地址

    http://localhost:18083/#/

    • 用户名/密码:
    • admin/public

    1. 每个客户端可以订阅一个或者多个主题
    2. 每个客户端不订阅主题,也可以发送主题消息
    3. 客户端A发送消息给客户端B流程为:
    客户端A>>>Broker>>>客户端B
    --- 
    前置条件:
    a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
    b: 客户端B 订阅主题
    
    • 1
    • 2
    • 3
    • 4
    • 5

    mqtt启动命令
    在bin目录下,cmd 执行

    .\emqx console
    
    • 1
  • 相关阅读:
    论文解读(DAEGC)《Improved Deep Embedded Clustering with Local Structure Preservation》
    【wiki知识库】05.分类管理实现--前端Vue模块
    Linux文件/目录高级管理三
    教培机构怎么做在线教育直播
    【洛谷P1351】联合权值【数学】
    【c++刷题Day2】专题2线性DP
    Web开发-新建Spring Boot项目
    苹果手机如何设置铃声:3个方法完美解决题
    实现IP地址归属地显示功能、号码归属地查询
    react-native webstorm 无法启动 Android 模拟器
  • 原文地址:https://blog.csdn.net/qq_32419139/article/details/136176948