• Flink Java 之 Checkpointing 状态容错


    先上代码

    package com.daidai.checkpoint;
    
    import com.daidai.source.mocksource.domain.Order;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    
    public class WaterMarksAllowedLatenessCheckPoint {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //开启checkpoint
            env.enableCheckpointing(5000);
            //设置checkpoint
            env.setStateBackend(new FsStateBackend("file:///E:/Code/IdeaCode/Flink1.13_Java/checkpoint"));
    
            DataStreamSource<Order> orderDataStreamSource = env.addSource(new SourceFunction<Order>() {
    
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    while (flag) {
                        Random random = new Random();
                        Order order = new Order();
                        order.setId(UUID.randomUUID().toString());
                        order.setUserId(random.nextInt(3));
                        order.setCreateTime(System.currentTimeMillis() - random.nextInt(15) * 1000);
                        order.setMoney(random.nextInt(100));
    
                        ctx.collect(order);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
    
            OutputTag<Order> later = new OutputTag<>("later", TypeInformation.of(Order.class));
    
            SingleOutputStreamOperator<Order> timestampsAndWatermarks = orderDataStreamSource
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                    .withTimestampAssigner((context, timestmp) -> context.getCreateTime()));
    
            SingleOutputStreamOperator<Order> sum = timestampsAndWatermarks.keyBy(Order::getUserId)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.seconds(5))
                    .sideOutputLateData(later)
                    .sum("money");
    
            sum.print("正常数据");
            DataStream<Order> laterDS = sum.getSideOutput(later);
            laterDS.print("迟到严重数据");
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    开启 checkpoint

        /**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。此方法选择CheckpointingMode.EXACTLY_ONCE保证。
    该作业在给定的时间间隔内定期绘制检查点。状态将存储在配置的状态后端中。
    注意:目前不正确支持检查点迭代流数据流。因此,如果与启用的检查点一起使用,则不会启动迭代作业。要覆盖此机制,请使用enableCheckpointing(long, CheckpointingMode, boolean)方法。
    参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。*/
        public StreamExecutionEnvironment enableCheckpointing(long interval) {
            checkpointCfg.setCheckpointInterval(interval);
            return this;
        }
    
        /**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。该作业在给定的时间间隔内定期绘制检查点。系统使用给定的CheckpointingMode进行检查点(“恰好一次”与“至少一次”)。状态将存储在配置的状态后端中。注意:目前不正确支持检查点迭代流数据流。因此,如果与启用的检查点一起使用,则不会启动迭代作业。要覆盖此机制,请使用enableCheckpointing(long, CheckpointingMode, boolean)方法。
    参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。 mode – 检查点模式,保证在“恰好一次”和“至少一次”之间进行选择。 */
        public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
            checkpointCfg.setCheckpointingMode(mode);
            checkpointCfg.setCheckpointInterval(interval);
            return this;
        }
    
        /**为流式作业启用检查点。流式数据流的分布式状态将被定期快照。如果发生故障,流式数据流将从最新完成的检查点重新启动。该作业在给定的时间间隔内定期绘制检查点。状态将存储在配置的状态后端中。
    注意:目前不正确支持检查点迭代流数据流。如果“force”参数设置为true,系统仍然会执行作业。
    已弃用
    请改用enableCheckpointing(long, CheckpointingMode) 。未来将删除强制检查点。
    参数:interval – 状态检查点之间的时间间隔,以毫秒为单位。 mode – 检查点模式,保证在“恰好一次”和“至少一次”之间进行选择。 force - 如果为迭代作业也启用真正的检查点。*/
        @Deprecated
        @SuppressWarnings("deprecation")
        @PublicEvolving
        public StreamExecutionEnvironment enableCheckpointing(
                long interval, CheckpointingMode mode, boolean force) {
            checkpointCfg.setCheckpointingMode(mode);
            checkpointCfg.setCheckpointInterval(interval);
            checkpointCfg.setForceCheckpointing(force);
            return this;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    CheckpointingMode

    EXACTLY_ONCE(默认)-- 每条数据只会被处理一次
        将检查点模式设置为“恰好一次”。这种模式意味着系统将以这样一种方式检查操作员和用户功能状态,即在恢复时,每条记录都将在操作员状态中准确地反映一次。
        例如,如果用户函数计算流中元素的数量,则该数字将始终等于流中实际元素的数量,而不管故障和恢复如何。
        请注意,这并不意味着每条记录只流过一次流数据流。这意味着在恢复时,操作符/函数的状态将被恢复,以便恢复的数据流恰好在对状态的最后一次修改之后拾取。
    请注意,此模式不保证与外部系统交互时的Exactly-once 行为(仅在Flink 的算子和用户函数中的状态)。原因是两个系统之间需要一定程度的“协作”才能实现完全一次的保证。但是,对于某些系统,可以编写连接器来促进这种协作。
        这种模式维持高吞吐量。根据数据流图和操作,这种模式可能会增加记录延迟,因为操作员需要对齐他们的输入流,以便创建一致的快照点。简单数据流(无重新分区)的延迟增加可以忽略不计。对于具有重新分区的简单数据流,平均延迟仍然很小,但最慢的记录通常具有增加的延迟
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    AT_LEAST_ONCE  -- 至少执行一次
        将检查点模式设置为“至少一次”。这种模式意味着系统将以更简单的方式检查操作员和用户功能状态。在失败和恢复时,一些记录可能会多次反映在操作员状态中。
    例如,如果用户函数计算流中元素的数量,则在出现故障和恢复的情况下,该数字将等于或大于流中的实际元素数量。
        此模式对延迟的影响最小,并且在延迟非常低的情况下可能更可取,在这种情况下需要持续的非常低的延迟(例如几毫秒),并且偶尔重复消息(在恢复时)并不重要。
    
    • 1
    • 2
    • 3
    • 4

    State Backends

    Flink 有两种 state backend 的实现 ----
    一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,
    另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

    在这里插入图片描述

    确保精确一次(exactly once)

    当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:

    Flink 不会从快照中进行恢复(at most once)
    没有任何丢失,但是你可能会得到重复冗余的结果(at least once)
    没有丢失或冗余重复(exactly once)
    Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。

    Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE 关闭 Barrier 对齐来提高性能。

  • 相关阅读:
    【Java面试】大厂裁员,小厂倒闭,如何搞定面试官Java SPI是什么?有什么用?
    antDesign Form表单校验(react)
    在 Mac 上通过“启动转换助理”安装 Windows 10
    数字孪生推动智慧城市建设「可视化平台解决方案」
    23软考备考已开始,网络工程师知识点速记~(3)
    特征工程设计思路
    前端开发禁用F12和右键检查元素处理
    flink系列(一)flink部署及架构简介
    python学习笔记12:小数类型的角度到度分秒的转换
    zookeeper应用场景(一)
  • 原文地址:https://blog.csdn.net/weixin_46376562/article/details/125613749