GenericWriteAheadSink原理是把接收记录按照检查点进行分段,每个到来的记录都放到对应的分段中,这些分段内的记录是作为算子状态的形式存储和故障恢复的,对于每个分段内的记录列表,flink会在收到检查点完成的通知时把他们都写到外部存储中,本文对其中的检查点完成后是否对应的事务必须成功这个点进行讲解
首先开始进行checkpoint时代码如下
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
// 把检查点id先放入本地变量中
saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());
this.checkpointedState.clear();
for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
// 把本地变量中的检查点存放到算子列表状态中
this.checkpointedState.add(pendingCheckpoint);
}
}
private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
PendingCheckpoint pendingCheckpoint =
new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);
// 把检查点id先放到 pendingCheckpoints本地变量中
pendingCheckpoints.add(pendingCheckpoint);
}
其实接收检查点完成的通知:
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
synchronized (pendingCheckpoints) {
Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();
while (pendingCheckpointIt.hasNext()) {
PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();
long pastCheckpointId = pendingCheckpoint.checkpointId;
int subtaskId = pendingCheckpoint.subtaskId;
long timestamp = pendingCheckpoint.timestamp;
StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;
//把历史的+当前的还没有成功提交的检查点id对应的事务,重新调用sendValue方法并提交对应检查点的事务
if (pastCheckpointId <= checkpointId) {
try {
// 历史的或者当前的事务未提交
if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {
try (FSDataInputStream in = streamHandle.openInputStream()) {
// 调用sendValue方法写数据
boolean success =
sendValues(
new ReusingMutableToRegularIteratorWrapper<>(
new InputViewIterator<>(
new DataInputViewStreamWrapper(in),
serializer),
serializer),
pastCheckpointId,
timestamp);
if (success) {
//提交对应检查点对应的事务
committer.commitCheckpoint(subtaskId, pastCheckpointId);
streamHandle.discardState();
pendingCheckpointIt.remove();
}
}
} else {
streamHandle.discardState();
pendingCheckpointIt.remove();
}
} catch (Exception e) {
// we have to break here to prevent a new (later) checkpoint
// from being committed before this one
LOG.error("Could not commit checkpoint.", e);
break;
}
}
}
}
}
注意这里需要注意的是flink的检查点成功创建后才会使用notify方法进行通知,flink没有保证一定通知,此外通知之后不论这个notify方法中发生了什么异常都不影响flink已经创建了检查点的事实。
对应到我们这个例子,你就会发现在notify方法中有需要把历史检查点已经创建成功但是对应的事务没有提交的事务重新调用一次sendValue方法和提交对应检查点的事务,也就是说不是每一次检查点都能成功的提交事务,如果事务没有提交成功,等待下一次检查点的通知即可,下一个检查点的通知会把历史的检查点重新检测一次.