• Flink时间窗口语义


    内容学习于尚硅谷 方便自己熟悉

    WarterMarker

    特点

    • 水位线是插入到数据流的一个标记,可以认为是一个特殊的数据
    • 水位线主要的内容是一个时间戳,用来表示当前事件的时间的进展
      水位线是基于数据的时间戳生成的
      水位线的时间戳必须单调递增的,以确保任务的事件时间时钟一直向前推进
      水位线可以通过设置延迟,来保证正确处理乱序数据
      一个水位线 WarterMarker(t),表示在当前流中的时间时间已经到达了时间戳t,这代表了t之前的所有数据都到齐了,之后流中不会出现时间戳t‘<=t的数据
    
    		 // 有序数据 watermark
            orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                    .<CategoryPojo>forMonotonousTimestamps().withTimestampAssigner((categoryPojo, l) -> categoryPojo.getDateTime()));
            // 无序数据 wartermarker
            orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                    .<CategoryPojo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((categoryPojo, l) -> categoryPojo.getDateTime()))
            tempAggResult.keyBy(CategoryPojo::getDateTime)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                    .process(new FinalResultWindowProcess());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    自定义水位线策略

    Flink有两种有两种不同生成水位线的方式:一种是周期性(Periodic),另一种是断点式(Punctuated)

    WaterMarker接口中有两种方法?
    onEvent()和onPeriodicEmit()前者是在每个事件到来时调用,后者有框架周期性调用。
    周期性调用的方式发出水位线,自然是周期性生成水位线;
    而在事件触发的方法发出水位线,自然是断点式生成

    周期性水位线生成器(Periodic Generator)

    周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出的水位线

    public class ClickSource implements SourceFunction<Event> {
        // 声明一个布尔变量,作为控制数据生成的标识位
        private Boolean running = true;
    
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            Random random = new Random(); // 在指定的数据集中随机选取数据
            String[] users = {"Mary", "Alice", "Bob", "Cary"};
            String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                    "./prod?id=2"};
            while (running) {
                ctx.collect(new Event(
                        users[random.nextInt(users.length)],
                        urls[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()
                ));
                // 隔 1 秒生成一个点击事件,方便观测
                Thread.sleep(100);
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    
    /**
     * Event,有这样几个特点
     *
     * ⚫ 类是公有(public)的
     * ⚫ 有一个无参的构造方法
     * ⚫ 所有属性都是公有(public)的
     * ⚫ 所有属性的类型都是可以序列化的
     */
    public class Event {
        public String user;
        public String url;
        public Long timestamp;
    
        public Event() {
        }
    
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    // 自定义水位线的产生
    public class CustomWatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env
                    .addSource(new ClickSource())
                    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                    .print();
            env.execute();
        }
    
        public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                    }
                };
            }
    
            @Override
            public WatermarkGenerator<Event>
            createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new CustomPeriodicGenerator();
            }
        }
    
        public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
            private Long delayTime = 5000L; // 延迟时间
            private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
    
            @Override
            public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                    output) {
                // 每来一条数据就调用一次
                maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
            }
    
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 发射水位线,默认 200ms 调用一次
                output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    onPeriodicEmit()里调用output.emitWatemark(),就可以发出水位线了;这个方法有系统框架周期性测试,默认200ms一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳(这里的实现与内置生成器类似,也是减去延迟时间再减1),但是具体什么时间生成与数据无关

    断点式水位线生成器(Punctuared Generator)

    断点式生成水位线会不停的检测onEvent中的事件,当发现带有水位线信息的特殊事件时就立即发出水位线。一般来说,断点式生成不会通过onPeriodicEmit()发出水位线

     public static class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
            @Override
            public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
                // 只有在遇到特定的 itemId 时,才发出水位线
                if (r.user.equals("Mary")) {
                    output.emitWatermark(new Watermark(r.timestamp - 1));
                }
            }
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用
    output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一
    定在某个数据到来之后。

    在自定义数据源中发送水位线

    我们可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了 在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线只能二选一

    public class EmitWatermarkInSourceFunction {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.addSource(new ClickSourceWithWatermark()).print();
            env.execute();
        }
    
        // 泛型是数据源中的类型
        public static class ClickSourceWithWatermark implements SourceFunction<Event> {
            private boolean running = true;
    
            @Override
    
            public void run(SourceContext<Event> sourceContext) throws Exception {
                Random random = new Random();
                String[] userArr = {"Mary", "Bob", "Alice"};
                String[] urlArr = {"./home", "./cart", "./prod?id=1"};
                while (running) {
                    long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                    String username = userArr[random.nextInt(userArr.length)];
                    String url = urlArr[random.nextInt(urlArr.length)];
                    Event event = new Event(username, url, currTs);
                    // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                    sourceContext.collectWithTimestamp(event, event.timestamp);
                    // 发送水位线
                    sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                    Thread.sleep(1000L);
                }
            }
    
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    在自定义水位线生成与代码中调用assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全有我们自定义。所以非常适合来编写Flink的测试程序,测试Flink的各种各样的特性

    水位线的传递

    我们知道水位线是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务传递。如果只是直通式的传输,那很简单,数据和水位线都是按照本身的顺序依次传递,依次处理的;一旦水位线达到了算子任务,那么这个任务就会将它内部的时钟设为这个水位线的时间戳
    在这里,“任务的时钟”其实仍然是各自为政的,并没有统一的时钟。实际应用中往往上下游都有多个并行子任务,为了统一推进水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖元数据中的时间戳(讲过转化处理后,数据可能已经改变了),也可以知道当前事件时间了
    可是还有另外一个问题,那就是在 重分区 的模式下,一个任务有可能会受到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以统一时刻发给下游任务的水位线可能并不相同。这是下游任务又该听谁的?
    这就要回到水位线定义的本质了:他表示“当前事件之前的数据,都已经到到齐了”。这是一种保证,告诉下有任务“只有您接到这个水位线,就代表之后我不会再给你发更早的数据了,您可以放行做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同水位线,说明 上游各个分区处理的有快有慢,进度各不相同,比如上游有两个并行子任务发来了水位线,一个是5秒,一个是7秒;这代表第一个并行任务已经处理完5秒之前的所有数据,而第二个并行任务处理到了7秒。那这时自己的时钟怎么确定?当然也要以“这个之前所有的数据全部到齐”为标准。如果我们以较大的水位线7秒作为当前事件,那就表示“7秒签的数据已经处理完”,这显然不是事实–第一个上游分区才处理到5秒,5-7秒的数据还会不停的发来;而如果以最小的水位线5秒作为当前时钟就不会有这个问题,因为确实所有上游分区都已经处理完,不会再发5秒前的数据。这让我们联想到“木桶原理”:所有的上游并行任务就像未成木桶的一块块木板,他们中最短的那一块,决定了我们桶中的水位请添加图片描述
    我们可以用一具体的例子,将水位线在任务间传递的过程完整梳理一遍。
    如上图,当前任务的上游,有4个并行子任务,所以会接受的来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。
    (1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个
    (2)当有一个新的水位线(第一分区的4)从上游传来是,当前任务会首先更新对应的分区时钟;然后再次判断分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟应该更新。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
    (3)再次接收到新的水位线(第二分区7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不改变,也不会向下游任务发出水位线
    (4)同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。

    水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的

    水位线总结

    水位线在事件时间的世界里,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。如果想要知道现在几点,就要看水位线大小。后面讲到的窗口闭合,以及定时器的触发多要通过判断水位线的大小来决定是否触发

    水位线是一种特殊的事件,有程序员通过编程插入的数据流里面,然后跟随数据流向下游流动
    
    • 1

    水位线的默认计算公式: 水位线 = 观察到的最大事件时间 - 最大延迟时间 - 1毫秒

    所以这里涉及到一个问题,就是不同的算子看到的水位线的大小可能是不一样的。因为下
    游的算子可能并未接收到来自上游算子的水位线,导致下游算子的时钟要落后于上游算子的时
    钟。比如 map->reduce 这样的操作,如果在 map 中编写了非常耗时间的代码,将会阻塞水位
    线的向下传播,因为水位线也是数据流中的一个事件,位于水位线前面的数据如果没有处理完
    毕,那么水位线不可能弯道超车绕过前面的数据向下游传播,也就是说会被前面的数据阻塞。
    这样就会影响到下游算子的聚合计算,因为下游算子中无论由窗口聚合还是定时器的操作,都
    需要水位线才能触发执行。这也就告诉了我们,在编写 Flink 程序时,一定要谨慎的编写每一
    个算子的计算逻辑,尽量避免大量计算或者是大量的 IO 操作,这样才不会阻塞水位线的向下
    传递

    在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)
    的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保
    证所有的窗口闭合以及所有的定时器都被触发。
    对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种
    情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水
    位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算
    的正确,无需在数据流的中间插入水位线了。
    水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最
    为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。

    窗口(Window)

    我们已经了解了Flink中的事件事件和水位线的概念,那他们的具体应用?当然是做基于时间的处理计算。最常见的场景窗口聚合计算

    之前我们已经了解了 Flink 中基本的聚合操作。在流处理中,我们往往需要面对的是连续
    不断、无休无止的无界流,不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实
    只能针对当前已有的数据——之后再有数据到来,就需要继续叠加、再次输出结果。这样似乎
    很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给
    系统带来很大负担了。

    更加高效的做法是,把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这
    就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合
    操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中,我们往往更关心一段
    时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们
    就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出
    一个结果就可以了。

    窗口的概念

    Flink是一种流失计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效的处理无界流,一种方式就是将无线数据切割成有限的 “数据块” 进行处理,这就是所谓的窗口

    在Flink中,窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点,窗口该关闭了,就停止收集数据、触发计算并输出结果。例如,我们定义一个时间窗口,每10秒统计一次记录,那么就相当于把窗口放在那
    里,从 0 秒开始收集数据;到 10 秒时,处理当前窗口内所有数据,输出一个结果,然后清空
    窗口继续收集数据;到 20 秒时,再对窗口内所有数据进行计算处理,输出结果;依次类推请添加图片描述
    这里注意为了明确数据划分到哪一个窗口,定义窗口都是包含起始时间、不包含结束时间的,用数学符号表示就是一个左闭右开的区间,例如 0~10 秒的窗口可以表示为[0, 10),这里单位为秒。
    对于处理时间下的窗口而言,这样理解似乎没什么问题。因为窗口的关闭是基于系统时间的,赶不上这班车的数据,就只能坐下一班车了——正如上图中,0~10 秒的窗口关闭后,可能还有时间戳为 9 的数据会来,它就只能进入 10~20 秒的窗口了。这样会造成窗口处理结果的不准确。

    然而如果我们采用事件时间语义,就会有些费解了。由于有乱序数据,我们需要设置一个延迟时间来等所有数据到齐。比如上面的例子中,我们可以设置延迟时间为 2 秒,如图 6-14所示,这样 0~10 秒的窗口会在时间戳为 12 的数据到来之后,才真正关闭计算输出结果,这样就可以正常包含迟到的 9 秒数据了。请添加图片描述
    但是这样一来,0~10 秒的窗口不光包含了迟到的 9 秒数据,连 11 秒和 12 秒的数据也包含进去了。我们为了正确处理迟到数据,结果把早到的数据划分到了错误的窗口——最终结果都是错误的

    所以在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗
    口。相比之下,我们应该把窗口理解成一个“桶”,如图 6-15 所示。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。请添加图片描述
    我们可以梳理一下事件时间语义下,之前例子中窗口的处理过程:
    (1)第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
    (2)后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
    (3)11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间;
    (4)之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
    (5)12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁;
    (6)同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。
    这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,

    窗口的分类

    1. 按照驱动类型
      时间驱动和数据量驱动
      请添加图片描述
    • 时间窗口(Time Window)
      用结束时间减去开始时间,得到这段时间的长度,就是窗口的大小(window size)

      这里的时间可以是不同的语义,所以我们可以定义处理时间窗口和事件时间窗口。

    • 计数窗口(Count Window)
      计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。
      计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。在 Flink 内部也并没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的

    1. 按照窗口分配数据的规则分类
      时间窗口和计数窗口,只是对窗口的一个大致划分;在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以有不同的功能应用。
      根据分配数据的规则,窗口的具体实现可以分为 4 类:
      滚动窗口(Tumbling Window)、
      滑动窗口(Sliding Window)、
      会话窗口(Session Window),
      以及全局窗口(Global Window)。
      下面我们来做具体介绍。
    • 滚动窗口(Tumbling Windows)请添加图片描述

    • 滑动窗口
      请添加图片描述

    • 会话窗口
      对于会话窗口而言,最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔 gap 的值。请添加图片描述

    • 全局窗口
      会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以
      这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,
      还需要自定义“触发器”(Trigger)。

    窗口 API 概览

    1. 按键分区(Keyed)和非按键分区(Non-Keyed)
      在调用窗口算子之前,是否有 keyBy 操作。
      (1)按键分区窗口(Keyed Windows)
      相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
      (2)非按键分区
      如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
      在代码中,直接基于 DataStream 调用.windowAll()定义窗口。
      stream.windowAll(…)
      这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。
    2. 代码中窗口API调用
      有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

    stream.keyBy()
    .window()
    .aggregate()

    窗口分配器(Window Assigners)

    窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个
    WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。

    • 时间窗口
      Flink1.12 以后 默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了
      (1)滚动处理时间窗口
      窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()

      stream.keyBy(…)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .aggregate(…)

    另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。这里需要多做一些解释:对于我们之前的定义,滚动窗口其实只有一个 size 是不能唯一确定的。
    比如我们定义 1 天的滚动
    窗口,从每天的 0 点开始计时是可以的,统计的就是一个自然日的所有数据;而如果从每天的凌晨 2 点开始计时其实也完全没问题,只不过统计的数据变成了每天 2 点到第二天 2 点。这个起始点的选取,其实对窗口本身的类型没有影响;而为了方便应用,默认的起始点时间戳是窗口大小的整倍数。也就是说,如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始。而如果我们非要不从这个默认值开始,那就可以通过设置偏移量offset 来调整。
    那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:

    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

    (2)滑动处理时间窗口
    窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

    stream.keyBy(…)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(…)

    这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。

    滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

    (3)处理时间会话窗口
    窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。

    stream.keyBy(…)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(…)

    这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

    .window(ProcessingTimeSessionWindows.withDynamicGap(new
    SessionWindowTimeGapExtractor>() {
    @Override
    public long extract(Tuple2 element) {
    // 提取 session gap 值返回, 单位毫秒
    return element.f0.length() * 1000;
    }
    }))

    这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。
    (4)滚动事件时间窗口

    stream.keyBy(…)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(…)

    (5)滑动事件时间窗口

    stream.keyBy(…)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(…)

    (6)事件时间会话窗口

    stream.keyBy(…)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(…)

    • 计数窗口
      计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现
      (1)滚动计数窗口
      滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

    stream.keyBy(…)
    .countWindow(10)

    我们定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发
    计算执行并关闭窗口。
    (2)滑动计数窗口
    与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。

    stream.keyBy(…)
    .countWindow(10,3)

    我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每
    隔 3 个数据就统计输出一次结果。

    • 全局窗口
      全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调
      用.window(),分配器由 GlobalWindows 类提供。

      stream.keyBy(…)
      .window(GlobalWindows.create());

    需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

    窗口函数(Window Functions)

    定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

    经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream请添加图片描述
    窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

    1. 增量聚合函数(incremental aggregation functions)
      窗口将数据收集起来,最基本的处理操作当然就是进行聚合。这相当于真的在用批处理的思路来做实时流处理。
      为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
      典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
      (1)归约函数(ReduceFunction)
      就是将窗口中收集到的数据两两进行归约。当我们进行流处
      理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

    窗口函数中也提供了 ReduceFunction:只要基于 WindowedStream 调用.reduce()方法,然后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。这里的 ReduceFunction 其实与简单聚合时用到的 ReduceFunction 是同一个函数类接口,所以使用方式也是完全一样的。

    public class WindowReduceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 从自定义数据源读取数据,并提取时间戳、生成水位线
            SingleOutputStreamOperator<Event> stream = env.addSource(new
                            ClickSource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                        @Override
                                        public long extractTimestamp(Event element, long recordTimestamp) {
                                            return element.timestamp;
                                        }
                                    }));
            stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(Event value) throws Exception {
                            // 将数据转换成二元组,方便计算
                            return Tuple2.of(value.user, 1L);
                        }
                    })
                    .keyBy(r -> r.f0)
                    // 设置滚动事件时间窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                           Tuple2<String, Long> value2) throws Exception {
                            // 定义累加规则,窗口闭合时,向下游发送累加结果
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    })
                    .print();
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    我们用 ReduceFunction 对 count 值做了增量聚合:窗口中会将当前的总 count
    值保存成一个归约状态,每来一条数据,就会调用内部的 reduce 方法,将新数据中的 count值叠加到状态上,并得到新的状态保存起来。等到了 5 秒窗口的结束时间,就把归约好的状态直接输出。
    (2)聚合函数(AggregateFunction)

    **ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。**这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使ReduceFunction 就会非常麻烦

    例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要
    计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效

    Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。
    源码:

    public interface AggregateFunction extends Function, Serializable
    {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    155
    ACC merge(ACC a, ACC b);
    }

    AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
    接口中有四个方法:
    ⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚
    合任务只会调用一次。
    ⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进
    一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器
    accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之
    后都会调用这个方法。
    ⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,
    然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均
    值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终
    结果。这个方法只在窗口要输出结果时调用。
    ⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在
    需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景
    就是会话窗口(Session Windows)。

    所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便

    下面来看一个具体例子。我们知道,在电商网站中,PV(页面浏览量)和 UV(独立访客数)是非常重要的两个流量指标。一般来说,PV 统计的是所有的点击量;而对用户 id 进行去重之后,得到的就是 UV。所以有时我们会用 PV/UV 这个比值,来表示人均重复访问量,也就是平均每个用户会访问多少次页面,这在一定程度上代表了用户的粘度。

    public class WindowAggregateFunctionExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<Event> stream = env.addSource(new
                            ClickSource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event element, long recordTimestamp) {
                                    return element.timestamp;
                                }
                            }));
            // 所有数据设置相同的 key,发送到同一个分区统计 PV 和 UV,再相除
            stream.keyBy(data -> true)
                    .window(SlidingEventTimeWindows.of(Time.seconds(10),
                            Time.seconds(2)))
                    .aggregate(new AvgPv())
                    .print();
            env.execute();
        }
    
        public static class AvgPv implements AggregateFunction<Event,
                Tuple2<HashSet<String>, Long>, Double> {
            @Override
            public Tuple2<HashSet<String>, Long> createAccumulator() {
                // 创建累加器
                return Tuple2.of(new HashSet<String>(), 0L);
            }
    
            @Override
            public Tuple2<HashSet<String>, Long> add(Event value,
                                                     Tuple2<HashSet<String>, Long> accumulator) {
                // 属于本窗口的数据来一条累加一次,并返回累加器
                accumulator.f0.add(value.user);
                return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
            }
    
            @Override
            public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
                // 窗口闭合时,增量聚合结束,将计算结果发送到下游
                return (double) accumulator.f1 / accumulator.f0.size();
            }
    
            @Override
            public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long>
                                                               a, Tuple2<HashSet<String>, Long> b) {
                return null;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    代码中我们创建了事件时间滑动窗口,统计 10 秒钟的“人均 PV”,每 2 秒统计一次。由于聚合的状态还需要做处理计算,因此窗口聚合时使用了更加灵活的 AggregateFunction。为了统计 UV,我们用一个 HashSet 保存所有出现过的用户 id,实现自动去重;而 PV 的统计则类似一个计数器,每来一个数据加一就可以了。所以这里的状态,定义为包含一个 HashSet 和一个 count 值的二元组(Tuple2),每来一条数据,就将 user 存入 HashSet,
    同时 count 加 1。这里的 count 就是 PV,而 HashSet 中元素的个数(size)就是 UV;所以最终窗口的输出结果,就是它们的比值。
    这里没有涉及会话窗口,所以 merge()方法可以不做任何操作。

    另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于
    WindowedStream 调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与 KeyedStream 的简单
    聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。

    通过 ReduceFunction 和 AggregateFunction 我们可以发现,增量聚合函数其实就是在用流处理的思路来处理有界数据集,核心是保持一个聚合状态,当数据到来时不停地更新状态。这
    就是 Flink 所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见
    2. 全窗口函数(full window functions)
    窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

    很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。
    这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间,而在之前收集数据的漫长过程中却无所事事。这就好比平时不用功,到考试之前通宵抱佛脚,肯定不如把工夫花在日常积累上。

    那为什么还需要有全窗口函数呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现

    在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。
    (1)窗口函数(WindowFunction)
    WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

    stream
    .keyBy()
    .window()
    .apply(new MyWindowFunction());

    这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口
    (Window)本身的信息。WindowFunction 接口在源码中实现如下:

    public interface WindowFunction extends Function,
    Serializable {
    void apply(KEY key, W window, Iterable input, Collector out) throws
    Exception;
    }

    当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集
    合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这
    里 Collector 的用法,与 FlatMapFunction 中相同。
    弃用边缘 ,直接使用 ProcessWindowFunction 就可以了。
    (2)处理窗口函数(ProcessWindowFunction)

    ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,
    ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员,关于处
    理函数我们会在后续章节展开讲解。

    当 然 , 这些好处是以牺牲性能和资源为代价的 。 作 为 一 个 全 窗 口 函 数 ,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实
    就是一个增强版的 WindowFunction。

    public class UvCountByWindowExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<Event> stream = env.addSource(new
                            ClickSource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                    .withTimestampAssigner(new
                                                                   SerializableTimestampAssigner<Event>() {
                                                                       @Override
                                                                       public long extractTimestamp(Event element, long
                                                                               recordTimestamp) {
                                                                           return element.timestamp;
                                                                       }
                                                                   }));
            // 将数据全部发往同一分区,按窗口统计 UV
            stream.keyBy(data -> true)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(new UvCountByWindow())
                    .print();
            env.execute();
        }
    
        // 自定义窗口处理函数
        public static class UvCountByWindow extends ProcessWindowFunction<Event,
                    String, Boolean, TimeWindow> {
    
            private SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            @Override
            public void process(Boolean aBoolean, Context context, Iterable<Event>
                    elements, Collector<String> out) throws Exception {
                HashSet<String> userSet = new HashSet<>();
                // 遍历所有数据,放到 Set 里去重
                for (Event event : elements) {
                    userSet.add(event.user);
                }
                // 结合窗口信息,包装输出内容
                Long start = context.window().getStart();
                Long end = context.window().getEnd();
                out.collect(" 窗 口 : " + sdf.format(start) + " ~ " +
                        sdf.format(end)
                        + " 的独立访客数量是:" + userSet.size());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    这里我们使用的是事件时间语义。定义 10 秒钟的滚动事件窗口后,直接使用
    ProcessWindowFunction 来定义处理的逻辑。我们可以创建一个 HashSet,将窗口所有数据的userId 写入实现去重,最终得到 HashSet 的元素个数就是 UV 值。

    当 然 , 这 里 我 们 并 没 有 用 到 上 下 文 中 其 他 信 息 , 所 以 其 实 没 有 必 要 使 用ProcessWindowFunction。全窗口函数因为运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。

    1. 增量聚合和全窗口函数的结合使用
    // ReduceFunction 与 WindowFunction 结合
    public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> 
    reduceFunction, WindowFunction<T, R, K, W> function) 
    // ReduceFunction 与 ProcessWindowFunction 结合
    public <R> SingleOutputStreamOperator<R> reduce(
     ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> 
    function)
    // AggregateFunction 与 WindowFunction 结合
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
    
     AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> 
    windowFunction)
    // AggregateFunction 与 ProcessWindowFunction 结合
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
     AggregateFunction<T, ACC, V> aggFunction,
     ProcessWindowFunction<V, R, K, W> windowFunction)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

    下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;
    另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

    public class UrlViewCountExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<Event> stream = env.addSource(new
                            ClickSource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                            .withTimestampAssigner(new
                                                           SerializableTimestampAssigner<Event>() {
                                                               @Override
                                                               public long extractTimestamp(Event element, long
                                                                       recordTimestamp) {
                                                                   return element.timestamp;
                                                               }
                                                           }));
            // 需要按照 url 分组,开滑动窗口统计
            stream.keyBy(data -> data.url)
                    .window(SlidingEventTimeWindows.of(Time.seconds(10),
                            Time.seconds(5)))
                    // 同时传入增量聚合函数和全窗口函数
                    .aggregate(new UrlViewCountAgg(), new UrlViewCountResult())
                    .print();
            env.execute();
        }
    
        // 自定义增量聚合函数,来一条数据就加一
        public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(Event value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return null;
            }
        }
    
        // 自定义窗口处理函数,只需要包装窗口信息
        public static class UrlViewCountResult extends ProcessWindowFunction<Long,
                UrlViewCount, String, TimeWindow> {
            @Override
            public void process(String url, Context context, Iterable<Long> elements,
                                Collector<UrlViewCount> out) throws Exception {
                // 结合窗口信息,包装输出内容
                Long start = context.window().getStart();
                Long end = context.window().getEnd();
                // 迭代器中只有一个元素,就是增量聚合函数的计算结果
                out.collect(new UrlViewCount(url, elements.iterator().next(), start,
                        end));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    思想:
    1. 水位线设置延迟时间
    2. keyBy分区
    3. 开窗:每5秒钟统计最近10秒的记录
    4. 聚合函数这里我感觉更像分组,当触发了窗口时间之后,分好组的数据进入全窗口函数,统一处理

    代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一;得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。
    注:ProcessWindowFunction 是处理函数中的一种,后面我们会详细讲解。这里只用它来将增量聚合函数的输出结果包裹一层窗口信息。
    窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。

    测试水位线和窗口的使用

    
    public class WatermarkTest {
        /*
    Alice, ./home, 1000
    Alice, ./cart, 2000
    Alice, ./prod?id=100, 10000
    Alice, ./prod?id=200, 8000
    Alice, ./prod?id=300, 15000
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 将数据源改为 socket 文本流,并转换成 Event 类型
            env.socketTextStream("hadoop102", 7777)
                    .map(new MapFunction<String, Event>() {
                        @Override
                        public Event map(String value) throws Exception {
                            String[] fields = value.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(),
                                    Long.valueOf(fields[2].trim()));
                        }
                    })
                    // 插入水位线的逻辑
                    .assignTimestampsAndWatermarks(
                            // 针对乱序流插入水位线,延迟时间设置为 5s
    
                            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                    .withTimestampAssigner(new
                                                                   SerializableTimestampAssigner<Event>() {
                                                                       // 抽取时间戳的逻辑
                                                                       @Override
                                                                       public long extractTimestamp(Event element, long
                                                                               recordTimestamp) {
                                                                           return element.timestamp;
                                                                       }
                                                                   })
                    )
                    // 根据 user 分组,开窗统计
                    .keyBy(data -> data.user)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(new WatermarkTestResult())
                    .print();
            env.execute();
        }
    
        // 自定义处理窗口函数,输出当前的水位线和窗口信息
        public static class WatermarkTestResult extends ProcessWindowFunction<Event,
                    String, String, TimeWindow> {
            @Override
            public void process(String s, Context context, Iterable<Event> elements,
                                Collector<String> out) throws Exception {
                Long start = context.window().getStart();
                Long end = context.window().getEnd();
                Long currentWatermark = context.currentWatermark();
                Long count = elements.spliterator().getExactSizeIfKnown();
                out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素, 窗口闭合计算时,水位线处于:" + currentWatermark);
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    我们这里设置的最大延迟时间是 5 秒,所以当我们在终端启动 nc 程序,也就是 nc –lk 7777 然后输入如下数据时:

    Alice, ./home, 1000
    Alice, ./cart, 2000
    Alice, ./prod?id=100, 10000
    Alice, ./prod?id=200, 8000
    Alice, ./prod?id=300, 15000

    我们会看到如下结果:

    窗口 0 ~ 10000 中共有 3 个元素,窗口闭合计算时,水位线处于:9999

    我们就会发现,当最后输入[Alice, ./prod?id=300, 15000]时,流中会周期性地(默认 200毫秒)插入一个时间戳为 15000L – 5 * 1000L – 1L = 9999 毫秒的水位线,已经到达了窗口[0,10000)的结束时间,所以会触发窗口的闭合计算。而后面再输入一条[Alice, ./prod?id=200, 9000]时,将不会有任何结果;因为这是一条迟到数据,它所属于的窗口已经触发计算然后销毁了(窗口默认被销毁),所以无法再进入到窗口中,自然也就无法更新计算结果了。窗口中的迟到数据默认会被丢弃,这会导致计算结果不够准确。Flink 提供了有效处理迟到数据的手段,我们会在稍后 详细介绍。

    其他 API

    1. 触发器(Trigger)
      触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
      基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

    stream.keyBy(…)
    .window(…)
    .trigger(new MyTrigger())

    Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。

    Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
    ⚫ onElement():窗口中每到来一个元素,都会调用这个方法。
    ⚫ onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
    ⚫ onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
    ⚫ clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态

    可以看到,除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。
    onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。这
    里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。

    上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了
    解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。
    ⚫ CONTINUE(继续):什么都不做
    ⚫ FIRE(触发):触发计算,输出结果
    ⚫ PURGE(清除):清空窗口中的所有数据,销毁窗口
    ⚫ FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

    我们可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。

    下面我们举一个例子。在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

    
    public class TriggerExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env
                    .addSource(new ClickSource())
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner(new
                                                                   SerializableTimestampAssigner<Event>() {
                                                                       @Override
                                                                       public long extractTimestamp(Event event, long l) {
                                                                           return event.timestamp;
                                                                       }
                                                                   })
                    )
                    .keyBy(r -> r.url)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .trigger(new MyTrigger())
                    .process(new WindowResult())
                    .print();
            env.execute();
        }
    
        public static class WindowResult extends ProcessWindowFunction<Event,
                UrlViewCount, String, TimeWindow> {
            @Override
            public void process(String s, Context context, Iterable<Event> iterable,
                                Collector<UrlViewCount> collector) throws Exception {
                collector.collect(
                        new UrlViewCount(
                                s,
                                // 获取迭代器中的元素个数
                                iterable.spliterator().getExactSizeIfKnown(),
                                context.window().getStart(),
                                context.window().getEnd()
                        )
                );
            }
        }
    
        public static class MyTrigger extends Trigger<Event, TimeWindow> {
            @Override
            public TriggerResult onElement(Event event, long l, TimeWindow timeWindow,
                                           TriggerContext triggerContext) throws Exception {
                ValueState<Boolean> isFirstEvent =
                        triggerContext.getPartitionedState(
                                new ValueStateDescriptor<Boolean>("first-event",
                                        Types.BOOLEAN)
                        );
                if (isFirstEvent.value() == null) {
                    for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i =
                            i + 1000L) {
                        triggerContext.registerEventTimeTimer(i);
                    }
                    isFirstEvent.update(true);
                }
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onEventTime(long l, TimeWindow timeWindow,
                                             TriggerContext triggerContext) throws Exception {
                return TriggerResult.FIRE;
            }
    
            @Override
            public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
                                                  TriggerContext triggerContext) throws Exception {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
                    throws Exception {
                ValueState<Boolean> isFirstEvent =
                        triggerContext.getPartitionedState(
                                new ValueStateDescriptor<Boolean>("first-event",
                                        Types.BOOLEAN)
                        );
                isFirstEvent.clear();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    1. 移除器(Evictor)
      移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器

    stream.keyBy(…)
    .window(…)
    .evictor(new MyEvictor())

    Evictor 接口定义了两个方法:
    ⚫ evictBefore():定义执行窗口函数之前的移除数据操作
    ⚫ evictAfter():定义执行窗口函数之后的以处数据操作
    默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。
    3. 允许延迟(Allowed Lateness)
    在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。这也很好理解:窗口触发计算就像发车,如果要赶的车已经开走了,又不能坐其他的车(保证分配窗口的正确性),那就只好放弃坐班车了。
    不过在多数情况下,直接丢弃数据也会导致统计结果不准确,我们还是希望该上车的人都能上来。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。
    直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

    基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

    stream.keyBy(…)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(1))

    比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整就触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了。
    从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

    1. 将迟到的数据放入侧输出流
      我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?
      Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据

    基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

    DataStream<Event> stream = env.addSource(...);
    OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
    stream.keyBy(...)
     .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的
    DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的
    流了。

    SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
     .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)
    .aggregate(new MyAggregateFunction())
    DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据
    类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同

    窗口声明周期函数

    熟悉了窗口 API 的使用,我们再回头梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结。

    1. 窗口的创建
      窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

    2. 窗口计算的触发
      除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。
      窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。对于不同的窗口类型,触发计算的条件也会不同。例如,一个滚动事件时间窗口,应该在水位线到达窗口结束时间的时候触发计算,属于“定点发车”;而一个计数窗口,会在窗口中元素数量达到定义大小时触发计算,属于“人满就发车”。所以 Flink 预定义的窗口类型都有对应内置的触发器。
      对于事件时间窗口而言,除去到达结束时间的“定点发车”,还有另一种情形。当我们设置了允许延迟,那么如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算。这类似于没有准时赶上班车的人又追上了车,这时车要再次停靠、开门,将新的数据整合统计进来。

    3. 窗口的销毁
      一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。
      这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw)实现的,而全局窗口不会清除状态,所以就不会被销毁。
      在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

    4. 窗口 API 调用总结
      到目前为止,我们已经彻底明白了 Flink 中窗口的概念和 Window API 的调用,我们再用一张图做一个完整总结,请添加图片描述

    熟记:

    Window API 首先按照时候按键分区分成两类。keyBy 之后的 KeyedStream,可以调
    用.window()方法声明按键分区窗口(Keyed Windows);而如果不做 keyBy,DataStream 也可以直接调用.windowAll()声明非按键分区窗口。之后的方法调用就完全一样了。
    接下来首先是通过.window()/.windowAll()方法定义窗口分配器,得到 WindowedStream;
    然 后 通 过 各 种 转 换 方 法 ( reduce/aggregate/apply/process ) 给 出 窗 口 函 数
    (ReduceFunction/AggregateFunction/ProcessWindowFunction),定义窗口的具体计算处理逻辑,转换之后重新得到 DataStream。这两者必不可少,是窗口算子(WindowOperator)最重要的组成部分。
    此外,在这两者之间,还可以基于 WindowedStream 调用.trigger()自定义触发器、调
    用.evictor()定义移除器、调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData()
    将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流,可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流。

    迟到数据的处理

    有了事件时间、水位线和窗口的相关知识,现在就可以系统性地讨论一下怎样处理迟到数据了。我们知道,所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

    事件时间里用来表示时钟进展的就是水位线(watermark)。对于乱序流,水位线本身就可以设置一个延迟时间;而做窗口计算时,我们又可以设置窗口的允许延迟时间;另外窗口还有将迟到数据输出到测输出流的用法。所有的这些方法,它们之间有什么关系,我们又该怎样合理利用呢?这一节我们就来讨论这个问题

    设置水位线延迟时间

    水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。

    之前我们讲到触发器时曾提到过“定时器”,时间窗口的操作底层就是靠定时器来控制触发的。既然是底层机制,定时器自然就不可能是窗口的专利了;事实上它是 Flink 底层 API——处理函数(process function)的重要部分

    所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后,相当于是上帝拨动了琴弦,所有人的表都变慢了

    既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级

    当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”

    允许窗口处理迟到数据

    水位线延迟设置的比较小,那之后如果仍有数据迟到该怎么办?对于窗口计算而言,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么之后再来的数据就要被丢弃了

    自然想到,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。

    这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了

    类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟;到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来,然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了

    所以我们将水位线的延迟和窗口的允许延迟数据结合起来,最后的效果就是先快速实时地输出一个近似的结果,而后再不断调整,最终得到正确的计算结果。回想流处理的发展过程,这不就是著名的 Lambda 架构吗?原先需要两套独立的系统来同时保证实时性和结果的最终正确性,如今 Flink 一套系统就全部搞定了

    将迟到数据放入窗口侧输出流

    即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?

    那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

    如果还用赶班车来类比,那就是车已经上高速开走了,这班车是肯定赶不上了。不过我们还留下了行进路线和联系方式,迟到的人如果想办法辗转到了目的地,还是可以和大部队会合的。最终,所有该到的人都会在目的地出现

    所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。我们可以回忆一下之前
    每个 url 浏览次数的代码 UrlViewCountExample,稍作改进,增加处理迟到数据的功能。

    
    import com.atguigu.wordcount.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    
    public class ProcessLateDataExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 读取 socket 文本流
            SingleOutputStreamOperator<Event> stream =
                    env.socketTextStream("hadoop102", 7777)
                            .map(new MapFunction<String, Event>() {
                                @Override
                                public Event map(String value) throws Exception {
                                    String[] fields = value.split(" ");
                                    return new Event(fields[0].trim(), fields[1].trim(),
                                            Long.valueOf(fields[2].trim()));
                                }
                            })
                            // 方式一:设置 watermark 延迟时间,2 秒钟
                            .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                    .withTimestampAssigner(new
                                                                   SerializableTimestampAssigner<Event>() {
                                                                       @Override
                                                                       public long extractTimestamp(Event element, long
                                                                               recordTimestamp) {
                                                                           return element.timestamp;
                                                                       }
                                                                   }));
            // 定义侧输出流标签
            OutputTag<Event> outputTag = new OutputTag<Event>("late") {
            };
            SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data ->
                            data.url)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    // 方式二:允许窗口处理迟到数据,设置 1 分钟的等待时间
                    .allowedLateness(Time.minutes(1))
                    // 方式三:将最后的迟到数据输出到侧输出流
                    .sideOutputLateData(outputTag)
                    .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
            result.print("result");
            result.getSideOutput(outputTag).print("late");
            // 为方便观察,可以将原始数据也输出
            stream.print("input");
            env.execute();
        }
    
        public static class UrlViewCountAgg implements AggregateFunction<Event, Long,
                Long> {
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(Event value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return null;
            }
        }
    
        public static class UrlViewCountResult extends ProcessWindowFunction<Long,
                UrlViewCount, String, TimeWindow> {
            @Override
            public void process(String url, Context context, Iterable<Long> elements,
                                Collector<UrlViewCount> out) throws Exception {
                // 结合窗口信息,包装输出内容
                Long start = context.window().getStart();
                Long end = context.window().getEnd();
                out.collect(new UrlViewCount(url, elements.iterator().next(), start,
                        end));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    总结

    在流处理中,由于对实时性的要求非常高,同时又要求能够保证窗口操作结果的正确,所以必须引入水位线来描述事件时间。而窗口正是时间相关的最佳应用场景,所以 Flink 为我们提供了丰富的窗口类型和处理操作;与此同时,在实际应用中很难对乱序流给出一个最佳延迟时间,单独依赖水位线去保证结果正确性是不够的,所以需要结合窗口(Window)处理迟到数据的相关 API。

    我们详细了解了 Flink 中时间语义和水位线的概念、窗口 API 的用法以及处理迟到数据的相关知识,这些内容对于实时流处理来说非常重要。Flink 的时间语义和窗口,主要就是为了处理大规模的乱序数据流时,同时保证低延迟、高吞吐和结果的正确性。这部分设计基本上是对谷歌(Google)著名论文《数据流模型:一种在大规模、无界、无序数据处理中平衡正确性、延迟和性能的实用方法》(The Dataflow Model:
    A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing)的具体实现,如果读者有兴趣可以读一下原始论文,会对流处理有更加深刻的理解

  • 相关阅读:
    G1D13-Apt论文阅读&fraud&git&KGbook&rce33-36&php环境搭建
    【转载】分布式训练和集合通信
    Firewalld防火墙
    免费的国产数据集成平台推荐
    数据结构第三部分——树和二叉树(C语言版)
    CSS 01
    【Matlab】常用函数汇总(一)
    深度神经网络模型有哪些,深度神经网络预测模型
    Excel最基本的常用函数
    在 AWS Marketplace 上订阅 EMQX Cloud 按量计费版
  • 原文地址:https://blog.csdn.net/zhouhe_/article/details/127993834