• Four ways to use a custom UDTF in Flink SQL


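    1. UDTF definition

    Table functions take one row in and emit many rows out (they "explode" the row). A table function extends TableFunction and provides a public eval method with no return value (void). It emits each output row by calling collect. Because a table function returns a table, that table must be joined with the original table to get the final result, which is why a lateral table join is needed (see LATERAL TABLE if this is unfamiliar).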
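To make the eval/collect contract concrete without a Flink cluster, here is a minimal, framework-free Java sketch. MiniTableFunction and SplitFunction are hypothetical names: MiniTableFunction only imitates the shape of Flink's TableFunction (a void eval plus one collect call per output row) and is not the real class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Framework-free illustration of the UDTF contract: eval() returns nothing and
 * emits zero or more rows through collect().
 */
public class UdtfContractSketch {

    // Hypothetical stand-in for Flink's TableFunction, NOT the real API
    abstract static class MiniTableFunction<T> {
        private Consumer<T> collector;

        void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        // Emits one output row; may be called any number of times per eval()
        protected final void collect(T row) {
            collector.accept(row);
        }
    }

    // One input string in, one output row per comma-separated token out
    static class SplitFunction extends MiniTableFunction<String> {
        public void eval(String line) {
            for (String token : line.split(",")) {
                collect(token.trim());
            }
        }
    }

    // Runs the function on a single input and gathers every collected row
    static List<String> explode(String input) {
        List<String> out = new ArrayList<>();
        SplitFunction fn = new SplitFunction();
        fn.setCollector(out::add);
        fn.eval(input);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(explode("com.browser, com.qq")); // [com.browser, com.qq]
    }
}
```

One input row produced two output rows; in Flink the framework supplies the collector and the lateral join pairs each emitted row with the input row it came from.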

    2. Dataset format

    The test dataset of users' app usage, one JSON object per line, is shown below:

    {"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}

    {"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}

    {"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}

    {"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}

    {"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}

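    In each JSON object, userId is the user ID, day is the date, begintime is the start time and endtime is the end time. data is the list of apps the user used and how long each was used: package is the app name and activetime is the app's usage time.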
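The nested layout can be pictured as two Java records: one per JSON line and one per element of data. This is a hypothetical model for illustration only (the article itself works on the raw JSON string), and since "package" is a Java keyword the app-name component is called packageName here.

```java
import java.util.List;

/**
 * Hypothetical Java model of one line of the test dataset.
 */
public class UserBehaviorModel {

    // One element of the "data" array: which app, and how long it was active
    record AppUsage(String packageName, long activetime) {}

    // One JSON line: user, date, start/end time, and the nested usage list
    record UserBehavior(long userId, String day, long begintime, long endtime,
                        List<AppUsage> data) {}

    // The first line of the test dataset, transcribed by hand
    static UserBehavior firstRecord() {
        return new UserBehavior(9527, "2022-05-12", 1620785058833L, 1620785217511L,
                List.of(new AppUsage("com.browser", 120000),
                        new AppUsage("com.qq", 80000)));
    }

    public static void main(String[] args) {
        UserBehavior first = firstRecord();
        // A UDTF over "data" emits one row per element: 2 rows for this line
        System.out.println(first.userId() + " -> " + first.data().size() + " apps"); // 9527 -> 2 apps
    }
}
```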

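    3. Custom UDTF

    The code below defines a custom scalar UDF (JsonFunction) and a custom UDTF (explodeFunction) to parse the user-behavior data, then calls the UDTF in four ways: a Table API inner join, a Table API left outer join, an SQL inner join and an SQL left outer join.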

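    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    // JSON library assumed to be org.json (the original listing omits its imports)
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class FlinkSQLBaseTableFunction {

        public static void main(String[] args) {
            // 1. Get the stream execution environment
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            senv.setParallelism(1);

            // 2. Create the table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

            // 3. Read the raw JSON lines from the socket
            DataStream<String> lines = senv.socketTextStream("hadoop1", 8888);

            // 4. Convert the stream into a dynamic table with a single column, "line"
            Table table = tEnv.fromDataStream(lines, $("line"));
            tEnv.createTemporaryView("userbehavior", table);

            // Register the functions by name for the SQL queries (methods 3 and 4);
            // the Table API calls (methods 1 and 2) reference the classes directly
            tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);
            tEnv.createTemporarySystemFunction("explodeFunction", explodeFunction.class);

            // The socket source is unbounded, so execute().print() blocks for as long as the
            // job runs and the statements after it never start: keep one of 5.1 to 5.4 active.

            // 5.1 Method 1: Table API, inner join
            tEnv.from("userbehavior")
                .joinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                    call(JsonFunction.class, $("line"), "userId"),
                    call(JsonFunction.class, $("line"), "day"),
                    call(JsonFunction.class, $("line"), "begintime"),
                    call(JsonFunction.class, $("line"), "endtime"),
                    $("package"),
                    $("activetime"))
                .execute().print();

            // 5.2 Method 2: Table API, left outer join
            tEnv.from("userbehavior")
                .leftOuterJoinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                    call(JsonFunction.class, $("line"), "userId"),
                    call(JsonFunction.class, $("line"), "day"),
                    call(JsonFunction.class, $("line"), "begintime"),
                    call(JsonFunction.class, $("line"), "endtime"),
                    $("package"),
                    $("activetime"))
                .execute().print();

            // 5.3 Method 3: SQL, inner join (comma join with LATERAL TABLE)
            tEnv.sqlQuery("select "
                    + "JsonFunction(line, 'userId'), "
                    + "JsonFunction(line, 'day'), "
                    + "JsonFunction(line, 'begintime'), "
                    + "JsonFunction(line, 'endtime'), "
                    + "package, "
                    + "activetime "
                    + "from userbehavior, "
                    + "lateral table(explodeFunction(line, 'data'))")
                .execute().print();

            // 5.4 Method 4: SQL, left outer join (LEFT JOIN LATERAL TABLE ... ON TRUE)
            tEnv.sqlQuery("select "
                    + "JsonFunction(line, 'userId'), "
                    + "JsonFunction(line, 'day'), "
                    + "JsonFunction(line, 'begintime'), "
                    + "JsonFunction(line, 'endtime'), "
                    + "package, "
                    + "activetime "
                    + "from userbehavior "
                    + "left join lateral table(explodeFunction(line, 'data')) as sc(package, activetime) on true")
                .execute().print();
        }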

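        /**
         * Custom scalar UDF: returns the value of a top-level key of a JSON line as a string
         */
        public static class JsonFunction extends ScalarFunction {
            public String eval(String line, String key) {
                // Parse the line as JSON
                JSONObject baseJson = new JSONObject(line);
                // optString converts non-string values (e.g. the numeric userId) to text
                // and returns "" when the key is missing
                return baseJson.optString(key, "");
            }
        }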

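        /**
         * Custom UDTF: explodes the JSON array stored under key into one row per element
         */
        @FunctionHint(output = @DataTypeHint("ROW<package STRING, activetime INT>"))
        public static class explodeFunction extends TableFunction<Row> {
            public void eval(String line, String key) {
                // Parse the line as JSON
                JSONObject baseJson = new JSONObject(line);
                // Extract the JSONArray stored under key (here "data"); if it is missing,
                // emit nothing, so only the left outer join keeps the line
                JSONArray jsonArray = baseJson.optJSONArray(key);
                if (jsonArray == null) {
                    return;
                }
                // Emit one (package, activetime) row per array element
                for (int i = 0; i < jsonArray.length(); i++) {
                    JSONObject item = jsonArray.getJSONObject(i);
                    String col1 = item.getString("package");
                    // activetime is a JSON number, so read it directly with getInt
                    Integer col2 = item.getInt("activetime");
                    collect(Row.of(col1, col2));
                }
            }
        }
    }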
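The inner-join calls (5.1, 5.3) and the left-outer-join calls (5.2, 5.4) only differ when the UDTF emits no rows for an input line. The plain-Java simulation below sketches that difference, assuming a made-up user 9999 whose data array is empty; it models the join semantics only and does not call Flink. The inner join drops that line, while the left outer join keeps it and fills the UDTF columns with NULL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation of how a lateral join treats the rows a UDTF emits.
 */
public class LateralJoinSemantics {

    // userId -> (package, activetime) pairs the UDTF would emit; 9999 is hypothetical
    static final Map<Integer, List<Object[]>> EXPLODED = Map.of(
            9527, List.of(new Object[]{"com.browser", 120000}, new Object[]{"com.qq", 80000}),
            9999, List.of());

    // leftOuter = false mimics joinLateral / ", LATERAL TABLE(...)";
    // leftOuter = true mimics leftOuterJoinLateral / "LEFT JOIN LATERAL TABLE(...) ON TRUE"
    static List<String> join(List<Integer> userIds, boolean leftOuter) {
        List<String> out = new ArrayList<>();
        for (int userId : userIds) {
            List<Object[]> rows = EXPLODED.get(userId);
            if (rows.isEmpty()) {
                if (leftOuter) {
                    // No UDTF output: keep the left row and pad the UDTF columns with NULL
                    out.add(userId + ",null,null");
                }
                // Inner join: the left row is dropped
                continue;
            }
            // One joined row per emitted UDTF row
            for (Object[] r : rows) {
                out.add(userId + "," + r[0] + "," + r[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> users = List.of(9527, 9999);
        System.out.println(join(users, false)); // [9527,com.browser,120000, 9527,com.qq,80000]
        System.out.println(join(users, true));  // [9527,com.browser,120000, 9527,com.qq,80000, 9999,null,null]
    }
}
```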

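    4. Feed the test data

    On the hadoop1 node (a Linux VM), start an nc listener with the command below, then paste the test dataset into it. Start nc before launching the Flink job, because socketTextStream connects to this port as a client.

    [root@hadoop1 ~]# nc -lk 8888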

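    5. Results

    With the custom functions in place, parsing the user-behavior data with the registered JsonFunction and explodeFunction turns each input record into one output row per element of its data array, with the record's userId, day, begintime and endtime repeated alongside package and activetime.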
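As a rough cross-check of what the queries should print, the sketch below hand-transcribes the five test records into arrays (no JSON parsing, no Flink) and explodes them the same way. begintime and endtime are left out because they are identical in every record. Since every record has a non-empty data array, the inner-join and left-outer-join methods should yield the same 10 data rows for this dataset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hand-computed sketch of the rows the lateral join should produce for the test data.
 */
public class ExpectedRows {

    // {userId, day, package, activetime, package, activetime}, copied from the dataset
    static final String[][] RECORDS = {
        {"9527", "2022-05-12", "com.browser", "120000", "com.qq", "80000"},
        {"9528", "2022-05-13", "com.weixin", "150000", "com.youdao", "60000"},
        {"9529", "2022-05-14", "com.ebay", "40000", "com.baidu", "120000"},
        {"9530", "2022-05-15", "com.huawei", "180000", "com.leshi", "20000"},
        {"9531", "2022-05-16", "com.xiaomi", "150000", "com.wangyi", "60000"},
    };

    // One output row per (package, activetime) pair, repeating userId and day
    static List<String> explodeAll() {
        List<String> rows = new ArrayList<>();
        for (String[] r : RECORDS) {
            for (int i = 2; i + 1 < r.length; i += 2) {
                rows.add(String.join(",", r[0], r[1], r[i], r[i + 1]));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = explodeAll();
        rows.forEach(System.out::println);
        System.out.println(rows.size() + " rows"); // 10 rows
    }
}
```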

  • Original article: https://blog.csdn.net/dajiangtai007/article/details/125500063