• Source Code Walkthrough: How FlinkKafkaConsumer Emits Punctuated Watermarks


    Background

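    FlinkKafkaConsumer can emit a watermark when it receives a particular record from a Kafka partition, for example a special record that marks the end of a complete unit of data. This article walks through the source code that emits these punctuated watermarks.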
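To make the idea concrete, here is a minimal, dependency-free sketch of the decision a punctuated assigner makes for each record. The `Event` class, its `endOfBatch` flag, and `PunctuatedSketch` are hypothetical names invented for this illustration. Only the method name `checkAndGetNextWatermark` mirrors the Flink hook that shows up later in this walkthrough, and the sketch returns a boxed `Long` instead of Flink's `Watermark` type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type: endOfBatch marks the special "punctuation" record.
final class Event {
    final String payload;
    final long timestamp;
    final boolean endOfBatch;

    Event(String payload, long timestamp, boolean endOfBatch) {
        this.payload = payload;
        this.timestamp = timestamp;
        this.endOfBatch = endOfBatch;
    }
}

final class PunctuatedSketch {
    // Returns a watermark timestamp only for marker records, otherwise null.
    static Long checkAndGetNextWatermark(Event event, long extractedTimestamp) {
        return event.endOfBatch ? Long.valueOf(extractedTimestamp) : null;
    }

    // Collects the watermark produced by each marker record, in arrival order.
    static List<Long> watermarksFor(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            Long next = checkAndGetNextWatermark(e, e.timestamp);
            if (next != null) {
                emitted.add(next);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 1_000L, false));
        events.add(new Event("b", 2_000L, true));
        events.add(new Event("c", 3_000L, false));
        System.out.println(watermarksFor(events)); // prints [2000]
    }
}
```

Ordinary records pass through without producing a watermark; only the marker record does, which is what distinguishes punctuated generation from periodic generation.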

    Source walkthrough: emitting punctuated watermarks

    1. Start with the runFetchLoop method in KafkaFetcher

    public void runFetchLoop() throws Exception {
        try {
            // kick off the actual Kafka consumer
            consumerThread.start();

            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the consumer thread
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                // get the records for each topic partition
                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
                    // called once for every partition consumed by this operator subtask
                    partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            // this signals the consumer thread that no more work is to be done
            consumerThread.shutdown();
        }
    }
    

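    2. partitionConsumerRecordsHandler deserializes each record of a partition and hands the result to emitRecordsWithTimestamps, which emits the records and handles the watermark for each partition assigned to the current operator subtask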

    ```java
    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            KafkaTopicPartitionState<T, KPH> partitionState,
            long offset,
            long kafkaEventTimestamp) {
        // emit the records, using the checkpoint lock to guarantee
        // atomicity of record emission and offset state update
        synchronized (checkpointLock) {
            T record;
            while ((record = records.poll()) != null) {
                long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
                // send the Kafka record to the downstream operator
                sourceContext.collectWithTimestamp(record, timestamp);

                // this might emit a watermark, so do it after emitting the record
                // handle the partition's watermark: record it for this partition and, when
                // the condition is met, advance the watermark of the whole operator subtask
                partitionState.onEvent(record, timestamp);
            }
            partitionState.setOffset(offset);
        }
    }
    ```
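The reason for holding `checkpointLock` can be illustrated with a small dependency-free sketch: when emission and the offset update happen under the same lock that a snapshot takes, a snapshot can never observe an emitted record whose offset has not been stored yet. `LockedEmitter`, `emit`, and `snapshot` are hypothetical names for this illustration, not Flink classes, and the sketch stores one offset per record for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

final class LockedEmitter {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>();
    private long offset = -1L;

    // Emits a record and stores its offset as one atomic step.
    void emit(String record, long recordOffset) {
        synchronized (checkpointLock) {
            emitted.add(record);
            offset = recordOffset;
        }
    }

    // A snapshot taken under the same lock sees a matching pair:
    // {number of emitted records, last stored offset}.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), offset};
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockedEmitter emitter = new LockedEmitter();
        Thread producer = new Thread(() -> {
            for (long i = 0; i < 10_000; i++) {
                emitter.emit("r" + i, i);
            }
        });
        producer.start();
        // Offsets start at 0, so the record count must always equal offset + 1.
        boolean consistent = true;
        while (producer.isAlive()) {
            long[] s = emitter.snapshot();
            consistent &= (s[0] == s[1] + 1);
        }
        producer.join();
        System.out.println("snapshots consistent: " + consistent);
    }
}
```

Without the shared lock, a snapshot could land between the two statements in `emit` and capture a record count that is one ahead of the stored offset.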
    
    3. Handle the watermark of each partition
    
    ```java
    // KafkaTopicPartitionStateWithWatermarkGenerator: forwards the record to the watermark generator
    public void onEvent(T event, long timestamp) {
        watermarkGenerator.onEvent(event, timestamp, immediateOutput);
    }

    // AssignerWithPunctuatedWatermarksAdapter: asks the user's punctuated assigner
    // whether this record should produce a watermark
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark next =
                wms.checkAndGetNextWatermark(event, eventTimestamp);

        if (next != null) {
            output.emitWatermark(new Watermark(next.getTimestamp()));
        }
    }

    // output.emitWatermark(new Watermark(next.getTimestamp())) resolves to this method
    // in WatermarkOutputMultiplexer
    public void emitWatermark(Watermark watermark) {
        long timestamp = watermark.getTimestamp();
        // update the watermark recorded for this partition
        boolean wasUpdated = state.setWatermark(timestamp);

        // if it's higher than the max watermark so far we might have to update the
        // combined watermark. The combined watermark is the minimum across this subtask's
        // partitions, i.e. the subtask-level watermark rather than a partition-level one
        if (wasUpdated && timestamp > combinedWatermark) {
            updateCombinedWatermark();
        }
    }

    // each partition's watermark is updated as follows
    public boolean setWatermark(long watermark) {
        this.idle = false;
        final boolean updated = watermark > this.watermark;
        this.watermark = Math.max(watermark, this.watermark);
        return updated;
    }
    ```
            
    

    4. Finally, the method that emits the subtask-level watermark

    private void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;

        boolean hasOutputs = false;
        boolean allIdle = true;
        for (OutputState outputState : watermarkOutputs) {
            if (!outputState.isIdle()) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
                allIdle = false;
            }
            hasOutputs = true;
        }

        // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
        // at its initial Long.MAX_VALUE state and we must not emit that
        if (!hasOutputs) {
            return;
        }

        if (allIdle) {
            underlyingOutput.markIdle();
        } else if (minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
        }
    }
    
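Steps 3 and 4 together can be replayed in a small dependency-free model. This is a simplified sketch, not Flink's `WatermarkOutputMultiplexer`: `MiniMultiplexer` and its methods are hypothetical, partitions are plain array indices, idleness is left out, and emitted watermarks are collected in a list instead of being sent downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MiniMultiplexer {
    private final long[] partitionWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    MiniMultiplexer(int partitions) {
        partitionWatermarks = new long[partitions];
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    // Per-partition update: a partition's watermark never moves backwards.
    void emitWatermark(int partition, long timestamp) {
        boolean wasUpdated = timestamp > partitionWatermarks[partition];
        partitionWatermarks[partition] = Math.max(timestamp, partitionWatermarks[partition]);
        if (wasUpdated && timestamp > combinedWatermark) {
            updateCombinedWatermark();
        }
    }

    // Subtask-level watermark: the minimum over all partitions, emitted only when it advances.
    private void updateCombinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            min = Math.min(min, w);
        }
        if (partitionWatermarks.length > 0 && min > combinedWatermark) {
            combinedWatermark = min;
            emitted.add(min);
        }
    }

    public static void main(String[] args) {
        MiniMultiplexer mux = new MiniMultiplexer(2);
        mux.emitWatermark(0, 100L); // partition 1 has no watermark yet, nothing is emitted
        mux.emitWatermark(1, 50L);  // minimum becomes 50
        mux.emitWatermark(1, 200L); // minimum becomes 100
        mux.emitWatermark(0, 90L);  // stale for partition 0, ignored
        System.out.println(mux.emitted); // prints [50, 100]
    }
}
```

The first call emits nothing because the other partition is still at `Long.MIN_VALUE`, which shows that the subtask watermark only moves once every partition has reported one.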

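    Following this flow, does it mean that idleness detection is not supported when watermarks are generated in punctuated mode? The answer is yes: nothing in this path marks a partition as idle, so a partition that receives no marker record keeps its old watermark and holds back the minimum computed in updateCombinedWatermark.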
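The effect can be reproduced with a pure function that mirrors the minimum-over-non-idle-outputs rule from step 4. The names `IdleSketch` and `combined` are hypothetical, and the `idle` flags are set by hand here purely to show what an idleness mechanism would change; in the punctuated path traced above nothing sets them.

```java
final class IdleSketch {
    // Minimum watermark over non-idle partitions, or null when every partition is idle.
    static Long combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                allIdle = false;
            }
        }
        return allIdle ? null : Long.valueOf(min);
    }

    public static void main(String[] args) {
        // Partition 1 never received a marker record, so it still holds Long.MIN_VALUE.
        long[] watermarks = {5_000L, Long.MIN_VALUE};

        // Punctuated path: no partition is ever flagged idle, so the silent one wins the minimum.
        System.out.println(combined(watermarks, new boolean[] {false, false}));

        // If partition 1 could be flagged idle, the subtask watermark would follow partition 0.
        System.out.println(combined(watermarks, new boolean[] {false, true}));
    }
}
```

In the first call the result stays at `Long.MIN_VALUE`, so downstream event-time windows would not fire no matter how far partition 0 advances.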

  • Original article: https://blog.csdn.net/lixia0417mul2/article/details/133823145