• 源码解析FlinkKafkaConsumer支持周期性水位线发送


    背景

    flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

    FlinkKafkaConsumer水位线发送

    1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

            // if we have periodic watermarks, kick off the interval scheduler
            if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
                PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
                        new PeriodicWatermarkEmitter<>(
                                checkpointLock,
                                subscribedPartitionStates,
                                watermarkOutputMultiplexer,
                                processingTimeProvider,
                                autoWatermarkInterval);
    
                periodicEmitter.start();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

            public void start() {
                timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
            }
    
            @Override
            public void onProcessingTime(long timestamp) {
    
                synchronized (checkpointLock) {
                    for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
                        // 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值
                        state.onPeriodicEmit();
                    }
    				//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的
                    watermarkOutputMultiplexer.onPeriodicEmit();
                }
    
                // schedule the next watermark
                timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
            if (next != null) {
                output.emitWatermark(new Watermark(next.getTimestamp()));
            }
        }
    其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:
            public DeferredOutput(OutputState state) {
                this.state = state;
            }
    
            @Override
            public void emitWatermark(Watermark watermark) {
                state.setWatermark(watermark.getTimestamp());
            }
    所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线
            /**
             * Returns true if the watermark was advanced, that is if the new watermark is larger than
             * the previous one.
             *
             * 

    Setting a watermark will clear the idleness flag. */ public boolean setWatermark(long watermark) { this.idle = false; final boolean updated = watermark > this.watermark; // 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退 this.watermark = Math.max(watermark, this.watermark); return updated; }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    4.对应算子任务组合当前任务消费的所有分区水位线的方法

    private void updateCombinedWatermark() {
            long minimumOverAllOutputs = Long.MAX_VALUE;
    
            boolean hasOutputs = false;
            boolean allIdle = true;
            for (OutputState outputState : watermarkOutputs) {
                if (!outputState.isIdle()) {
                    minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
                    allIdle = false;
                }
                hasOutputs = true;
            }
    
            // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
            // at its initial Long.MAX_VALUE state and we must not emit that
            // 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???
            if (!hasOutputs) {
                return;
            }
    
            if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进
                underlyingOutput.markIdle();
            } else if (minimumOverAllOutputs > combinedWatermark) {
                combinedWatermark = minimumOverAllOutputs;
                underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
            }
        }```
    
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    如何设计一个优惠券系统
    第一章 赛前准备工作
    基于C++MFC的网络安全扫描器的设计与实现
    javaweb教师人事管理系统的设计
    Java多线程开发系列之六:无限分解流----Fork/Join框架
    【目标检测】|yolov6 结构代码分析
    跨足泛娱乐:TikTok如何重新定义娱乐产业?
    调用网络时报错name weight already exists
    pdf只要其中一页,pdf只要其中几页怎么弄
    【MAPBOX基础功能】22、mapbox根据指定中心点绘制指定公里数的矩形
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133822433