• FlinkSQL自定义UDAF使用的三种方式


    1.UDAF定义

    Aggregate functions(聚合函数)将多行的标量值映射到新的标量值(多进一出),聚合函数用到了累加器,下图是聚合过程:

    Aggregate functions聚合函数实现的核心步骤如下。

    (1)继承AggregateFunction

    (2)必须覆盖createAccumulator和getValue

    (3)提供accumulate方法

    (4)retract⽅法在OVER windows上才是必须的

    (5)merge有界聚合以及会话窗⼝和滑动窗口聚合都需要(对性能优化也有好处)

    2.数据集格式

    学生学科考试成绩数据集如下所示:

    1,"zhangsan","Chinese",90

    1,"zhangsan","Math",74

    1,"zhangsan","English",100

    2,"lisi","Chinese",86

    2,"lisi","Math",99

    2,"lisi","English",92

    第一列表示学生ID,第二列表示学生姓名,第三列表示学科,第四列表示成绩。

    3.自定义UDAF

    FlinkSQL自定义UDAF函数对学生考试成绩进行聚合操作的具体代码如下所示。

    public class FlinkAggFunction {

    public static void main(String[] args) {

    //1.获取stream的执行环境

    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    senv.setParallelism(1);

    //2.创建表执行环境

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

    //3.数据源

    Table table = tEnv.fromValues(

    DataTypes.ROW(

    DataTypes.FIELD("id",DataTypes.DECIMAL(10,2)),

    DataTypes.FIELD("name",DataTypes.STRING()),

    DataTypes.FIELD("course",DataTypes.STRING()),

    DataTypes.FIELD("score",DataTypes.DOUBLE())

    ),

    row(1,"zhangsan","Chinese",90),

    row(1,"zhangsan","Math",74),

    row(1,"zhangsan","English",100),

    row(2,"lisi","Chinese",86),

    row(2,"lisi","Math",99),

    row(2,"lisi","English",92)

    ).select($("id"),$("name"),$("course"),$("score"));

    tEnv.createTemporaryView("student",table);

    //4.1调用方式1 table api(未注册函数)

    tEnv.from("student")

    .groupBy($("course"))

    .select($("course"),call(AvgFunction.class,$("score").as("avg_score")))

    .execute().print();

    //4.2调用方式2table api(注册函数)

    tEnv.createTemporarySystemFunction("AvgFunction",AvgFunction.class);

    tEnv.from("student")

    .groupBy($("course"))

    .select($("course"),call("AvgFunction",$("score").as("avg_score")))

    .execute().print();

    //4.3调用方式3 sql(注册函数)

    tEnv.sqlQuery("select course,AvgFunction(score) as avg_score from student group by course")

    .execute().print();

    }

    //可变累加器的数据结构

    @Data

    @NoArgsConstructor

    @AllArgsConstructor

    public static class AvgAccumulator{

    public double sum = 0.0;

    public int count = 0;

    }

    //自定义UDAF

    public static class AvgFunction extends AggregateFunction<Double,AvgAccumulator>{

    //获取累加器的值

    @Override

    public Double getValue(AvgAccumulator avgAccumulator) {

    if(avgAccumulator.count==0){

    return null;

    }else {

    return avgAccumulator.sum/avgAccumulator.count;

    }

    }

    //初始化累加器

    @Override

    public AvgAccumulator createAccumulator() {

    return new AvgAccumulator();

    }

    //迭代累加

    public void accumulate(AvgAccumulator acc,Double score){

    acc.setSum(acc.sum+score);

    acc.setCount(acc.count+1);

    }

    }

    }

    4.运行结果

    FlinkSQL自定义UDAF函数之后,使用注册的AvgFunction函数对学生考试成绩聚合之后的效果如下所示。

  • 相关阅读:
    opencv python debug记录
    C专家编程 第10章 再论指针 10.8 轻松一下---程序检验的限制
    揭秘计算机内部通信:探秘数据、地址与控制信号的奥秘
    常用中间件-OAuth2
    低版本客户端连接oracle12c报错ORA-28040: No matching authentication protocol
    jQuery笔记
    java毕业生设计在线考试系统演示录像2021计算机源码+系统+mysql+调试部署+lw
    Kafka重平衡导致无限循环消费问题
    Linux--基础IO
    一文详解企业数据分类分级的推进路径
  • 原文地址:https://blog.csdn.net/dajiangtai007/article/details/125500080