• Big Data (9e): Flink Windows Illustrated


    1. Code template

    Local development environment: Windows 10 + IntelliJ IDEA
    Only change the code between the two "business logic" (业务逻辑) marker comments

    1.1. pom.xml

    
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>2.0.3</slf4j.version>
        <log4j.version>2.17.2</log4j.version>
    </properties>
    
    <dependencies>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
    

    1.2. log4j.properties

    log4j.rootLogger=error, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    

    1.3. Java template

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.Scanner;
    
    public class Hello {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            env.setParallelism(1);
            // Add the custom source
            DataStreamSource<String> dss = env.addSource(new MySource());
            //################################### business logic (业务逻辑) ########################################
            dss.print();
            //################################### business logic (业务逻辑) ########################################
            env.execute();
        }
    
        /** Reads lines from stdin and emits each non-empty line; typing STOP ends the source. */
        public static class MySource implements SourceFunction<String> {
            // Set to false by cancel() so the read loop can stop
            private volatile boolean running = true;
    
            public MySource() {}
    
            @Override
            public void run(SourceContext<String> sc) {
                Scanner scanner = new Scanner(System.in);
                // hasNextLine() also ends the loop cleanly if stdin is closed
                while (running && scanner.hasNextLine()) {
                    String str = scanner.nextLine().trim();
                    if (str.equals("STOP")) {break;}
                    if (!str.isEmpty()) {sc.collect(str);}
                }
                scanner.close();
            }
    
            @Override
            public void cancel() {running = false;}
        }
    }
    

    2. Keyed and Non-Keyed windows

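    • Keyed windows: applied after keyBy
    • Non-keyed windows: the window operator runs with a parallelism of 1

    | Syntax example     | Keyed                        | Non-Keyed            |
    |--------------------|------------------------------|----------------------|
    | Time-based window  | .keyBy(...).window(...)      | .windowAll(...)      |
    | Count-based window | .keyBy(...).countWindow(...) | .countWindowAll(...) |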
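
To make the keyed/non-keyed distinction concrete, here is a minimal plain-Java sketch that simulates a tumbling count window of size 2, once per key and once over the whole stream. It does not use Flink; the class KeyedVsNonKeyedSketch and its methods are hypothetical names made up for illustration, and each element is used as its own key (like keyBy(s -> s)).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Flink code.
final class KeyedVsNonKeyedSketch {

    // Keyed: every key has its own buffer; a window fires when that key has `size` elements.
    static List<List<String>> keyed(List<String> input, int size) {
        Map<String, List<String>> perKey = new LinkedHashMap<>();
        List<List<String>> fired = new ArrayList<>();
        for (String s : input) {
            List<String> buf = perKey.computeIfAbsent(s, k -> new ArrayList<>());
            buf.add(s);
            if (buf.size() == size) {
                fired.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        return fired;
    }

    // Non-keyed: one shared buffer for the whole stream.
    static List<List<String>> nonKeyed(List<String> input, int size) {
        List<String> buf = new ArrayList<>();
        List<List<String>> fired = new ArrayList<>();
        for (String s : input) {
            buf.add(s);
            if (buf.size() == size) {
                fired.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "a", "c", "b");
        System.out.println(keyed(input, 2));    // [[a, a], [b, b]]
        System.out.println(nonKeyed(input, 2)); // [[a, b], [a, c]]
    }
}
```

With the same input, the keyed version only fires once a single key has collected two elements, while the non-keyed version fires on every second element regardless of its value.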

    3. Window categories

    • Windows split an unbounded stream of data into bounded chunks
    • https://yellow520.blog.csdn.net/article/details/121288240

    3.1. Time-based windows

    Sliding time window

    .window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
    

    Tumbling time window

    .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
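
For the sliding and tumbling assigners above, it can help to see which windows a single timestamp falls into. The following is a plain-Java sketch, not Flink's own code: it assumes windows aligned to the epoch with zero offset, and the class and method names (WindowAssignSketch, tumblingStart, slidingWindows) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of window assignment; zero offset is an assumption.
final class WindowAssignSketch {

    // Start of the epoch-aligned tumbling window that contains ts.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // All sliding windows [start, start + size) that contain ts, newest first.
    static List<long[]> slidingWindows(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 7_000L; // an element arriving 7 s after the epoch
        // Tumbling window of 3 s: the element lands in [6000, 9000)
        System.out.println(tumblingStart(ts, 3_000L)); // 6000
        // Sliding window of 6 s every 3 s: the element lands in two windows
        for (long[] w : slidingWindows(ts, 6_000L, 3_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // [6000, 12000)
        // [3000, 9000)
    }
}
```

With size 6 s and slide 3 s, every element belongs to size / slide = 2 windows, which is why sliding windows emit each element more than once.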
    

    Session time window

    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
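
A session window has no fixed length: it stays open as long as elements keep arriving within the gap, and closes once the gap passes without input. The plain-Java sketch below groups sorted timestamps into sessions with a 2 s gap. It is a simulation under assumptions, not Flink code: SessionSketch is a hypothetical name, and an element arriving exactly at the session end is treated as extending the session, which is an assumption about the boundary case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of session windows over sorted timestamps.
final class SessionSketch {

    // Returns sessions as [start, end), where end = last timestamp in the session + gap.
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTs) {
            if (current != null && ts <= current[1]) {
                current[1] = ts + gap; // still inside the gap: extend the open session
            } else {
                current = new long[] {ts, ts + gap}; // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 1_000L, 1_500L, 5_000L, 6_000L};
        for (long[] s : sessions(ts, 2_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // [0, 3500)
        // [5000, 8000)
    }
}
```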
    

    Global window (all elements with the same key go into one window; it only fires with a custom trigger)

    .window(GlobalWindows.create())
    

    3.2. Count-based windows

    Sliding count window

    .countWindow(4,3)
    

    Tumbling count window

    .countWindow(4)
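
The sliding variant countWindow(4,3) is easy to misread: as far as I understand it, the window fires after every 3 elements and emits the most recent (at most) 4 elements, so the first firing contains only 3. Below is a plain-Java simulation of that behavior; CountWindowSketch is a hypothetical name and the code is a sketch of the semantics, not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of count windows for a single key.
final class CountWindowSketch {

    // Fires after every `slide` elements and emits the most recent (at most `size`) elements.
    // With slide == size this behaves like a tumbling count window.
    static List<List<Integer>> countWindow(List<Integer> input, int size, int slide) {
        Deque<Integer> recent = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (int x : input) {
            recent.addLast(x);
            if (recent.size() > size) {
                recent.removeFirst(); // keep only the last `size` elements
            }
            if (++sinceLastFire == slide) {
                fired.add(new ArrayList<>(recent));
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Sliding: size 4, slide 3
        System.out.println(countWindow(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 4, 3));
        // [[1, 2, 3], [3, 4, 5, 6]]
        // Tumbling: size 4 (slide equals size)
        System.out.println(countWindow(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 4, 4));
        // [[1, 2, 3, 4], [5, 6, 7, 8]]
    }
}
```

In both cases the trailing elements (7 in the first run, 9 in the second) are not emitted, because the element count needed for the next firing has not been reached.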
    

    4. Window functions

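    A window function processes the elements of a window; its result is emitted when the window fires.

    | Window function       | Characteristics                                                                    |
    |-----------------------|------------------------------------------------------------------------------------|
    | ReduceFunction        | Incremental processing, efficient                                                  |
    | AggregateFunction     | Incremental processing, efficient                                                  |
    | ProcessWindowFunction | All elements of the window are buffered internally before it runs, less efficient  |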
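
The efficiency difference between incremental and buffering functions comes down to how much state a window has to hold. Here is a minimal plain-Java sketch of the two styles; it is not Flink code, and WindowStateSketch with its nested classes is a hypothetical illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration of per-window state, not Flink code.
final class WindowStateSketch {

    // Incremental style (ReduceFunction / AggregateFunction): one running value per window.
    static final class IncrementalWindow {
        private final BinaryOperator<String> reducer;
        private String acc; // null until the first element arrives

        IncrementalWindow(BinaryOperator<String> reducer) {
            this.reducer = reducer;
        }

        void add(String v) {
            acc = (acc == null) ? v : reducer.apply(acc, v);
        }

        int storedValues() {
            return acc == null ? 0 : 1;
        }

        String fire() {
            String result = acc;
            acc = null;
            return result;
        }
    }

    // Buffering style (ProcessWindowFunction): every element is kept until the window fires.
    static final class BufferingWindow {
        private final List<String> buffer = new ArrayList<>();

        void add(String v) {
            buffer.add(v);
        }

        int storedValues() {
            return buffer.size();
        }

        String fire() {
            String result = String.join(",", buffer);
            buffer.clear();
            return result;
        }
    }

    public static void main(String[] args) {
        IncrementalWindow inc = new IncrementalWindow((a, b) -> a + "," + b);
        BufferingWindow buf = new BufferingWindow();
        for (String s : new String[] {"a", "b", "c"}) {
            inc.add(s);
            buf.add(s);
        }
        System.out.println(inc.storedValues() + " vs " + buf.storedValues()); // 1 vs 3
        System.out.println(inc.fire()); // a,b,c
        System.out.println(buf.fire()); // a,b,c
    }
}
```

Both styles produce the same result here, but the buffering window's state grows with the number of elements, which is the price paid for having access to all elements and the window context at once.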

    For each example below, replace the code between the two "business logic" (业务逻辑) marker comments in the code template

    4.1. ReduceFunction

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    dss.keyBy(s -> s)
       .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
       .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
       .print("output");
    

    Tumbling time window

    4.2. AggregateFunction

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
       // AggregateFunction<IN, ACC, OUT>: counts the elements in each window
       .aggregate(new AggregateFunction<String, Long, Long>() {
           // Create the accumulator
           @Override
           public Long createAccumulator() {return 0L;}
           // Add one element to the accumulator
           @Override
           public Long add(String in, Long acc) {return acc + 1L;}
           // Get the result from the accumulator
           @Override
           public Long getResult(Long acc) {return acc;}
           // Merge two accumulators
           @Override
           public Long merge(Long a1, Long a2) {return a1 + a2;}
       })
       .print("output");
    

    Sliding time window

    4.3. ProcessWindowFunction

    Source excerpt (ProcessAllWindowFunction, the non-keyed variant used with windowAll)

    abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> {
        abstract void process(
            ProcessAllWindowFunction<IN, OUT, W>.Context var1,  // context object
            Iterable<IN> var2,                                  // all inputs in the window
            Collector<OUT> var3                                 // collector for the output
        );
    }
    

    Code

    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
       .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
           @Override
           public void process(Context context, Iterable<String> in, Collector<String> out) {
               // Print the window's time range
               System.out.println(context.window().toString());
               // Emit all elements of the window as a single string
               out.collect(String.valueOf(in));
           }
       })
       .print("output");
    

    Screenshot of a test run

  • Original article: https://blog.csdn.net/Yellow_python/article/details/127951690