• 案例推荐|助力智慧城市,Pulsar Functions 的边缘物联网场景实践


    作者介绍

    梁远鹏,云兴科技开发运维工程师,开源爱好者、Apache Pulsar 贡献者、OpenELB 和 OpenYurt 成员。活跃在 Apache Pulsar 社区,关注 Pulsar 在云原生领域的生态与 etcd 元数据中心在 Pulsar 的应用,关注 MQTT On Pulsar 协议处理器,同时也是 CNCF 项目 KEDA 中 Pulsar scaler 的贡献者。

    文章摘要

    本文整理自 7 月 Apache Pulsar Meetup 梁远鹏题为《Pulsar Functions 在智慧城市中的实践》的分享。Pulsar Functions 作为在 MQ 场景的 FaaS 理念践行者,对于事件驱动的物联网场景来说天然适合,在将 Apache Pulsar 作为消息队列之后,使用 Pulsar Functions 可以说是一件顺势而为的事情。本文主要围绕云兴科技在发展智慧城市业务中为何选型及如何应用 Pulsar Functions 经验展开。

    a66d61a09de7ff6615fc9d64f7b01123.png

    扫码查看演讲视频

    业务介绍

    云兴科技被称为“‘厕所革命’ 智慧洗手间的先行者”,其主要业务是智慧洗手间。智慧洗手间是智慧城市的一环,智慧厕所结合了物联网、大数据、云计算、网络传输、传感器等技术,使传统厕所具备即时感知、准确判断和精确执行的能力。可以结合传感器等物联网和大数据技术帮助企业做数字化转型,进行精细化管理。例如智慧洗手间可以显示当前可用小间的数量、空气质量等数据。

    IoT 物联网传感器作为硬件产品基础,连接和驱动起了整个业务流程,业务涉及到的传感器多种多样,如下图所示:

    4c0a9bf214c984cf9092e4a45746eb2c.png

    该业务的场景架构如下图:

    aa8b5d3c81319311ce5ebe5132daa722.png

    目前整个部署,由于硬件终端侧的资源有限,所以均以 Pulsar 单机模式部署,这点和其他海量数据场景有所区别,也是边缘物联网场景的特点之一。

    针对上述架构图,接下来做个简要分析:

    1. 1. 多种传感器都会通过无线方式将数据传输到网关,网关层则通过互联网,经过 MQTT 将数据上传到服务器。

    2. 2. LoraWan Server 负责对数据包解密解码,之后将数据发送到另一个 MQTT,让第三方应用经过这个 MQTT 获取传感器的实际数据。

    3. 3. 云兴科技有服务 P 和 服务 S 两个服务,P 主要负责解析传感器实际数据,之后将数据通过 Pulsar 传送到服务 S,后者以 SocketIO 的方式推送到用户页面显示出来;

    4. 4. 服务 P 还会将传感器数据同时推送到客户服务,方便客户进行分析处理。

    云兴科技希望使用 Pulsar Functions 对以上架构进行迭代,替代传感器解析逻辑、数据推送到客户服务等逻辑,从而让架构在基于 Pulsar Function 的基础上更具扩展性。

    为何选择 Pulsar Functions

    云兴科技选择 Pulsar Functions,主要有三大原因:

    1. 1. 足够简单、轻量:Pulsar Functions 与 Pulsar 集成在一起,不需要学习新的语法知识。

    2. 2. 无额外组件,减少运维需求。

    3. 3. 多语言生态:随着 Function 越来越多,如果需要将某个 Function 从一种语言迁移到另一种,就可以利用 Pulsar Functions 的多语言生态轻松实现。由于 Function 本身的逻辑很小,迁移工作也会很简单。

    各阶段 Pulsar Function 一览

    2021 年,团队第一个 Pulsar Function 上线:

    2c8386535b1555fd94907eadfd4baab9.png

    这里使用的是 Pulsar 2.8.1 版本。这个 Function 的逻辑很简单,就是将服务 P 推送数据到客户服务的逻辑改为放入 HTTP Sink Function,在 Function 中发起 HTTP 请求推送数据。

    下一个 Function 很快上线,主要提供 HTTP 和 MQTT 两种方式供客户选择:

    2c8242cb7a567e25d0a37beb5e748e90.png

    从名称上可以看出,第二个 Function-MQTT Sink Function 是将数据以 MQTT 协议的方式发送到客户的 MQTT 服务器,也是很简单的处理逻辑,非常的单一。

    在上线上述两个 Function 之后,公司很长一段时间内都没有再新增其他的 Function,因为场景较为单一,因此也比较稳定。

    在 Pulsar 生态中,有一个组件叫 Pulsar Sink,但是我们统一使用 Function。之所以没有选用 Pulsar Sink 而是单独实现 Function 主要有两点原因:

    1. 1. Function 足够简单,只需从进程(Process)方法拿到数据就可以直接使用,且可以与其他 Function 统一风格,方便团队成员使用。

    2. 2. Function 的形式较为灵活,可以在 Function 逻辑中单独输出到其他位置。

    在后来的一次业务发展契机下,团队后续上线了更多 Function:

    5eaf0de8761eaa8c6700b628fe3a0a8e.png

    随着传感器种类的增多,客户也加入了新的种类的传感器,所以团队计划对服务 P 进行主要升级,将大部分逻辑转向 Functions 实现。

    新上线的第一个是 Parser Function,负责接入所有传感器输入数据并解析,输出到 Prometheus Sink Function(后者用 Go 语言编写),将数据存储到 Prometheus,同时会输出到服务 P。这时候服务 P 会填充一些内容逻辑策略到对应的 Topic 中,再将 Topic 发送到对应的传感器 Function 逻辑。

    例如 AQI Level Window Function 就是负责处理空气质量检测任务。传感器逻辑处理之后发送到 SensorAlert Function,后者负责检查输入数据是否需要告警,例如空气质量是否超过客户定制阈值,如超过阈值就会发送告警到服务 S,服务 S 再将告警推送到用户页面。

    上图下半部分主要负责将数据推送到客户服务,也就是前面提到的第一个 Function 和第二个 Function。

    接下来介绍一个 Pulsar Sink Function 案例:

    a4e6c005d7ad4643e117d488a603b974.png

    某日区域 B 的 服务 P 宕机,导致服务 S 没有数据来源,这时用户页面陷入静止状态。服务 P 又无法短时间恢复,所以团队让区域 A 的服务 P 从区域 B 的 MQTT 拉取传感器数据,然后在区域 A 的环境中进行处理,最终通过 Pulsar Sink Function 推送到区域 B 的 Pulsar,使后者的服务 S 可以继续消费数据。简单来说,就是使用 Pulsar Function 向另一个 Pulsar 集群做单向的数据复制。

    97aaa7234a2a585dd5b5c5120488faac.png

    使用的 Function 运行时及语言

    当前主要使用线程(Thread)运行时和进程(Process)运行时,它们的使用场景各有不同。有些客户只能在边缘部署服务器,服务无法接入互联网,可用资源有限,此时会使用线程运行时。

    所有的 Function 都运行在 Pulsar Broker 中,和 Broker 处于同一个 JVM 内。团队在对 Go 实现的 Function 上做了处理,让 Prometheus Sink Function 同时支持 Function 模式运行和单独的 Go 进程模式运行(以 Go 二进制程序的方式)。

    在云端环境中使用进程运行时。此时所有 Function 交给 Pulsar Function Worker 管理,以进程运行时部署 Function。

    管理多个 Function

    应对多个 Function 时,团队开发了一个 Spring-boot-starter,用来管理各个 Function 的配置以及部署。下面是一个配置示例:

    1. ps:
    2.   functions:
    3.     occupy:
    4.       enabled: false
    5.       autoUpdate: false
    6.       classname: com.smartoilets.pulsar.functions.OccuopyFunction
    7.       inputs: persistent://public/default/smt_ecf_occupy_function
    8.       output: persistent://public/default/smt_ecf_occupy_new
    9.       jar: functions/pulsar-functions-latest1.15.jar
    10.       logTopic: persistent://public/default/function_log
    11.       userConfig:
    12.         occupylog: false
    13.     aqLevel:
    14.       enabled: false
    15.       ...

    它会不断调用 Pulsar Function AdminAPI 来监控 Function:

    af6d0f2536b064bd7d06d91df86bad28.png

    这样只需通过配置文件修改就可以轻松上下线一个 Function。团队还有一个配置中心,配置中心更改内容后会通知服务 S 的 starter,后者不断调用 AdminAPI 检查当前 Function 各种参数状态是否与配置一致,如果不一致会调用 API 更新 Function,达到动态更新 Function 的效果。

    当存在多个 Function 时,要知道各个 Function 是否在运行,也会通过 AdminAPI 不断请求 Function 状态:

    829bf53676db263756e4e6294b7f742a.png

    在边缘部署时,由于资源有限、没有互联网连接,只能在获得状态数据后打印日志。在云端则会发出邮件告警,并将状态数据推送到 Prometheus。

    以上内容则是目前阶段 Pulsar Function 在云兴科技的边缘场景以及云端的一个应用情况。

    未来展望与计划

    在运行多个 Function 后,团队未来计划上线更多 Function,并将增强 Function 初始化和关闭操作和 Function 可观测性。目前仅可对 Function 定时监测,希望在未来可以改进优化这个情况。

    团队也会积极拥抱社区,更多地参与社区活动并分享落地经验,期待与社区多多交流。

    推荐阅读


    关注「Apache Pulsar」,获取干货与动态

    👇🏻扫码回复 mesh,加入 Pulsar Functions  👇🏻

    da730fb6a66b509e7014b40afd78afbb.jpeg


    4513bc111b7a12c1e57a4ef2caf375ba.jpeg

    Pulsar Summit Asia 2022 演讲征集中,点击提交议题👆🏻

    点击阅读原文,提交议题。

  • 相关阅读:
    函数重入、函数重载、函数重写自己理解
    【记录】Truenas scale|Truenas 的 SSH 服务连不上 VScode,终端能连上
    如何在 Vue 中使用 防抖 和 节流
    【JavaSE】多线程篇(四)线程的同步机制、互斥锁、线程死锁与释放锁
    聊一聊医疗器械的可用性
    Spring Security OAuth实现Gitee快捷登录
    Vue源码cached解析
    vue3触发store的时机和使用store中的变量
    网络安全(黑客)自学
    PyRosetta 安装方法之Conda安装
  • 原文地址:https://blog.csdn.net/zhaijia03/article/details/126984093