• Four Ways to Use a Custom UDTF in Flink SQL


    1. UDTF definition

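    Table functions (UDTFs) are one-in, many-out. Each input row can be "exploded" into zero or more output rows. A UDTF extends TableFunction and provides an eval method that returns nothing (void). It emits each output row by calling collect. A table function returns a table rather than a single value, so its output must be joined with the original table to get the final result. That join is a lateral join (look up LATERAL TABLE if this is unfamiliar).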
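    The one-in, many-out idea and the difference between the inner and the left outer lateral join can be simulated in plain Java, with no Flink dependency. This is only an illustration under stated assumptions. The class, record and method names are hypothetical. `explode` stands in for a UDTF's eval, and a `Consumer` stands in for collect. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free simulation of a table function used in a lateral join.
public class LateralJoinSketch {

    // An input row: a user id and the apps found in that user's "data" array.
    record Input(int userId, List<String> apps) {}

    // A joined row: the input column plus the column produced by the table function.
    record Output(int userId, String app) {}

    // Stand-in for a UDTF eval(): returns nothing and emits zero or more rows via collect.
    static void explode(Input in, Consumer<String> collect) {
        for (String app : in.apps()) {
            collect.accept(app);
        }
    }

    // leftOuter == false behaves like joinLateral / ", LATERAL TABLE(...)":
    //   input rows for which explode emits nothing are dropped.
    // leftOuter == true behaves like leftOuterJoinLateral / "LEFT JOIN LATERAL TABLE(...) ON TRUE":
    //   such rows are kept once, with null in the function's column.
    static List<Output> lateralJoin(List<Input> inputs, boolean leftOuter) {
        List<Output> result = new ArrayList<>();
        for (Input in : inputs) {
            List<String> emitted = new ArrayList<>();
            explode(in, emitted::add); // call the "table function" once per input row
            for (String app : emitted) {
                result.add(new Output(in.userId(), app));
            }
            if (emitted.isEmpty() && leftOuter) {
                result.add(new Output(in.userId(), null));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Input> inputs = List.of(
                new Input(9527, List.of("com.browser", "com.qq")),
                new Input(9999, List.of())); // hypothetical user with an empty data array
        System.out.println(lateralJoin(inputs, false)); // 2 rows
        System.out.println(lateralJoin(inputs, true));  // 3 rows, the last with app=null
    }
}
```

    The inner variant keeps only user 9527's two rows. The left outer variant also keeps user 9999 once, with a null app. Calls 5.1/5.3 and 5.2/5.4 in the code below differ in exactly this way.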

    2. Dataset format

    The test dataset of user app usage is shown below:

    {"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}

    {"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}

    {"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}

    {"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}

    {"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}

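    In each JSON object, userId is the user ID, day is the date, begintime is the start time and endtime is the end time. data is the list of the user's app usage durations. In each entry, package is the app name and activetime is how long the app was used.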
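    The target shape of the parsing can be sketched in plain Java. One JSON line with n entries in data becomes n rows that repeat userId, day, begintime and endtime. The record names are hypothetical. The first sample line is typed in by hand rather than parsed, so no JSON library is needed. Because package is a Java keyword, the field is called pkg here. Java 16 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one line of the test dataset and of the rows it should explode into.
public class UserBehaviorShape {

    // One element of the "data" array.
    record AppUsage(String pkg, long activetime) {}

    // One JSON line of the dataset.
    record UserBehavior(long userId, String day, long begintime, long endtime,
                        List<AppUsage> data) {}

    // One flattened row: the top-level fields repeated for every element of "data".
    record FlatRow(long userId, String day, long begintime, long endtime,
                   String pkg, long activetime) {}

    static List<FlatRow> flatten(UserBehavior u) {
        List<FlatRow> rows = new ArrayList<>();
        for (AppUsage a : u.data()) {
            rows.add(new FlatRow(u.userId(), u.day(), u.begintime(), u.endtime(),
                                 a.pkg(), a.activetime()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // First line of the sample dataset, entered by hand.
        UserBehavior first = new UserBehavior(9527, "2022-05-12", 1620785058833L, 1620785217511L,
                List.of(new AppUsage("com.browser", 120000), new AppUsage("com.qq", 80000)));
        flatten(first).forEach(System.out::println); // two rows: com.browser and com.qq
    }
}
```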

    3. Custom UDTF

    The complete code for parsing the user behavior data with a custom Flink SQL UDTF (plus a helper scalar UDF) is as follows.

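    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class FlinkSQLBaseTableFunction {

        public static void main(String[] args) {
            // 1. Get the stream execution environment
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            senv.setParallelism(1);
            // 2. Create the table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
            // 3. Read data: one JSON record per line from the nc service on hadoop1:8888
            DataStream<String> lines = senv.socketTextStream("hadoop1", 8888);
            // 4. Convert the stream into a dynamic table with a single column named "line"
            Table table = tEnv.fromDataStream(lines, $("line"));
            tEnv.createTemporaryView("userbehavior", table);

            // Note: on an unbounded socket stream, execute().print() blocks while the job runs,
            // so only the first of 5.1-5.4 actually executes. Keep one enabled at a time.

            // 5.1 Method 1: Table API, inner join (rows for which the UDTF emits nothing are dropped)
            tEnv.from("userbehavior")
                .joinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                    call(JsonFunction.class, $("line"), "userId"),
                    call(JsonFunction.class, $("line"), "day"),
                    call(JsonFunction.class, $("line"), "begintime"),
                    call(JsonFunction.class, $("line"), "endtime"),
                    $("package"),
                    $("activetime"))
                .execute().print();

            // 5.2 Method 2: Table API, left outer join (such rows are kept, UDTF columns are null)
            tEnv.from("userbehavior")
                .leftOuterJoinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                    call(JsonFunction.class, $("line"), "userId"),
                    call(JsonFunction.class, $("line"), "day"),
                    call(JsonFunction.class, $("line"), "begintime"),
                    call(JsonFunction.class, $("line"), "endtime"),
                    $("package"),
                    $("activetime"))
                .execute().print();

            // Register the functions so that SQL can call them by name
            tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);
            tEnv.createTemporarySystemFunction("explodeFunction", explodeFunction.class);

            // 5.3 Method 3: SQL, inner join (comma join with LATERAL TABLE)
            tEnv.sqlQuery("select "
                    + "JsonFunction(line,'userId'),"
                    + "JsonFunction(line,'day'),"
                    + "JsonFunction(line,'begintime'),"
                    + "JsonFunction(line,'endtime'),"
                    + "package,"
                    + "activetime "
                    + "from userbehavior, "
                    + "lateral table(explodeFunction(line,'data'))")
                .execute().print();

            // 5.4 Method 4: SQL, left outer join (LEFT JOIN LATERAL TABLE ... ON TRUE)
            tEnv.sqlQuery("select "
                    + "JsonFunction(line,'userId'),"
                    + "JsonFunction(line,'day'),"
                    + "JsonFunction(line,'begintime'),"
                    + "JsonFunction(line,'endtime'),"
                    + "package,"
                    + "activetime "
                    + "from userbehavior "
                    + "left join lateral table(explodeFunction(line,'data')) as sc(package,activetime) on true")
                .execute().print();
        }

        /**
         * Custom UDF (scalar function): returns the value of the given key in a JSON line
         * as a string, or "" if the key is absent.
         */
        public static class JsonFunction extends ScalarFunction {
            public String eval(String line, String key) {
                // Parse the line as JSON
                JSONObject baseJson = new JSONObject(line);
                // optString also converts numeric values (userId, begintime, endtime) to strings
                // and falls back to "" when the key is missing
                return baseJson.optString(key, "");
            }
        }

        /**
         * Custom UDTF (table function): explodes the JSON array stored under the given key
         * into one (package, activetime) row per element.
         */
        @FunctionHint(output = @DataTypeHint("ROW<package STRING, activetime INT>"))
        public static class explodeFunction extends TableFunction<Row> {
            public void eval(String line, String key) {
                // Parse the line as JSON
                JSONObject baseJson = new JSONObject(line);
                // Extract the JSONArray stored under key (here "data")
                JSONArray jsonArray = baseJson.getJSONArray(key);
                // Emit one row per array element
                for (int i = 0; i < jsonArray.length(); i++) {
                    JSONObject item = jsonArray.getJSONObject(i);
                    String col1 = item.getString("package");
                    // activetime is a JSON number; recent org.json versions throw if getString is used on it
                    Integer col2 = item.getInt("activetime");
                    collect(Row.of(col1, col2));
                }
            }
        }
    }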

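    4. Input test data

    On the hadoop1 node of the Linux virtual machine, start an nc service with the following command and type in the test dataset. Start nc before launching the Flink job, because socketTextStream connects to it as a client.

    [root@hadoop1 ~]# nc -lk 8888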
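    If nc is not available, the sending side can be approximated in plain Java. This helper is a hypothetical assumption, not part of the original setup. It must run on hadoop1, or the host in socketTextStream must be changed. It listens on port 8888 and sends the fixed sample lines to every client that connects, instead of reading the terminal as nc does. It closes each connection after sending. With the default of no reconnect retries, Flink's socket source should then reach end of input, so execute().print() would return rather than block. Java 16 or later is assumed.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for "nc -lk 8888": serves a fixed set of lines to each client that connects.
public class LineServer {

    // Accepts one client, writes every line followed by '\n', then closes the connection.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             BufferedWriter out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.write(line);
                out.write('\n'); // socketTextStream(host, port) splits records on "\n" by default
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // The five sample records from section 2, one JSON object per line.
        String data = """
                {"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}
                {"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}
                {"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}
                {"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}
                {"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}
                """;
        List<String> lines = data.lines().toList();
        // Same port as socketTextStream("hadoop1", 8888)
        try (ServerSocket server = new ServerSocket(8888)) {
            while (true) {
                serveOnce(server, lines); // keep accepting new clients, like nc -k
            }
        }
    }
}
```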

    5. Results

    After the custom functions are defined, parsing the user behavior data with the registered JsonFunction and explodeFunction functions produces the following output.

  • Original article: https://blog.csdn.net/dajiangtai007/article/details/125500063