@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
// 链化策略为:ALWAYS
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// StreamOperator会调用它对应的xxxFunction处理接入的数据
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
RichFunction接口扩展了“状态计算”的功能,它提供了控制函数生命周期的open、close方法,获取RuntimeContext的getRuntimeContext方法,可以让我们通过RuntimeContext来初始化State、操作State数据。
RichFunction接口是对Function接口在“状态计算”方面的功能扩展,抽象类AbstractRichFunction提供了RichFunction的基本实现。
RichFunction之所以能够支持State计算,全都仰仗于RuntimeContext。RuntimeContext是xxxFunction执行时的上下文环境,xxxFunction的每一个并行实例都有一个RuntimeContext。xxxFunction可以在运行时通过AbstractRichFunction#getRuntimeContext()获取到RuntimeContext。有了RuntimeContext,就可以“一通get”
同时Flink为了满足不同的StreamOperator对RuntimeContext的获取,特地提供了不同类型的RuntimeContext实现子类,不同类型的StreamOperator可以对应的创建不同的RuntimeContext实现子类。其中处理流式数据用到的RuntimeContext实现子类为AbstractRuntimeUDFContext(主要用于获取自定义函数xxxFunction在运行时的相关信息)的子类:StreamingRuntimeContext。也就是说,在绝大部分的转换操作中,我们为了使用State计算,要依赖StreamingRuntimeContext去“一通get”。
SourceFunction接口继承了Function接口,是对Function接口进行了“读取数据源方面”的扩展。读到的数据会被转换为StreamRecord(数据结构)发送给下游的StreamOperator。
SourceFunction接口定义了用于读取数据的run()方法和数据接入过程中用到上下文SourceContext接口
@Public
public interface SourceFunction<T> extends Function, Serializable {
// 开始读取Source,实现子类可以使用SourceContext向后发射数据
void run(SourceContext<T> ctx) throws Exception;
// 大部分的Source源的实现run()方法中都有一个while循环,本方法会跳出循环
void cancel();
// SourceFunction用来发射数据元素或WaterMark的接口
@Public
interface SourceContext<T> {
// 收集外部数据源读取到的数据,发送到下游算子中
void collect(T element);
// 将外部数据源读到的数据和timestamp,发送到下游算子中
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
// 在SourceFunction中生成WaterMark,并发送到下游算子中处理
@PublicEvolving
void emitWatermark(Watermark mark);
@PublicEvolving
void markAsTemporarilyIdle();
// 获取checkpoint lock(使用FlinkKafkaConsumer时,会使用checkpoint lock来保证“记录发出的原子性和offset更新”)
Object getCheckpointLock();
void close();
}
}
默认情况下,SourceFunction接口不支持并行读取,所以ParallelSourceFunction接口对它进行了补充。由于抽象类AbstractRichFunction提供了RichFunction接口的基本实现方法,而RichFunction接口支持了State计算。“一个负责貌美如花,一个负责插花”。于是,拥有并行读取能力的ParallelSourceFunction接口 + 拥有State计算能力的抽象类AbstractRichFunction = 既能并行读取又能支持状态计算的抽象类RichParallelSourceFunction。最经典的RichParallelSourceFunction实例就是FlinkKafkaConsumer:使用OperatorState保存Kafka的消费offset,实现端到端exactly once的语义。
RichParallelSourceFunction可以在“支持State计算”的前提下,并行的读取到外部数据源的数据。那么抽取timestamp、生成Watermark、把读到的数据collect到下游等一系列操作,就交给了SourceContext。
SourceContext是SourceFunction接口的内部接口,主要用于收集SourceFunction中的上下文信息。SourceContext定义了以下的接口方法:
总的来说,SourceFunction接口的实现类提供run()方法读取外部数据源,数据通过SourceContext提供的collect()方法发送到下游(当然也会提供其他方法抽取timestamp、生成Watermark等)。
StreamSourceContexts#getSourceContext()方法会根据Developer选择的时间语义,创建不同类型的SourceContext实现子类:
截止到目前为止,我们已经定义好了一个自定义函数xxxFunction,它可以并行的读取外部数据、抽取timestamp生成WaterMark、顺利的collect给下游。通过DataStream的转换过程,我们知道StreamOperator会通过成员变量userFunction来持有自定义函数xxxFunction。于是在StreamSource算子中,就可以在合适的时机,get到我们使用的时间语义,并根据时间语义的类型创建出对应的SourceContext实现子类。接着就能利用StreamSource(作为StreamOperator)“合法”持有的userFunction,调用我们自定义函数xxxFunction内的方法做一些事,比如:并行读取外部数据、抽取timestamp生成WaterMark、通过时间语义对应的SourceContext实现子类将数据发送给下游(sourceContext.collect…)
/**
* StreamSource#run()方法调用SourceFunction#run()方法
*/
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {
// 获取TimeCharacteristic
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
// 创建Configuration
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
// 创建LatencyMarksEmitter(在SourceFunction中输出Latency标记,即周期性的生成时间戳)
// 当下游算子收到SourceOperator发送的LatencyMark后,当前时间 - LatencyMark = 该算子处理数据的延时情况
// 最后算子将LatencyMark监控指标以Metric的形式发送到外部监控系统中
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
// 创建SourceContext
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
// 调用、执行SourceFunction实例(SourceContext被包装在SourceFunction内,SourceFunction可以直接操作SourceContext)
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time, and end inputs
// of the operator chain
if (!isCanceledOrStopped()) {
// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
// so we still need the following call to end the input
synchronized (lockingObject) {
operatorChain.endHeadOperatorInput(1);
}
}
} finally {
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
SinkFunction是将上游的数据,sink到外部数据源中。SinkFunction继承Function接口,是对Function接口进行了“输出外部系统”的扩展。
提供State计算基本实现的AbstractRichFunction + 扩展了“Sink功能”的SinkFunction = RichSinkFunction。TwoPhaseCommitSinkFunction作为RichSinkFunction的子类,可以基于两阶段提交来保证端到端的数据一致性。而FlinkKafkaProducer作为TwoPhaseCommitSinkFunction的子类,
@Public
public interface SinkFunction<IN> extends Function, Serializable {
// 每来一条record,都会调用一次:将给定的value,写入到Sink。
@Deprecated
default void invoke(IN value) throws Exception {}
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
/**
* Sink操作相关的上下文
*/
@Public
interface Context<T> {
// 返回当前的ProcessingTime
long currentProcessingTime();
// 返回当前的Watermark
long currentWatermark();
// 返回当前的timest,如果没有就return null
Long timestamp();
}
}
当然,也需要通过SinkContext获取sink操作相关的上下文。SinkFunction提供了write到外部系统的invoke()方法,每来一条数据都会调用一次。SinkContext提供了从数据中获取到ProcessingTime、水印、timestamp等方法。
StreamSink算子提供了SinkFunction.Context的默认实现:
private class SimpleContext<IN> implements SinkFunction.Context<IN> {
private StreamRecord<IN> element;
private final ProcessingTimeService processingTimeService;
public SimpleContext(ProcessingTimeService processingTimeService) {
this.processingTimeService = processingTimeService;
}
@Override
public long currentProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}
@Override
public long currentWatermark() {
return currentWatermark;
}
@Override
public Long timestamp() {
if (element.hasTimestamp()) {
return element.getTimestamp();
}
return null;
}
}
SinkFunction提供了invoke()方法写出到外部Sink,StreamSink算子(作为StreamOperator)通过成员变量userFunction已经“合法”地持有了自定义函数xxxFunction。当StreamSink算子被调用processElement()方法去“处理数据”时,会通过成员变量userFunction调用自定义函数的invoke(),同时将SinkContext作为参数传入。在执行invoke()的同时,会按情况“合理利用”SinkContext,如:获取当前数据的Watermark、timestamp等。
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// SinkContext的初始化是在StreamSink算子的open()完成的
sinkContext.element = element;
// 通过成员变量userFunction,调用自定义函数的invoke()方法
userFunction.invoke(element.getValue(), sinkContext);
}
总的来说,SinkFunction的设计思想跟SourceFunction一样,都是SourceFunction/SinkFunction提供读入/写出方法,由SourceContext/SinkContext提供一系列处理数据的方法。而StreamSource/StreamSink算子作为StreamOperator,可以通过一个成员变量userFunction“合法”地持有我们的自定义函数xxxFunction。当StreamOperator需要处理数据时,就会通过自定义函数xxxFunction间接的处理!
根据数据是否进行了KeyBy,将ProcessFunction分为:
不管是哪种ProcessFunction,都继承了AbstractRichFunction,即支持State计算了。
KeyedProcessFunction内部定义了2种Context:
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
// 读取数据元素,并处理
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* 定义了从数据元素中获取timestamp、从运行时获取TimerService
*/
public abstract class Context {
// 从数据元素中获取timestamp
public abstract Long timestamp();
// 获取TimerService:注册Timer
public abstract TimerService timerService();
// 用于侧输出
public abstract <X> void output(OutputTag<X> outputTag, X value);
// 获取当前正在被处理的元素的KeyBy的key
public abstract K getCurrentKey();
}
/**
* 应用在KeyedProcessFunction#OnTimer方法中,是对Context的“额外”扩展
*/
public abstract class OnTimerContext extends Context {
// 指定触发Timer的是基于 ProcessingTime or EventTime
public abstract TimeDomain timeDomain();
@Override
public abstract K getCurrentKey();
}
}
在KeyedProcessFunction中通过抽象方法processElement()读取、处理数据,并按需创建Timer,Timer会被注册到Context的TimerService定时器队列中(内部专用的InternalTimerService中) --> Context.timerService().registerEventTimeTimer()
当满足Timer触发条件时,会回调OnTimer方法执行Timer的计算逻辑 --> OnTimerContext.timerService().deleteEventTimeTimer(xxx)