• MLSQL (Byzer) Quick Start


    Before working through this tutorial, it is recommended to read the official documentation:
    https://docs.byzer.org/#/byzer-lang/zh-cn/

    Loading data / Load

    The load statement reads a data source into a named table.

    Loading a single MySQL table

    load jdbc.`stg_sxyxpz_nw.DecDeclareFlowConf` options driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/stg_sxyxpz_nw?useSSL=false"
    and dbtable="`stg_sxyxpz_nw`.`DecDeclareFlowConf`"
    and user="fs_nwbj"
    and password="123456"
    as DecDeclareFlowConf;
    

    Loading multiple MySQL tables

    -- 01 Load the data source
    connect jdbc where
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false"
    and user="fs_nwbj"
    and password="123456"
     as ods_yxjc_prod;
    load jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` as ods_SmpOnethingModule;
    load jdbc.`ods_yxjc_prod.ods_SmpOnethingCatalog` as ods_SmpOnethingCatalog;
    

    Running a native SQL query during load

    -- 01 Load the data source t_item_implement
    connect jdbc where 
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/stg_sx?useSSL=false"
    and user="fs_nwbj"
    and password="123456"
     as stg_sx;
    load jdbc.`stg_sx.t_item_implement` where
    directQuery = '''
    SELECT * FROM t_item_implement WHERE `STATUS` = "1" and IN_CURRENT = "1" AND ITEM_TYPE = "01"
    '''
    as t_item_implement;
    

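    Because the query runs in the database itself, only the filtered rows are loaded, which avoids the OOM that a full-table load can cause.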

    Loading a single Doris table

    load doris.`stg_sx.t_item_material`   -- note: the table name must match doris.table.identifier
    options doris.table.identifier="stg_sx.t_item_material"  -- required; must match the table name on the load line above
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    and doris.request.tablet.size="3"  -- optional parameter
    as t_item_material;
    

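    Optional parameters:
    1) doris.request.tablet.size
    Default: Integer.MAX_VALUE
    Description: the number of Doris tablets that map to one RDD partition.
    The smaller this value, the more partitions are generated. That raises parallelism on the Spark side but also puts more load on Doris.
    For example, if a table has 10 tablets and doris.request.tablet.size=3, Spark reads the data with a parallelism of 4 (10/3 ≈ 3.3, rounded up to 4). The read is split by tablet into 4 parallel tasks, which improves throughput, but because it also puts more load on Doris the value should be tuned with care.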
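
    The rounding in the example above can be checked with a few lines of Java. This is only a sketch of the arithmetic: the class and method names are made up for illustration, and the real connector may group tablets differently (for example per backend node), so treat the result as an estimate of the partition count.

    ```java
    // Hypothetical helper; not part of Byzer or the Doris connector.
    class TabletPartitionEstimate {
        // Ceiling division: number of groups holding at most tabletSize tablets each.
        static int partitionCount(int tabletCount, int tabletSize) {
            if (tabletCount < 0 || tabletSize <= 0) {
                throw new IllegalArgumentException("tabletCount must be >= 0 and tabletSize > 0");
            }
            // Use long arithmetic so that Integer.MAX_VALUE as tabletSize cannot overflow.
            return (int) (((long) tabletCount + tabletSize - 1) / tabletSize);
        }

        public static void main(String[] args) {
            System.out.println(partitionCount(10, 3));                 // 4
            System.out.println(partitionCount(10, Integer.MAX_VALUE)); // 1 (the default setting)
        }
    }
    ```

    With the default of Integer.MAX_VALUE the estimate collapses to a single partition, which is why lowering the value is what introduces parallelism.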

    Loading multiple Doris tables

    load doris.`ods_yxjc_prod.ods_SmpOnethingFlowNode`   -- note: the table name must match doris.table.identifier
    options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingFlowNode"  -- required; must match the table name on the load line above
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    and doris.request.tablet.size="3"
    as ods_SmpOnethingFlowNode;
    
    load doris.`ods_yxjc_prod.ods_SmpOnethingHditmDir`   -- note: the table name must match doris.table.identifier
    options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingHditmDir"  -- required; must match the table name on the load line above
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    and doris.request.tablet.size="3"
    as ods_SmpOnethingHditmDir;
    

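    Each Doris table is a separate source, so every load statement must carry its own connection settings, user, and password. Reusing one connector across several load statements is not supported.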

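    Registering functions and models / Register

    The register statement dynamically registers UDF/UDAF functions written in Java or Scala.

    Registering a single Java function

    -- Define the Java function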
    set decDeclareProxyAcceptConfighandleScope ='''
    public class DecDeclareProxyAcceptConfighandleScope {
        public String is_full_city_handle(String value) {
            if (value == null || "".equals(value)) {
                return "0";
            }
            return value.contains("city") ? "1" : "0";
        }
    }
    
    ''';
    load script.`decDeclareProxyAcceptConfighandleScope` as scriptTable;
    register ScriptUDF.`scriptTable` as is_full_city_handle
    options lang="java"
    and className = "DecDeclareProxyAcceptConfighandleScope"
    and methodName = "is_full_city_handle";
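
    Because the UDF body is plain Java, its logic can be sanity-checked outside Byzer before it is registered. The sketch below copies the class from the set statement and calls it from a hypothetical UdfCheck harness; the input strings are made-up samples.

    ```java
    // Hypothetical harness; only the UDF class below comes from the tutorial.
    class UdfCheck {
        public static void main(String[] args) {
            DecDeclareProxyAcceptConfighandleScope udf = new DecDeclareProxyAcceptConfighandleScope();
            System.out.println(udf.is_full_city_handle(null));          // 0
            System.out.println(udf.is_full_city_handle("city,county")); // 1
            System.out.println(udf.is_full_city_handle("county"));      // 0
        }
    }

    // Same class body as in the set statement above.
    class DecDeclareProxyAcceptConfighandleScope {
        public String is_full_city_handle(String value) {
            if (value == null || "".equals(value)) {
                return "0";
            }
            return value.contains("city") ? "1" : "0";
        }
    }
    ```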
    

    Registering multiple Java functions

    -- Define the Java functions
    set smpOnethingModuleExtendContent ='''
    public class SmpOnethingModuleExtendContent {
        public String project_type(String value) {
            if (value == null || "".equals(value)) {
                return null;
            }
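            int valueIndex = value.indexOf("infoType=");
            // Without this check a missing key would be parsed from an arbitrary offset.
            if (valueIndex == -1) {
                return null;
            }
            int indexOf = value.indexOf(",", valueIndex);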
            int beginIndex = valueIndex + "infoType=".length();
            if (indexOf < beginIndex) {
                return null;
            }
            String substring = value.substring(beginIndex, indexOf);
            switch (substring) {
                case "承诺上报件":
                    return "0";
                case "承诺件":
                    return "1";
                case "即办件":
                    return "2";
                case "联办件":
                    return "3";
                default:
                    return null;
            }
        }
    
        public String project_type_text(String value) {
            if (value == null || "".equals(value)) {
                return null;
            }
            int valueIndex = value.indexOf("infoType=");
            if (valueIndex == -1) {
                return null;
            }
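            int indexOf = value.indexOf(",", valueIndex);
            // No comma after the value: substring would throw, so return null instead.
            if (indexOf == -1) {
                return null;
            }
            return value.substring(valueIndex + "infoType=".length(), indexOf);
        }
    }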
    ''';
    load script.`smpOnethingModuleExtendContent` as scriptTable;
    register ScriptUDF.`scriptTable` as project_type
    options lang="java"
    and className = "SmpOnethingModuleExtendContent"
    and methodName = "project_type";
    register ScriptUDF.`scriptTable` as project_type_text
    options lang="java"
    and className = "SmpOnethingModuleExtendContent"
    and methodName = "project_type_text";
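
    The part of these functions most likely to go wrong is the extraction of the text between infoType= and the next comma. A minimal standalone sketch of that step, with explicit handling for a missing key or a missing comma, might look like this; the ExtendContentCheck class, the infoType helper, and the sample strings are assumptions for illustration and are not part of the registered UDF.

    ```java
    // Hypothetical standalone check of the extraction step; not the registered UDF itself.
    class ExtendContentCheck {
        // Returns the text between "infoType=" and the next comma,
        // or null when the key or the trailing comma is missing.
        static String infoType(String value) {
            if (value == null || value.isEmpty()) {
                return null;
            }
            String key = "infoType=";
            int start = value.indexOf(key);
            if (start == -1) {
                return null;
            }
            int begin = start + key.length();
            int end = value.indexOf(",", begin);
            if (end == -1) {
                return null;
            }
            return value.substring(begin, end);
        }

        public static void main(String[] args) {
            // Made-up sample values.
            System.out.println(infoType("id=1,infoType=typeA,dept=A")); // typeA
            System.out.println(infoType("id=1,dept=A"));                // null (key missing)
            System.out.println(infoType("id=1,infoType=typeA"));        // null (no comma after the value)
        }
    }
    ```

    Returning null for the no-comma case is a design choice that matches what project_type does; taking the rest of the string would be an equally reasonable alternative if the field can appear last.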
    

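    Transforming data / Select

    A select statement looks just like the SQL you would write for MySQL and is fully compatible with Spark SQL. The only difference is that it must end with as <table name>, which names the result table.

    Selecting from a single table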

    select
        unid,
        jointTrialModel,
        jointTrialOvertime
    from SmpOnethingJointTrialConfig as res01;
    

    Selecting from multiple tables

    select
        SmpOnethingModule.unid,
        SmpOnethingModule.moduleName,
        core_tag.value as module_class_tag_name
    from SmpOnethingModule
             left join core_tag on core_tag.unid = SmpOnethingModule.moduleClassTag
        as res01;
    

    Using built-in functions in select

    select
        SmpOnethingModule.unid,
        SUBSTRING(SmpOnethingModule.areaCode,1,6) as region_city_code
    from SmpOnethingModule as res01;
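
    Note that SUBSTRING in Spark SQL counts positions from 1 and takes a length, whereas Java's String.substring counts from 0 and takes an end index. The sketch below, using a hypothetical sqlSubstring helper that only covers pos >= 1 and a made-up areaCode value, shows that SUBSTRING(areaCode,1,6) corresponds to areaCode.substring(0, 6).

    ```java
    // Hypothetical helper for illustration; it only mimics the pos >= 1 case.
    class SubstringCheck {
        static String sqlSubstring(String str, int pos, int len) {
            if (str == null) {
                return null;
            }
            if (pos < 1 || len < 0) {
                throw new IllegalArgumentException("this sketch only handles pos >= 1 and len >= 0");
            }
            // Convert the 1-based start to a 0-based index and clamp both ends to the string length.
            int begin = Math.min(pos - 1, str.length());
            int end = (int) Math.min((long) begin + len, str.length());
            return str.substring(begin, end);
        }

        public static void main(String[] args) {
            String areaCode = "440100000000"; // made-up sample value
            System.out.println(sqlSubstring(areaCode, 1, 6)); // 440100
            System.out.println(areaCode.substring(0, 6));     // 440100
        }
    }
    ```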
    

    Using the Java functions registered above in select

    select
        SmpOnethingModule.unid,
        project_type(SmpOnethingModule.extendContent) as project_type,
        project_type_text(SmpOnethingModule.extendContent) as project_type_text
    from SmpOnethingModule as res01;
    

    Using built-in SQL expressions (case when) in select

    select
        catalogName,
        catalogType,
        catalogCode,
        legalStandard,
        case status
            when 'Y' then '1'
            when 'N' then '0'
            else null end as status,
        deptName,
        deptUnid,
        exerciseLevel,
        creditCode,
        case onethingType
            when 'L0' then '0'
            when 'L1' then '1'
            else null end as onethingType
    from SmpOnethingCatalog as res01;
    

    Using a subquery in select

    select
        SmpOnethingModule.unid,
        (SELECT count(*) FROM SmpOnethingFlowNode WHERE moduleUnid = SmpOnethingModule.unid) as integrate_single_item_num
    from SmpOnethingModule as res01;
    

    Saving data / Save

    The save statement plays a role similar to insert in traditional SQL.

    Saving select results to a MySQL table

    save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
    and user="fs_nwbj"
    and password="123456";

    Truncating the MySQL table when saving

    -- save
    save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
    and user="fs_nwbj"
    and password="123456"
    and truncate="true";
    

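    Updating a row

    If the rows being saved have the same primary key id as existing rows, saving with append overwrites the existing rows. This relies on the target table replacing rows on a duplicate key, as a Doris Unique Key table does; a plain MySQL table would reject the duplicate insert instead.

    save append res01 as jdbc.`dws_yxjc_nw.yxjc_index_full` options
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/dws_yxjc_nw?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`dws_yxjc_nw`.`yxjc_index_full`"
    and user="fs_nwbj"
    and password="123456";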
    

    Truncating a Doris table

    Run a DDL statement to truncate the table:

    run command as JDBC.`byzer_demo._` where 
    driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and user="fs_nwbj"
    and password="123456"
    and `driver-statement-0`="truncate table ods_yxjc_prod.ods_item_material_test";
    

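    Saving select results to a Doris table

    -- Write the data (overwrite ... truncate=true is not supported for now; if the table must be emptied first, run the truncate statement above)
    save append res01 as doris.`ods_yxjc_prod.ods_item_material_test`
    options doris.table.identifier="ods_yxjc_prod.ods_item_material_test"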
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456" ;
    
  • Original article: https://blog.csdn.net/csdnerM/article/details/126379992