• SpringBoot 实现EMQ设备的上下线告警


    前言

    上下线通知

    我遇到了一个难题,即在使用EMQ X 4.4.10的开源版本时,我需要实现设备的上下线状态监控,但该4.4.10开源版本并未内置设备上下线提醒模块,只有企业版才内置了该模块。这为我带来了一些技术上的难题,迫使我必须另辟蹊径,通过其他方法来监听设备的上下线状态。为了解决这个问题,我采取了以下步骤:

    首先,我修改了EMQ Xacl.config文件,添加了以下规则:

    {allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
    
    • 1

    图示:

    这个规则允许订阅$SYS/brokers/+/clients/#主题的所有客户端

    接下来,我使用Spring Boot创建了一个应用程序,其中我设置了与EMQ X代理的连接。在这个应用程序中,我创建了一个监听器,用于订阅$SYS/brokers/+/clients/#主题,以感知设备的上下线信息。

    通过这个解决方案,我能够实时监控设备的连接和断开事件,而不受EMQ X开源版本的功能限制。这使我能够根据设备状态采取适当的措施,比如发送通知或执行其他自定义操作。这个方法允许我灵活地定制解决方案,以满足我的特定需求,而无需依赖EMQ X的功能。

    EMQ简介


    EMQErlang MQTT Broker)是一种基于Erlang编程语言开发的开源消息传递代理(MQTT broker),专门用于支持MQTTMessage Queuing Telemetry Transport)协议。MQTT是一种轻量级、高效的消息传递协议,最初设计用于连接受限的设备,如嵌入式系统和物联网设备。EMQ作为MQTT broker,提供了可靠的消息传递机制,使设备能够相互通信,同时也可与后端应用程序集成,使其成为物联网生态系统中的重要组成部分。

    环境

    • EMQX安装方式:Docker
    • EMQX版本:4.4.10开源版本
    • 操作系统:CentOS 7

    EMQX4.4版本官方文档

    下载

    下载 EMQX

    准备工作

    安装EMQX

    修改EMQX的ACL规则内容

    EMQX的Docker容器配置文件所在目录

    # 进入EMQX容器
    docker exec -it emqx /bin/sh
    # 进入配置文件目录
    cd /opt/emqx/etc
    # 查看acl配置文件
    cat acl.conf
    # 编辑acl配置文件
    vi acl.conf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    非Docker容器配置文件所在目录

    # 进入配置文件目录
    cd /etc/emqx
    # 查看acl配置文件
    cat acl.conf
    # 编辑acl配置文件
    vi acl.conf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    acl的默认文件内容

    %%--------------------------------------------------------------------
    %% [ACL](https://docs.emqx.io/broker/v3/en/config.html)
    %%
    %% -type(who() :: all | binary() |
    %%                {ipaddr, esockd_access:cidr()} |
    %%                {ipaddrs, [esockd_access:cidr()]} |
    %%                {client, binary()} |
    %%                {user, binary()}).
    %%
    %% -type(access() :: subscribe | publish | pubsub).
    %%
    %% -type(topic() :: binary()).
    %%
    %% -type(rule() :: {allow, all} |
    %%                 {allow, who(), access(), list(topic())} |
    %%                 {deny, all} |
    %%                 {deny, who(), access(), list(topic())}).
    %%--------------------------------------------------------------------
    
    {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
    
    {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
    
    {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
    
    {allow, all}.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    新增一条ACL规则

    allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
    
    • 1
    • allow: 表示这是一个允许(allow)访问的规则。这意味着与此规则匹配的操作将被允许。
    • all: 表示这个规则适用于所有的客户端。
    • subscribe: 表示这个规则定义了对主题的订阅权限。
    • $SYS/brokers/+/clients/#: 这是一个主题过滤器,它指定了匹配的主题模式。在这里,$SYS/brokers/+/clients/# 表示以 $SYS/brokers/ 开头,然后是一个通配符 +,它可以匹配单个层级的任何名称,接着是 clients/,最后又有一个 # 通配符,它可以匹配零个或多个层级的名称。因此,这个主题过滤器匹配了所有以 $SYS/brokers/ 开头,然后紧跟着 clients/ 的主题。

    综合起来,这个规则允许所有的客户端订阅以 $SYS/brokers/ 开头,然后跟着 clients/的所有主题。通常,这种规则用于允许所有客户端订阅系统级别的信息或监控数据,如经纪人(Broker)的客户端连接状态等。这可以用于监视和诊断MQTT Broker 的性能和状态。

    注意:修改完毕之后使用emqx_ctl reload_acl重新加载acl规则或者直接重启emqx服务

    搭建一个能够监听EMQX主题的Spring Boot应用程序

    MQTT相关依赖

    
    <dependency>
        <groupId>org.springframework.integrationgroupId>
        <artifactId>spring-integration-streamartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.integrationgroupId>
        <artifactId>spring-integration-mqttartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    MQTT接受订阅的主题

    $SYS/brokers/+/clients/#
    
    • 1

    处理设备上下线事件

    获取EMQX消息主题

    // 从消息请求头中获取消息主题topic
    String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
    
    • 1
    • 2

    获取topic最后的节点字符串

    以下方式通过主题topic来获取ClientId

    // topic最后的节点字符串
    String lastPart = extractLastPart(topic);
    // 获取消息内容
    String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
    // 判断设备是上线或下线消息
    if ("connected".equals(lastPart)) {
        // 设备上线消息
        clientId = extractClientIdFromTopic(topic);
        log.info("设备上线提醒 -> IMEI:{}", clientId);
    } else if ("disconnected".equals(lastPart)) {
        // 设备下线消息
        clientId = extractClientIdFromTopic(topic);
        log.info("设备下线警告 -> IMEI:{}", clientId);
    }
    
    /**
     * 获取最后一个节点
     *
     * @param topic 主题
     * @return 节点内容
     */
    public static String extractLastPart(String topic) {
        // 使用split方法将字符串根据'/'分割成数组
        String[] parts = topic.split("/");
        // 获取最后一个元素
        String lastPart = parts[parts.length - 1];
        return lastPart;
    }
    
    /**
     * 从Topic中提取ClientId
     *
     * @param topic 主题
     * @return ClientId
     */
    public static String extractClientIdFromTopic(String topic) {
        // 使用正则表达式匹配主题中的ClientId
        String regex = "\\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected)";
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(topic);
        // matcher.groupCount() 是一个方法,用于获取正则表达式中定义的组数。在正则表达式中,使用括号 () 来定义捕获组。在这个情况下,正则表达式 \\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected) 中有两组,分别是括号内的 ([^/]+) 部分和 (connected|disconnected) 部分。matcher.groupCount() 返回的是正则表达式中捕获组的数量
        if (matcher.matches() && matcher.groupCount() == 2) {
            // 如果正则匹配成功,提取ClientId并返回
            return matcher.group(1);
        } else {
            // 如果匹配失败,返回null或者抛出异常,视情况而定
            return null;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    当然你也可以通过解析payload来获取更多详细信息,可参照官方文档:客户端上下线事件

    主题 (Topic)说明
    ${clientid}/connected上线事件。当任意客户端上线时,EMQX 就会发布该主题的消息
    ${clientid}/disconnected下线事件。当任意客户端下线时,EMQX 就会发布该主题的消息

    connected 事件消息的 Payload 解析成 JSON 格式如下:

    {
        "username": "foo",
        "ts": 1625572213873,
        "sockport": 1883,
        "proto_ver": 4,
        "proto_name": "MQTT",
        "keepalive": 60,
        "ipaddress": "127.0.0.1",
        "expiry_interval": 0,
        "connected_at": 1625572213873,
        "connack": 0,
        "clientid": "emqtt-8348fe27a87976ad4db3",
        "clean_start": true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    disconnected 事件消息的 Payload 解析成 JSON 格式如下:

    {
        "username": "foo",
        "ts": 1625572213873,
        "sockport": 1883,
        "reason": "tcp_closed",
        "proto_ver": 4,
        "proto_name": "MQTT",
        "ipaddress": "127.0.0.1",
        "disconnected_at": 1625572213873,
        "clientid": "emqtt-8348fe27a87976ad4db3"
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以解析JOSN数据拿到clientid,注意此处使用的JSON解析工具是Hutool的。

    // 获取消息内容
    String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
    // 解析JSON字符串
    JSONObject payloadJsonObject = JSONUtil.parseObj(payload);
    // 获取ClientId
    String clientid = payloadJsonObject.getStr("clientid");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    实现效果

    总结

    1. 修改 EMQ X ACL 配置: 你在 EMQ X 中修改了 acl.config 文件,添加了相应的 ACL 规则,允许订阅 $SYS/brokers/+/clients/# 主题的所有客户端。这个步骤允许你在开源版本中访问关键的设备连接信息。

    2. 创建 Spring Boot 应用程序: 通过创建一个 Spring Boot 应用程序,你建立了一个连接到 EMQ X 代理的桥梁。这个应用程序充当了监听器,用于订阅 $SYS/brokers/+/clients/# 主题,以实时感知设备的连接和断开事件。

    3. 实时设备监控: 你的解决方案允许你实时监控设备的连接状态,而无需依赖 EMQ X 企业版的内置功能。这使你能够快速响应设备状态的变化,并采取相应的行动,如发送通知或执行自定义操作。

    4. 定制性: 通过这个方法,你能够灵活地定制解决方案,以满足你的特定需求。你可以根据设备状态采取不同的操作,因此具有更大的灵活性。

    5. 避免功能限制: 尽管 EMQ X 4.4.10 开源版本没有内置设备上下线提醒模块,但你成功地绕过了这个限制,通过自定义配置和应用程序开发来实现了所需的功能。

    6. 无需升级或许可证: 你的解决方案不需要升级到 EMQ X 企业版或购买额外的许可证。这使得你可以在开源版本的基础上构建所需的功能。

    7. 总而言之,你的解决方案是一种巧妙的方式,通过修改配置和创建一个自定义的 Spring Boot 应用程序,实现了设备上下线状态的监控和管理。这个方法不仅解决了技术挑战,还提供了灵活性和定制性,以满足你的特定需求。这是一个创造性的方法,适用于需要在 EMQ X 开源版本中实现设备监控的情况。

    参考文献

    EMQX4.4官方文档
    SpringBoot整合MQTT(EMQ)设备上下线告警
    【EMQX 5.0】 Spring Cloud 集成MQTT并异步入库 + 客户端上报数据 + 上下线主题订阅

  • 相关阅读:
    40 道基础Dubbo 面试题及答案
    BFC(边距重叠解决方案)
    企业IP地址管理(IPAM)
    Golang中的GC原理(介于三个不同版本)
    Runtime——KVC,KVO原理
    完成头像上传功能:使用node+express实现将前端发送的base64格式的图片转化为png格式的图片并保存在文件夹,同时将相对路径保存在数据库中
    带你从0到1开发AI图像分类应用
    Python3-类基础
    LeetCode每日一题(985. Sum of Even Numbers After Queries)
    ffmpeg强制关键帧间隔(key frame, gop size, gop duration)
  • 原文地址:https://blog.csdn.net/qq_31762741/article/details/133746513