• FlinkSQL-UDF自定义数据源


    昨天遇到了一个比较好玩的需求,要测试Flink-iceberg的版本问题
    同时不能改动线上flink和iceberg的版本

    平台已经提供了,在iceberg专属的FlinkSQL端是可以勾选iceberg版本的
    但是自定义数据源插入iceberg一般用的是jar包,如何不通过jar直接通过SQL生成自定义数据源

    阿没错!那就是我们万能的UDF啦!UDF写一个connector,类似data-gen,想一想都很兴奋有木有,然后我发现,好像是没有这样的接口诶
    但是没有关系!函数可以实现万物,本身计算机大部分都是函数做得,如果你觉得函数做不了,那只是因为自己实现不了这样的函数

    在官方网站中我找到了灵感,一列对多列输出!
    好了就是你了,用datagen控制生成速率,输出一个无效值,然后通过接收无效值来生成我自己的行,好一个偷天换日呀
     

    那么直接用Java来实现flink UDF,接受一行,然后输出自己任意想模拟的数据

    1. package udf2;
    2. import org.apache.flink.table.annotation.DataTypeHint;
    3. import org.apache.flink.table.annotation.FunctionHint;
    4. import org.apache.flink.table.functions.TableFunction;
    5. import org.apache.flink.types.Row;
    6. import java.util.Random;
    7. @FunctionHint(output = @DataTypeHint("ROW"))
    8. public class generateRowUdtf extends TableFunction {
    9. public void eval(String a) {
    10. Random random = new Random();
    11. long id = Math.abs(random.nextLong()) % 20000;
    12. String data1 = Math.abs(random.nextInt()) % 2000000 + "";
    13. int data2 = Math.abs(random.nextInt()) % 2000;
    14. double data4 = Math.abs(Math.random() * 2000 + 1);
    15. collect(Row.of(id,data1,data2,data4));
    16. }
    17. }

    然后UDF上传,写下如下SQL,对这个表进行连接操作,SQL可以在本地进行测试~直接print即可看到效果,非常滴神奇~ 这样子我们的自定义生成器就做好啦(全是api,你做了个锤子)

    1. CREATE TEMPORARY table test_insert(
    2. id2 String
    3. )WITH(
    4. 'connector' = 'datagen',
    5. 'rows-per-second'='100',
    6. 'fields.id2.kind'='random',
    7. 'fields.id2.length'='8'
    8. );
    9. CREATE TEMPORARY SYSTEM FUNCTION generateRowUdtf AS 'udf2.generateRowUdtf';
    10. insert into xxx
    11. SELECT T.id, T.data1, T.data2, T.data4
    12. FROM test_insert AS S
    13. left join lateral table(generateRowUdtf(id2)) AS T(id,data1,data2,data4) on true;

  • 相关阅读:
    神经网络的问题总结
    openlayers点标记只在视图内展示,视图外不展示,用于大量数据展示
    java线程池
    ARouter出现 there‘s no route matched in group问题排查
    高速USB转JTAG/SPI/I2C/UART/GPIO应用
    Kubernetes速成课程:掌握容器编排的精髓
    KyLin离线安装OceanBase
    Java基于SSM的海淘商城系统
    LeetCode算法题解(动态规划,背包问题)|LeetCode416. 分割等和子集
    如何通过财务体系建设,助推企业数智化转型?
  • 原文地址:https://blog.csdn.net/zxc132465258/article/details/125912194