先上代码
package com.daidai.checkpoint;
import com.daidai.source.mocksource.domain.Order;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
public class WaterMarksAllowedLatenessCheckPoint {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint
env.enableCheckpointing(5000);
//设置checkpoint
env.setStateBackend(new FsStateBackend("file:///E:/Code/IdeaCode/Flink1.13_Java/checkpoint"));
DataStreamSource<Order> orderDataStreamSource = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
Random random = new Random();
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setUserId(random.nextInt(3));
order.setCreateTime(System.currentTimeMillis() - random.nextInt(15) * 1000);
order.setMoney(random.nextInt(100));
ctx.collect(order);
}
}
@Override
public void cancel() {
flag = false;
}
});
OutputTag<Order> later = new OutputTag<>("later", TypeInformation.of(Order.class));
SingleOutputStreamOperator<Order> timestampsAndWatermarks = orderDataStreamSource
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((context, timestmp) -> context.getCreateTime()));
SingleOutputStreamOperator<Order> sum = timestampsAndWatermarks.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(5))
.sideOutputLateData(later)
.sum("money");
sum.print("正常数据");
DataStream<Order> laterDS = sum.getSideOutput(later);
laterDS.print("迟到严重数据");
env.execute();
}
}
/**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。此方法选择CheckpointingMode.EXACTLY_ONCE保证。
该作业在给定的时间间隔内定期绘制检查点。状态将存储在配置的状态后端中。
注意:目前不正确支持检查点迭代流数据流。因此,如果与启用的检查点一起使用,则不会启动迭代作业。要覆盖此机制,请使用enableCheckpointing(long, CheckpointingMode, boolean)方法。
参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
checkpointCfg.setCheckpointInterval(interval);
return this;
}
/**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。该作业在给定的时间间隔内定期绘制检查点。系统使用给定的CheckpointingMode进行检查点(“恰好一次”与“至少一次”)。状态将存储在配置的状态后端中。注意:目前不正确支持检查点迭代流数据流。因此,如果与启用的检查点一起使用,则不会启动迭代作业。要覆盖此机制,请使用enableCheckpointing(long, CheckpointingMode, boolean)方法。
参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。 mode – 检查点模式,保证在“恰好一次”和“至少一次”之间进行选择。 */
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
checkpointCfg.setCheckpointingMode(mode);
checkpointCfg.setCheckpointInterval(interval);
return this;
}
/**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。该作业在给定的时间间隔内定期绘制检查点。状态将存储在配置的状态后端中。
注意:目前不正确支持检查点迭代流数据流。如果“force”参数设置为true,系统仍然会执行作业。
已弃用
请改用enableCheckpointing(long, CheckpointingMode) 。未来将删除强制检查点。
参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。 mode – 检查点模式,保证在“恰好一次”和“至少一次”之间进行选择。 force - 如果为迭代作业也启用真正的检查点。*/
@Deprecated
@SuppressWarnings("deprecation")
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing(
long interval, CheckpointingMode mode, boolean force) {
checkpointCfg.setCheckpointingMode(mode);
checkpointCfg.setCheckpointInterval(interval);
checkpointCfg.setForceCheckpointing(force);
return this;
}
EXACTLY_ONCE(默认)-- 每条数据只会被处理一次
将检查点模式设置为“恰好一次”。这种模式意味着系统将以这样一种方式检查操作员和用户功能状态,即在恢复时,每条记录都将在操作员状态中准确地反映一次。
例如,如果用户函数计算流中元素的数量,则该数字将始终等于流中实际元素的数量,而不管故障和恢复如何。
请注意,这并不意味着每条记录只流过一次流数据流。这意味着在恢复时,操作符/函数的状态将被恢复,以便恢复的数据流恰好在对状态的最后一次修改之后拾取。
请注意,此模式不保证与外部系统交互时的Exactly-once 行为(仅在Flink 的算子和用户函数中的状态)。原因是两个系统之间需要一定程度的“协作”才能实现完全一次的保证。但是,对于某些系统,可以编写连接器来促进这种协作。
这种模式维持高吞吐量。根据数据流图和操作,这种模式可能会增加记录延迟,因为操作员需要对齐他们的输入流,以便创建一致的快照点。简单数据流(无重新分区)的延迟增加可以忽略不计。对于具有重新分区的简单数据流,平均延迟仍然很小,但最慢的记录通常具有增加的延迟
AT_LEAST_ONCE -- 至少执行一次
将检查点模式设置为“至少一次”。这种模式意味着系统将以更简单的方式检查操作员和用户功能状态。在失败和恢复时,一些记录可能会多次反映在操作员状态中。
例如,如果用户函数计算流中元素的数量,则在出现故障和恢复的情况下,该数字将等于或大于流中的实际元素数量。
此模式对延迟的影响最小,并且在延迟非常低的情况下可能更可取,在这种情况下需要持续的非常低的延迟(例如几毫秒),并且偶尔重复消息(在恢复时)并不重要。
Flink 有两种 state backend 的实现 ----
一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,
另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。
当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:
Flink 不会从快照中进行恢复(at most once)
没有任何丢失,但是你可能会得到重复冗余的结果(at least once)
没有丢失或冗余重复(exactly once)
Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。
Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE 关闭 Barrier 对齐来提高性能。