顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。
这种提交方式是真正基于事务的,它需要外部系统提供事务支持。
具体的实现步骤为:
①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所
以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
③当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就
真正可用了。
当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC
)的方式充分利用了
Flink
现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自 JobManager
的
checkpoint
成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以 2PC
协议不仅真正意义上实现了
exactly-once
,而且通过搭载 Flink
的检查点机制来实现事务,只给系统增加了很少的开销。 Flink 提供
TwoPhaseCommitSinkFunction
接口,方便我们自定义实现两阶段提交的 SinkFunction 的实现,提供了真正端到端的
exactly-once
保证。
Flink 和 Kafka 连接时的精确一次保证
整体介绍
(1)Flink 内部
Flink
内部可以通过检查点机制保证状态和处理结果的
exactly-once
语义。
(2)输入端
输入数据源端的
Kafka
可以对数据进行持久化保存,并可以重置偏移量(
offset
)。所以我们可以在 Source
任务(
FlinkKafkaConsumer
)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器 FlinkKafkaConsumer
向
Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
(3)输出端
输出端保证
exactly-once
的最佳实现,当然就是两阶段提交(
2PC
)。作为与
Flink
天生一对的 Kafka
,自然需要用最强有力的一致性保证来证明自己。
Flink
官方实现的
Kafka
连接器中,提供了写入到
Kafka
的
FlinkKafkaProducer
,它就实现
了
TwoPhaseCommitSinkFunction
接口:
实现端到端 exactly-once 的具体过程
- 启动检查点保存: Source 任务会将检查点分界线(barrier)注入数据流。这个 barrier 可以将数据流中的数据,分为进入当前检查点的集合和进入下一个检查点的集合。\
- 算子任务对状态做快照 :分界线(barrier)会在算子间传递下去。每个算子收到 barrier 时,会将当前的状态做个快照,保存到状态后端。
- Sink 任务开启事务,进行预提交:分界线(barrier)终于传到了 Sink 任务,这时 Sink 任务会开启一个事务。接下来到来的所有数据,Sink 任务都会通过这个事务来写入 Kafka。对于 Kafka 而言,提交的数据会被标记为“未确认”(uncommitted)。这个过程就是所谓的“预提交”(pre-commit)
- 检查点保存完成,提交事务:当 Sink 任务收到确认通知后,就会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了。
需要的配置
(1)必须启用检查点;
(2)在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;
(3)配置 Kafka 读取数据的消费者的隔离级别;
预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为 read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延
迟。
(4)事务超时配置
Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。