事务写入 sink 连接器需要和 Flink 的检查点机制集成,因为只有在检查点成功完成以后,事务写入 sink 连接器才会向外部系统 commit 数据。
为了简化事务性 sink 的实现,Flink 提供了两个模版用来实现自定义 sink 运算符。这两个模版都实现了 CheckpointListener 接口。CheckpointListener 接口将会从作业管理器接收到检查点完成的通知。
GenericWriteAheadSink 模版会收集检查点之前的所有的数据,并将数据存储到 sink任务的运算符状态中。状态保存到了检查点中,并在任务故障的情况下恢复。当任务接收到检查点完成的通知时,任务会将所有的数据写入到外部系统中。
TwoPhaseCommitSinkFunction 模版利用了外部系统的事务特性。对于每一个检查点,任务首先开始一个新的事务,并将接下来所有的数据都写到外部系统的当前事务上下文中去。当任务接收到检查点完成的通知时,sink 连接器将会 commit 这个事务。
GenericWriteAheadSink 使得 sink 运算符可以很方便的实现。这个运算符和 Flink 的检查点机制集成使用,目标是将每一条数据恰好一次写入到外部系统中去。需要注意的是,在发生故障的情况下,write-ahead log sink 可能会不止一次的发送相同的数据。所以GenericWriteAheadSink 无法提供完美无缺的恰好处理一次语义的一致性保证,而是仅能提供 at-least-once 这样的保证。
GenericWriteAheadSink 的原理是将接收到的所有数据都追加到有检查点分割好的预写式日志中去。每当 sink 运算符碰到检查点屏障,运算符将会开辟一个新的 section,并将接下来的所有数据都追加到新的 section 中去。WAL(预写式日志)将会保存到运算符状态中。由于 log 能被恢复,所有不会有数据丢失。
当 GenericWriteAheadSink 接收到检查点完成的通知时,将会发送对应检查点的 WAL中存储的所有数据。当所有数据发送成功,对应的检查点必须在内部提交。
检查点的提交分两步。第一步,sink 持久化检查点被提交的信息。第二步,删除 WAL中所有的数据。我们不能将 commit 信息保存在 Flink 应用程序状态中,因为状态不是持久化的,会在故障恢复时重置状态。相反,GenericWriteAheadSink 依赖于可插拔的组件在一个外部持久化存储中存储和查找提交信息。这个组件就是 CheckpointCommitter。
继承 GenericWriteAheadSink 的运算符需要提供三个构造器函数。
还有,write-ahead 运算符需要实现一个单独的方法:
boolean sendValues(Iterable
当检查点完成时,GenericWriteAheadSink 调用 sendValues() 方法来将数据写入到外部存储系统中。这个方法接收一个检查点对应的所有数据的迭代器,检查点的 ID,检查点被处理时的时间戳。当数据写入成功时,方法必须返回 true,写入失败返回 false。
GenericWriteAheadSink 无法提供完美的 exactly-once 保证。有两个故障状况会导致数据可能被发送不止一次。
Flink 提供了 TwoPhaseCommitSinkFunction 接口来简化 sink 函数的实现。这个接口保证了端到端的 exactly-once 语义。2PC sink 函数是否提供这样的一致性保证取决于我们的实现细节。这里就需要讨论一个问题:“2PC 协议是否开销太大?”
通常来讲,为了保证分布式系统的一致性,2PC 是一个非常昂贵的方法。尽管如此,在 Flink 的语境下,2PC 协议针对每一个检查点只运行一次。TwoPhaseCommitSinkFunction和 WAL sink 很相似,不同点在于前者不会将数据收集到 state 中,而是会写入到外部系统事务的上下文中。
TwoPhaseCommitSinkFunction 实现了以下协议。在 sink 任务发送出第一条数据之前,任务将在外部系统中开始一个事务,所有接下来的数据将被写入这个事务的上下文中。当作业管理器初始化检查点并将检查点屏障插入到流中的时候,2PC 协议的投票阶段开始。当运算符接收到检查点屏障,运算符将保存它的状态,当保存完成时,运算符将发送
一个 acknowledgement 信息给作业管理器。当 sink 任务接收到检查点屏障时,运算符将会持久化它的状态,并准备提交当前的事务,以及 acknowledge JobManager 中的检查点。发送给作业管理器的 acknowledgement 信息类似于 2PC 协议中的 commit 投票。sink 任务还不能提交事务,因为它还没有保证所有的任务都已经完成了它们的检查点操作。sink
任务也会为下一个检查点屏障之前的所有数据开始一个新的事务。
当作业管理器成功接收到所有任务实例发出的检查点操作成功的通知时,作业管理器将会把检查点完成的通知发送给所有感兴趣的任务。这里的通知对应于 2PC 协议的提交命令。当 sink 任务接收到通知时,它将 commit 所有处于开启状态的事务。一旦 sink 任务 acknowledge 了检查点操作,它必须能够 commit 对应的事务,即使任务发生故障。如
果 commit 失败,数据将会丢失。
外部系统需要满足的要求:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
使用flink + kafka完成每一个窗口的热门商品。
使用生产者将数据写入kafka
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.put("bootstrap.servers","hadoop101:9092");
env
.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")
.addSink(new FlinkKafkaProducer<String>(
// topic名字
"user-behavior",
new SimpleStringSchema(),
properties
));
}
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty(