• Function接口设计



    **DataStream转换操作中的数据处理逻辑是通过自定义函数xxxFunction实现的,Function接口是所有自定义函数的父类,专门用于处理StreamOperator接入的数据。StreamOperator会调用内部的xxxFunction,处理接入的数据,然后将处理后的数据发送到下游算子继续处理。**如StreamMap算子:

    @Internal
    public class StreamMap<IN, OUT>
    		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
    		implements OneInputStreamOperator<IN, OUT> {
    
    	private static final long serialVersionUID = 1L;
    
    	public StreamMap(MapFunction<IN, OUT> mapper) {
    		super(mapper);
    		// 链化策略为:ALWAYS
    		chainingStrategy = ChainingStrategy.ALWAYS;
    	}
    
    	@Override
    	public void processElement(StreamRecord<IN> element) throws Exception {
    		// StreamOperator会调用它对应的xxxFunction处理接入的数据
    		output.collect(element.replace(userFunction.map(element.getValue())));
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    RichFunction接口扩展了“状态计算”的功能,它提供了控制函数生命周期的open、close方法,获取RuntimeContext的getRuntimeContext方法,可以让我们通过RuntimeContext来初始化State、操作State数据。

    1.RichFunction接口

    RichFunction接口是对Function接口在“状态计算”方面的功能扩展,抽象类AbstractRichFunction提供了RichFunction的基本实现。

    在这里插入图片描述

    RichFunction之所以能够支持State计算,全都仰仗于RuntimeContext。RuntimeContext是xxxFunction执行时的上下文环境,xxxFunction的每一个并行实例都有一个RuntimeContext。xxxFunction可以在运行时通过AbstractRichFunction#getRuntimeContext()获取到RuntimeContext。有了RuntimeContext,就可以“一通get”

    同时Flink为了满足不同的StreamOperator对RuntimeContext的获取,特地提供了不同类型的RuntimeContext实现子类,不同类型的StreamOperator可以对应的创建不同的RuntimeContext实现子类。其中处理流式数据用到的RuntimeContext实现子类为AbstractRuntimeUDFContext(主要用于获取自定义函数xxxFunction在运行时的相关信息)的子类:StreamingRuntimeContext。也就是说,在绝大部分的转换操作中,我们为了使用State计算,要依赖StreamingRuntimeContext去“一通get”。

    2.SourceFunction接口

    SourceFunction接口继承了Function接口,是对Function接口进行了“读取数据源方面”的扩展。读到的数据会被转换为StreamRecord(数据结构)发送给下游的StreamOperator。

    SourceFunction接口定义了用于读取数据的run()方法数据接入过程中用到上下文SourceContext接口

    @Public
    public interface SourceFunction<T> extends Function, Serializable {
    
        // 开始读取Source,实现子类可以使用SourceContext向后发射数据
        void run(SourceContext<T> ctx) throws Exception;
    
        // 大部分的Source源的实现run()方法中都有一个while循环,本方法会跳出循环
        void cancel();
    
        // SourceFunction用来发射数据元素或WaterMark的接口
        @Public 
        interface SourceContext<T> {
    
            // 收集外部数据源读取到的数据,发送到下游算子中
            void collect(T element);
    
            //  将外部数据源读到的数据和timestamp,发送到下游算子中
            @PublicEvolving
            void collectWithTimestamp(T element, long timestamp);
    
            // 在SourceFunction中生成WaterMark,并发送到下游算子中处理
            @PublicEvolving
            void emitWatermark(Watermark mark);
    
    
            @PublicEvolving
            void markAsTemporarilyIdle();
    
            // 获取checkpoint lock(使用FlinkKafkaConsumer时,会使用checkpoint lock来保证“记录发出的原子性和offset更新”)
            Object getCheckpointLock();
    
            void close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    默认情况下,SourceFunction接口不支持并行读取,所以ParallelSourceFunction接口对它进行了补充。由于抽象类AbstractRichFunction提供了RichFunction接口的基本实现方法,而RichFunction接口支持了State计算。“一个负责貌美如花,一个负责插花”。于是,拥有并行读取能力的ParallelSourceFunction接口 + 拥有State计算能力的抽象类AbstractRichFunction = 既能并行读取又能支持状态计算的抽象类RichParallelSourceFunction。最经典的RichParallelSourceFunction实例就是FlinkKafkaConsumer:使用OperatorState保存Kafka的消费offset,实现端到端exactly once的语义。

    RichParallelSourceFunction可以在“支持State计算”的前提下,并行的读取到外部数据源的数据。那么抽取timestamp、生成Watermark、把读到的数据collect到下游等一系列操作,就交给了SourceContext。

    SourceContext是SourceFunction接口的内部接口,主要用于收集SourceFunction中的上下文信息。SourceContext定义了以下的接口方法:

    • collect()方法:将外部数据源读取到的数据,发送到下游算子中
    • collectWithTimestamp()方法 :将外部数据源读到的数据和timestamp,发送到下游算子中
    • emitWatermark()方法 :在SourceFunction中生成WaterMark,并发送到下游算子中处理
    • getCheckpointLock()方法:获取checkpoint lock(使用FlinkKafkaConsumer时,会使用checkpoint lock来保证“记录发出的原子性和offset更新”)

    总的来说,SourceFunction接口的实现类提供run()方法读取外部数据源,数据通过SourceContext提供的collect()方法发送到下游(当然也会提供其他方法抽取timestamp、生成Watermark等)。

    StreamSourceContexts#getSourceContext()方法会根据Developer选择的时间语义,创建不同类型的SourceContext实现子类:

    • WatermarkContext implements SourceFunction.SourceContext:支持抽取timestamp、生成WaterMark
      • ManualWatermarkContext extends WatermarkContext:对应事件时间
      • AutomaticWatermarkContext extends WatermarkContext:对应接入时间
    • NonTimestampContext implements SourceFunction.SourceContext:对应ProcessingTime

    截止到目前为止,我们已经定义好了一个自定义函数xxxFunction,它可以并行的读取外部数据、抽取timestamp生成WaterMark、顺利的collect给下游。通过DataStream的转换过程,我们知道StreamOperator会通过成员变量userFunction来持有自定义函数xxxFunction。于是在StreamSource算子中,就可以在合适的时机,get到我们使用的时间语义,并根据时间语义的类型创建出对应的SourceContext实现子类。接着就能利用StreamSource(作为StreamOperator)“合法”持有的userFunction,调用我们自定义函数xxxFunction内的方法做一些事,比如:并行读取外部数据、抽取timestamp生成WaterMark、通过时间语义对应的SourceContext实现子类将数据发送给下游(sourceContext.collect…)

    /**
     * StreamSource#run()方法调用SourceFunction#run()方法
     */
    public void run(final Object lockingObject,
                    final StreamStatusMaintainer streamStatusMaintainer,
                    final Output<StreamRecord<OUT>> collector,
                    final OperatorChain<?, ?> operatorChain) throws Exception {
    
        // 获取TimeCharacteristic
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
    
        // 创建Configuration
        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);
    
        // 创建LatencyMarksEmitter(在SourceFunction中输出Latency标记,即周期性的生成时间戳)
        // 当下游算子收到SourceOperator发送的LatencyMark后,当前时间 - LatencyMark = 该算子处理数据的延时情况
        // 最后算子将LatencyMark监控指标以Metric的形式发送到外部监控系统中
        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }
    
        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    
        // 创建SourceContext
        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);
    
        try {
            // 调用、执行SourceFunction实例(SourceContext被包装在SourceFunction内,SourceFunction可以直接操作SourceContext)
            userFunction.run(ctx);
    
            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time, and end inputs
            // of the operator chain
            if (!isCanceledOrStopped()) {
                // in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
                // so we still need the following call to end the input
                synchronized (lockingObject) {
                    operatorChain.endHeadOperatorInput(1);
                }
            }
        } finally {
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    3.SinkFunction接口

    SinkFunction是将上游的数据,sink到外部数据源中。SinkFunction继承Function接口,是对Function接口进行了“输出外部系统”的扩展。

    提供State计算基本实现的AbstractRichFunction + 扩展了“Sink功能”的SinkFunction = RichSinkFunction。TwoPhaseCommitSinkFunction作为RichSinkFunction的子类,可以基于两阶段提交来保证端到端的数据一致性。而FlinkKafkaProducer作为TwoPhaseCommitSinkFunction的子类,

    @Public
    public interface SinkFunction<IN> extends Function, Serializable {
    
        // 每来一条record,都会调用一次:将给定的value,写入到Sink。
    	@Deprecated
    	default void invoke(IN value) throws Exception {}
    	default void invoke(IN value, Context context) throws Exception {
    		invoke(value);
    	}
    
        /**
         * Sink操作相关的上下文
         */
    	@Public
    	interface Context<T> {
    
            // 返回当前的ProcessingTime
    		long currentProcessingTime();
    
            // 返回当前的Watermark
    		long currentWatermark();
    
            // 返回当前的timest,如果没有就return null
    		Long timestamp();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    当然,也需要通过SinkContext获取sink操作相关的上下文。SinkFunction提供了write到外部系统的invoke()方法,每来一条数据都会调用一次。SinkContext提供了从数据中获取到ProcessingTime、水印、timestamp等方法。

    StreamSink算子提供了SinkFunction.Context的默认实现:

    private class SimpleContext<IN> implements SinkFunction.Context<IN> {
    
        private StreamRecord<IN> element;
    
        private final ProcessingTimeService processingTimeService;
    
        public SimpleContext(ProcessingTimeService processingTimeService) {
            this.processingTimeService = processingTimeService;
        }
    
        @Override
        public long currentProcessingTime() {
            return processingTimeService.getCurrentProcessingTime();
        }
    
        @Override
        public long currentWatermark() {
            return currentWatermark;
        }
    
        @Override
        public Long timestamp() {
            if (element.hasTimestamp()) {
                return element.getTimestamp();
            }
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    SinkFunction提供了invoke()方法写出到外部Sink,StreamSink算子(作为StreamOperator)通过成员变量userFunction已经“合法”地持有了自定义函数xxxFunction。当StreamSink算子被调用processElement()方法去“处理数据”时,会通过成员变量userFunction调用自定义函数的invoke(),同时将SinkContext作为参数传入。在执行invoke()的同时,会按情况“合理利用”SinkContext,如:获取当前数据的Watermark、timestamp等。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // SinkContext的初始化是在StreamSink算子的open()完成的
        sinkContext.element = element;
        // 通过成员变量userFunction,调用自定义函数的invoke()方法
        userFunction.invoke(element.getValue(), sinkContext);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总的来说,SinkFunction的设计思想跟SourceFunction一样,都是SourceFunction/SinkFunction提供读入/写出方法,由SourceContext/SinkContext提供一系列处理数据的方法。而StreamSource/StreamSink算子作为StreamOperator,可以通过一个成员变量userFunction“合法”地持有我们的自定义函数xxxFunction。当StreamOperator需要处理数据时,就会通过自定义函数xxxFunction间接的处理!

    4.ProcessFunction抽象类

    根据数据是否进行了KeyBy,将ProcessFunction分为:

    • KeyedProcessFunction:使用更多
    • ProcessFunction:主要实现类之一有用于实现维表关联的LookupJoinRunner

    不管是哪种ProcessFunction,都继承了AbstractRichFunction,即支持State计算了。

    KeyedProcessFunction内部定义了2种Context:

    • Context :应用于KeyedProcessFunction#processElement()方法中,定义了侧输出output(分流)、获取TimerService服务(注册Timer)等
    • OnTimerContext extends Context:应用于KeyedProcessFunction#onTimer()方法中,相比于Context而言,额外扩展了枚举类TimeDomain(用来指定触发Timer的是基于 ProcessingTime or EventTime)
    @PublicEvolving
    public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    
    	private static final long serialVersionUID = 1L;
    
    	// 读取数据元素,并处理
    	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
    	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    
    	/**
    	 * 定义了从数据元素中获取timestamp、从运行时获取TimerService
    	 */
    	public abstract class Context {
    
    		// 从数据元素中获取timestamp
    		public abstract Long timestamp();
    
    		// 获取TimerService:注册Timer
    		public abstract TimerService timerService();
    
    		// 用于侧输出
    		public abstract <X> void output(OutputTag<X> outputTag, X value);
    
    		// 获取当前正在被处理的元素的KeyBy的key
    		public abstract K getCurrentKey();
    	}
    
    	/**
    	 * 应用在KeyedProcessFunction#OnTimer方法中,是对Context的“额外”扩展
    	 */
    	public abstract class OnTimerContext extends Context {
    
    		// 指定触发Timer的是基于 ProcessingTime or EventTime
    		public abstract TimeDomain timeDomain();
    
    		@Override
    		public abstract K getCurrentKey();
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    在KeyedProcessFunction中通过抽象方法processElement()读取、处理数据,并按需创建Timer,Timer会被注册到Context的TimerService定时器队列中(内部专用的InternalTimerService中) --> Context.timerService().registerEventTimeTimer()

    当满足Timer触发条件时,会回调OnTimer方法执行Timer的计算逻辑 --> OnTimerContext.timerService().deleteEventTimeTimer(xxx)

  • 相关阅读:
    实验一 将调试集成到vscode
    工作积累——Web请求中使用ThreadLocal遇见的问题
    缓存三大坑
    kubernetes集群安装实战
    MySQL入门(SQL语句、约束)
    Java中set集合简介说明
    [附源码]java毕业设计构建养猪场管理系统
    编译构建 meson ninja
    信道复用技术
    vueshowpdf 移动端pdf文件预览
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/127536945