• ChunJun: 自定义插件


    序言

    Chunjun的版本兼容可能会有问题,在我们了解了自定义插件后,在修改源码以应对不同的场景就会得心应手了,针对Chunjun1.12.Release版本说明cuiyaonan2000@163.com 

    自定义插件整体流程

    数据流的角度来看ChunJun,可以理解为不同数据源的数据流通过对应的ChunJun插件处理,变成符合ChunJun数据规范的数据流;脏数据的处理可以理解为脏水流通过污水处理厂,变成符合标准,可以使用的水流,而对不能处理的水流收集起来。----总的来说跟Flink的数据处理一样,只是增加了一个插件的概念用于处理不同的数据源,并生成对应的Flink任务cuiyaonan2000@163.com

    插件开发不需要关注任务具体如何调度,只需要关注关键问题:

    1. 数据源本身读写数据的正确性;
    2. 如何合理且正确地使用框架;
    3. 配置文件的规范,每个插件都应有对应的配置文件;

    每个插件应当有以下目录:

    1. conf:存放插件配置类的包。
    2. converter:存放插件数据类型转换规则类的包。
    3. source:存放插件数据源读取逻辑有关类的包。
    4. sink:存放插件数据源写入逻辑有关类的包。
    5. table:存放插件数据源sql模式有关类的包。  -----这个应该不是我们的重点,flink的sql并不好cuiyaonan2000@163.com
    6. util:存放插件工具类的包,chunjun已经封装了一些常用工具类在chunjun-core模块中,如果还需编写插件工具类的请放在该插件目录中的util包

    以Stream插件为例子,他的插件结构如下图所示:

    调试

    Debug调试

    (1)本地调试

    在chunjun-local-test模块中,官方已经写好了本地测试的LocalTest类,只需更改脚本文件路径,在代码处打上断点即可调试。

    (2)远程调试

    如果需要远程调试,那么需要在 flink-conf.yaml 中增加 Flink 的远程调试配置,然后在 idea 中配置”JVM Remote“,在代码块中打断点(这种方法还能调试 Flink 本身的代码)

     
    

    env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

    只需要修改标记的这两个地方,如果是 HA 集群,需要根据日志修改怎么看日志,怎么修改,自行查资料

    至此,任务 idea 调试流程就这些内容。

    任务类型

    从Chunjun的配置文件Json中可以看到任务的分类

    • sync:同步任务,同理有同步任务的读插件和写插件,即sync(reader),sync(writer)
    • sql:计算任务,,同理有计算任务的读插件和写插件,即sync(reader),sync(writer)

    reader

    开发流程

    以Stream插件为例

    插件数据源读取逻辑需要继承BaseRichInputFormat类,BaseRichInputFormat是具体的输入数据的操作,包括open、nextRecord、close,每个插件具体操作自己的数据,InputFormat公共内容都在BaseRichInputFormat,不要随意修改。

    创建StreamInputFormat类继承BaseRichInputFormat类,重写其中的必要方法。

    1. public class StreamInputFormat extends BaseRichInputFormat {
    2. //创建数据分片
    3. @Override
    4. public InputSplit[] createInputSplitsInternal(int minNumSplits) {......}
    5. //打开数据连接
    6. @Override
    7. public void openInternal(InputSplit inputSplit) {......}
    8. //读取一条数据
    9. @Override
    10. public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {......}
    11. //判断数据是否读取完毕
    12. @Override
    13. public boolean reachedEnd() {......}
    14. //关闭数据连接
    15. @Override
    16. protected void closeInternal() {......}
    17. }

    由此可见StreamInputFormat 具体的实施类,但是在调用实现类的方法前还有引导类的创建,具体流程是:StreamSourceFactory-->StreamInputFormatBuilder-->StreamInputFormat 中间会引用StreamConf和StreamColumnConverter  至此一个source就完成了cuiyaonan2000@163.com

    业务流程

    1 com.dtstack.chunjun.Main是启动类,首先判断是计算任务,还是同步任务

    2 以exeSyncJob为例进入可以看到,这里就是根据我们传入的Json文件内容生成环境变量

    3 .将上面解析生成的SyncConf,然后通过反射加载具体的插件调用createSource方法生成DataStream,  注意这里就是重点了根据 我们的json文件的内容,来获取StreamSourceFactory ,然后创建的数据内容是DataStream----从这里开始就是重点了

    4 createSource方法中会构建inputformat对象,然后调用createInput方法,将inputformat对象封装至DtInputFormatSourceFunction中。

    未完待续~~~

  • 相关阅读:
    Redis面试题
    cs224w(图机器学习)2021冬季课程学习笔记8
    04.PD与数据库关系模型简介
    键鼠自动化2.0树形结构讲解
    MyBatis注解开发实现学生管理页面(分页pagehelper,多条件搜索,查看课程信息)
    快消品b2b电子商务网站建设方案
    vue2升级vue3:vue-i18n国际化异步按需加载
    【附源码】计算机毕业设计JAVA疫情下的居民管理系统
    信息物理系统状态估计与传感器攻击检测
    adb 指令
  • 原文地址:https://blog.csdn.net/cuiyaonan2000/article/details/133268356