• 数据生态第三弹 | RocketMQ OpenMLDB Connector,实时数据到特征工程的高速传输


    导读:
    基于真实的企业业务场景,将线上实时数据更好更快地转化为 AI 可用特征是加速人工智能落地的有效路径之一。因此,OpenMLDB 积极打通数据生态上游,继面向实时消息队列 Pulsar、分布式流处理平台 Kafka 的两款 connector 发布后,OpenMLDB 和 RocketMQ 合作推出 RocketMQ OpenMLDB Connector,助力实时数据到特征工程的高速传输,加速人工智能工程化落地。
    未来 OpenMLDB 社区也将推出面向 Flink 的 connector,为开发者提供丰富的实时数据生态,为各类应用场景赋予高效特征工程能力。

    为什么选择 RocketMQ OpenMLDB Connector

    为了使 OpenMLDB 与 RocketMQ 拥有高效稳定的传输通道,RocketMQ OpenMLDB connector 具有诸多优秀特性,包括但不限于:
    易上手。无需编写任何代码,只需进行简单配置,便可通过 RocketMQ OpenMLDB Connector 将 RocketMQ 的消息流入 OpenMLDB 。简化的数据导入过程能大幅提升企业数据的有效使用率。
    易部署。能够根据不同场景的实际业务需求,选择在单机或集群上运行 RocketMQ OpenMLDB Connector ,助力企业构建实时数据管道。
    高可靠。RocketMQ OpenMLDB Connector 集群部署的方式具备 Failover 能力,可以将有问题节点的任务调度到正常节点并保证集群负载均衡,使企业能更专注和更高效地探索数据的商业价值。
    低延时。秒级延迟,满足实时数据及特征开发场景。

    RocketMQ OpenMLDB Connector

    Connector 概述

    定位

    RocketMQ Connect 是 RocketMQ 数据集成重要组件,它具备具有低延时,可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。RocketMQ OpenMLDB Connector 是一个用于在 RocketMQ 和 OpenMLDB 之间可扩展的、可靠的流式传输数据的工具。让 RocketMQ 及 RocketMQ connect 生态组件导入数据到 OpenMLDB 变得简单。

    功能

    可以使 RocketMQ 的消息流入 OpenMLDB 在线存储。
    在这里插入图片描述

    Connector 插件编译

    RocketMQ OpenMLDB Connector

    $ git clone git@github.com:apache/rocketmq-connect.git
    $ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
    $ mvn clean package -Dmaven.test.skip=true
    
    • 1
    • 2
    • 3

    最终将编译好的插件包放在 RocketMQ connect 指定的加载地址。

    Connector 演示

    流程介绍 RocketMQ OpenMLDB Connector 用于 OpenMLDB 线上模式的实时数据流接入。使用 connector 的简要流程,如下图所示。我们接下来将详细介绍每一步。整体上,使用流程可以概括为四步:
    • 启动 OpenMLDB 并创建数据库
    • 启动 RocketMQ 并创建 topic
    • 启动 RocketMQ OpenMLDB Connector
    • 进行测试或者正常使用
    在这里插入图片描述

    关键步骤
    以下仅列出使用此 connector 的关键步骤

    步骤1 | 启动 OpenMLDB
    启动 OpenMLDB,并创建数据库 rocketmq_test,用于测试。表可以被 RocketMQ Connector 自动创建,所以这里不需要手动创建表。

    cd /work
    ./init.sh
    echo "create database rocketmq_test;" | /work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client
    
    • 1
    • 2
    • 3

    步骤2 | 启动 RocketMQ

    RocketMQ 搭建,启动 RocketMQ
    1、下载RocketMQ
    $ wget https://dlcdn.apache.org/rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip
    
    
    2、编译RocketMQ
    如果是已经编译好的请直接执行第3部启动RocketMQ
    
    
    $ unzip rocketmq-all-4.9.3-source-release.zip  
    $ cd rocketmq-all-4.9.3/  
    $ mvn -Prelease-all -DskipTests clean install -U  
    $ cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
    
    
    3、启动RocketMQ
    启动namesrv
    $ nohup sh bin/mqnamesrv &  
    查看namesrv是否启动成功
    $ tail -f ~/logs/rocketmqlogs/namesrv.log  
    The Name Server boot success...
    
    
    启动broker
    $ nohup sh bin/mqbroker -n localhost:9876 &
    查看broker是否启动成功
    $ tail -f ~/logs/rocketmqlogs/broker.log    
    The broker[%s, 172.30.30.233:10911] boot success...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    步骤3 | 启动 RocketMQ OpenMLDB Connector
    首先, 搭建 RocketMQ connect runtime
    环境项目下载

    $ git clone git@github.com:apache/rocketmq-connect.git
    
    • 1

    构建项目

    $ cd rocketmq-connect
    $ mvn -Prelease-connect -DskipTests clean install -U
    
    • 1
    • 2

    修改配置connect-standalone.conf ,重点配置如下

    $ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
    $ vim conf/connect-standalone.conf
    
    • 1
    • 2
    # 当前的节点的独特Id
    workerId=DEFAULT_WORKER_1
    
    
    # REST API的端口地址
    httpPort=8081
    
    
    # 本地存储路径
    storePathRootDir=~/storeRoot
    
    
    # 需要修改为自己的rocketmq NameServer的端口地址
    # Rocketmq namesrvAddr
    namesrvAddr=127.0.0.1:9876  
    
    
    #需要修改为connector-plugins文件夹所在的位置
    # Source or sink connector jar file dir
    pluginPaths=/usr/local/connector-plugins/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    我们需要将 OpenMLDB RocketMQ Connector 编译好的包放入这个目录。命令如下:

    mkdir -p /usr/local/connector-plugins/rocketmq-connect-jdbc
    cd ../../../../
    cp connectors/rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins/rocketmq-connect-jdbc
    
    • 1
    • 2
    • 3

    使用 standalone 的模式启动 RocketMQ Connect Runtime 环境。

    $ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
    $ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
    
    • 1
    • 2

    以下表示 Rocketmq connect runtime 运行成功:
    在这里插入图片描述
    在这里插入图片描述

    步骤4 | 测试
    • 创建 Mysql 数据表 ,并初始化测试数据
    • 创建 mysql source, 从测试表中拉取数据
    • 创建 OpenMLDB sink,将 source 拉取的数据写入到 OpenMLDB 中
    初始化 Mysql 测试数据;

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    
    -- ----------------------------
    -- Table structure for employee_test
    -- ----------------------------
    DROP TABLE IF EXISTS `employee_test`;
    CREATE TABLE `employee_test` (
      `id` bigint NOT NULL AUTO_INCREMENT,
      `name` varchar(128) DEFAULT NULL,
      `howold` int DEFAULT NULL,
      `male` int DEFAULT NULL,
      `company` varchar(128) DEFAULT NULL,
      `money` double DEFAULT NULL,
      `begin_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;
    
    
    -- ----------------------------
    -- Records of employee_test
    -- ----------------------------
    BEGIN;
    INSERT INTO `employee_test` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00');
    INSERT INTO `employee_test` VALUES (4, 'gjk', 25, 8, 'company', 3232, '2021-12-24 20:43:36');
    INSERT INTO `employee_test` VALUES (12, 'name-06', 19, 3, NULL, NULL, NULL);
    INSERT INTO `employee_test` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39');
    COMMIT;
    
    
    SET FOREIGN_KEY_CHECKS = 1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    创建并启动 RocketMQ conect mysql source connector ,如下所示:

    curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbc-mysql-source-test
    -d  '{
        "connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector",
        "max-task":"1",
        "connection.url":"jdbc:mysql://127.0.0.1:3306", 
        "connection.user":"*****",   
        "connection.password":"*****", 
        "table.whitelist":"test_database.employee_test",
        "mode": "incrementing",     // 增量拉取方式
        "incrementing.column.name":"id",   // 指定增量拉取的字段
        "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
    }'st
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    确认任务启动并开始拉取数据:
    在这里插入图片描述
    在这里插入图片描述

    建立一个 OpenMLDB RocketMQ sink connector 将数据写入到 OpenMLDB 表中,信息如下。(注:监听的 Topic 为 source 拉取表的表名)

    curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbc-openmldb-sink-test
    -d '{
        "connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
        "max-task":"1",
        "connect-topicname":"employee_test",
        "connection.url":"jdbc:openmldb:///rocketmq_test?zk=127.0.0.1:2181&zkPath=/openmldb_cluster",
        "insert.mode":"INSERT",
        "db.timezone":"UTC",
        "table.types":"TABLE",
        "auto.create":"true",
        "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
    }'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    观察数据是否入库然后,我们可以在 OpenMLDB 中查询是否插入成功。内容如下:

    set @@execute_mode='online';
    use rocketmq_test;
    select * from employee_test;
    
    • 1
    • 2
    • 3

    结果如下:
    在这里插入图片描述

    写在最后

    关于 OpenMLDB

    OpenMLDB 是一个开源机器学习数据库,致力于闭环解决 AI 工程化落地的数据治理难题。自 2021 年 6 月开源以来,OpenMLDB 优先发布了特征数据治理能力,依托 SQL 的开发能力,为企业提供全栈功能的、低门槛特征数据计算和管理平台。

    OpenMLDB 包含 Feature Store 的全部功能,并且提供更为完整的 FeatureOps 全栈方案。除了提供特征存储功能,还具有基于 SQL 的低门槛数据库开发体验、面向特征计算优化的 OpenMLDB Spark 发行版,针对实时特征计算优化的索引结构,特征上线服务、企业级运维和管理等功能,让特征工程开发回归于本质——专注于高质量的特征计算脚本开发,不再被工程化效率落地所羁绊。

    关于 RocketMQ

    Apache RocketMQ 是一款由阿里巴巴开源的低延迟、高并发、高可用、高可靠,可支撑万亿级数据洪峰的分布式消息中间件。2016 年阿里巴巴正式将 RocketMQ 捐献给 Apache 基金会,2017 年毕业成为 Apache 顶级开源项目。在中国,目前 Apache RocketMQ 已经被 75% 以上的互联网及金融公司所使用,国内包括阿里云在内的 10+ 云厂商均提供了 RocketMQ 商业服务,全球超过 500 位贡献者参与其中,近乎成为成为业务消息领域首选消息平台。

    随着云原生时代的到来,Apache RocketMQ 进行了全面的架构升级,5.0 版本的发布,标志着 RocketMQ 从Messaging 平台升级为云原生事件、消息流融合处理平台,用以帮助用户轻松构建事件驱动服务,满足轻量级计算诉求,放大数据价值。

    OpenMLDB 上下游生态体系

    为更好降低开发者使用 OpenMLDB 的门槛,OpenMLDB 社区将持续打造面向上下游技术组件的生态圈,为开发者提供更多简单易用的生态 Connector。
    • 面向线上数据生态,如 Pulsar(已完成),Kafka(已完成),RocketMQ(已完成),Flink,RabbitMQ 等
    • 面向离线数据生态,如 HDFS,HBase,Cassandra,S3 等
    • 面向模型构建的算法、框架,如 XGBoost,LightGBM,TensorFlow,PyTorch,Scikit Learn 等
    • 面向机器学习建模全流程的调度框架、部署工具,如 DolphinScheduler(已完成),Airflow,Kubeflow,Prometheus,Grafana 等
    在这里插入图片描述

    相关阅读

    Apache RocketMQ Quick Start
    ​​
    ​​https://rocketmq.apache.org/docs/quick-start/​​

    Apache RocketMQ Connect
    ​​
    ​​https://github.com/apache/rocketmq-connect​​

    OpenMLDB 官网
    ​​
    ​​https://openmldb.ai​​
    ​​​
    OpenMLDB github 主页

    ​​https://github.com/4paradigm/OpenMLDB​​
    ​​​
    OpenMLDB 文档 ​​快速上手

    ​​https://openmldb.ai/docs/zh/v0.5/quickstart/index.html​

    OpenMLDB 微信交流群
    在这里插入图片描述

  • 相关阅读:
    JavaScript事件执行机制
    2023考研常识知识之五类数学有哪些区别
    Golang学习笔记
    明日周刊-第7期
    springboot读取resources下文件方式
    c++视觉检测------Shi-Tomasi 角点检测
    基于STM32的OLED多级菜单GUI实现(简化版智能手表)
    OpenJudge NOI 2.1 250:Safecracker
    Kubernetes 介绍
    Python基于OpenCV的人脸识别自助商店(源码&部署视频)
  • 原文地址:https://blog.csdn.net/weixin_48409843/article/details/125505457