昨天遇到了一个比较好玩的需求,要测试Flink-iceberg的版本问题
同时不能改动线上flink和iceberg的版本
平台已经提供了,在iceberg专属的FlinkSQL端是可以勾选iceberg版本的
但是自定义数据源插入iceberg一般用的是jar包,如何不通过jar直接通过SQL生成自定义数据源
阿没错!那就是我们万能的UDF啦!UDF写一个connector,类似data-gen,想一想都很兴奋有木有,然后我发现,好像是没有这样的接口诶
但是没有关系!函数可以实现万物,本身计算机大部分都是函数做得,如果你觉得函数做不了,那只是因为自己实现不了这样的函数
在官方网站中我找到了灵感,一列对多列输出!
好了就是你了,用datagen控制生成速率,输出一个无效值,然后通过接收无效值来生成我自己的行,好一个偷天换日呀
那么直接用Java来实现flink UDF,接受一行,然后输出自己任意想模拟的数据
- package udf2;
-
- import org.apache.flink.table.annotation.DataTypeHint;
- import org.apache.flink.table.annotation.FunctionHint;
- import org.apache.flink.table.functions.TableFunction;
- import org.apache.flink.types.Row;
-
- import java.util.Random;
-
- @FunctionHint(output = @DataTypeHint("ROW
")) - public class generateRowUdtf extends TableFunction
{
- public void eval(String a) {
- Random random = new Random();
- long id = Math.abs(random.nextLong()) % 20000;
- String data1 = Math.abs(random.nextInt()) % 2000000 + "";
- int data2 = Math.abs(random.nextInt()) % 2000;
- double data4 = Math.abs(Math.random() * 2000 + 1);
- collect(Row.of(id,data1,data2,data4));
- }
- }
然后UDF上传,写下如下SQL,对这个表进行连接操作,SQL可以在本地进行测试~直接print即可看到效果,非常滴神奇~ 这样子我们的自定义生成器就做好啦(全是api,你做了个锤子)
- CREATE TEMPORARY table test_insert(
- id2 String
- )WITH(
- 'connector' = 'datagen',
- 'rows-per-second'='100',
- 'fields.id2.kind'='random',
- 'fields.id2.length'='8'
- );
- CREATE TEMPORARY SYSTEM FUNCTION generateRowUdtf AS 'udf2.generateRowUdtf';
-
- insert into xxx
- SELECT T.id, T.data1, T.data2, T.data4
- FROM test_insert AS S
- left join lateral table(generateRowUdtf(id2)) AS T(id,data1,data2,data4) on true;