• Checkpoint机制和生产配置


    1.前提

    在将Checkpoint之前,先回顾一下flink处理数据的流程:

    在这里插入图片描述

    2. 概述

    Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。
    (1)能在集群异常时,保持已计算的数据,下次恢复时能在已保存数据的基础上,继续计算(类似于快照);
    (2)避免数据丢失(通过Barrier实现)

    3.机制运行流程

    在这里插入图片描述
    解释:

    (1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏,前面说的 偏移量做标识),Barrier会混在数据里,随着数据流,流向source算子;

    (2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让先算子去汇报当前的状态

    (3)处理完之后,Barrier就会随着数据流,流向下一个算子;

    (4)下一个算子收到Barrier,同样会停下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失)
    (5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。
    (6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成

    4. 状态后端

    状态后端,StateBackend,就是Flink存储状态的介质(存储状态的地方)。Flink提供了三种状态后端的存储方式:

    • MemoryStateBackend(内存,使用HashMapStateBackend实现,生产一般不用)
    • FsStateBackend(文件系统,比如说HDFS,生产常用)
    • RocksDBStateBackend(RocksDB数据库,生产常用)
    • 同时也可以把状态外置到 Hbase和Redis,解决大状态存储问题
    MemoryStateBackend

    内存,掉电易失。不安全。基本不用。
    在这里插入图片描述
    配置如下:

    state.backend: hashmap
    # 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
    state.checkpoint-storage: jobmanager
    
    • 1
    • 2
    • 3
    FsStateBackend

    FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS;
    工作中常用;
    在这里插入图片描述
    配置如下:

    state.backend: hashmap 
    state.checkpoints.dir: file:///checkpoint-dir/ 
    
    # 默认为FileSystemCheckpointStorage 
    state.checkpoint-storage: filesystem
    
    • 1
    • 2
    • 3
    • 4
    • 5
    RocksDBStateBackend

    RocksDBStateBackend,把状态保存在RocksDB数据库中。

    RocksDB,是一个小型文件系统的数据库。

    配置如下:

    state.backend: rocksdb
    state.checkpoints.dir: file:///checkpoint-dir/
    
    # Optional, Flink will automatically default to FileSystemCheckpointStorage
    # when a checkpoint directory is specified.
    state.checkpoint-storage: filesystem
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    特点:可以保持巨大的状态,且支持增量状态保存。

    5.重启策略

    5.1 重启策略概述

    Flink流式任务,需要长期运行,就算遇到一些数据异常问题等,也不能随便退出。

    Flink为了让任务能够在遇到异常退出时,能够重新启动,正常运行,Flink提出了重启策略的概念。

    5.2 Flink的重启策略

    Flink支持四种类型的重启策略:

    • none:没有重启。任务一旦遇到异常,就退出。

    • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

    • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

    • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

    5.3案例演示
    模拟异常的代码
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * Flink 代码实现流处理,进行单词统计。数据源来自于socket数据。
     * todo 演示Flink遇到异常重启。
     */
    public class RestartStrategy {
        public static void main(String[] args) throws Exception {
            //1.构建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2.数据输入(数据源)
            //从socket读取数据,socket = hostname + port
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            //3.数据处理
            //3.1 使用flatMap进行扁平化处理
            SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        if (word.equals("evil")) {
                            //evil:恶魔,魔鬼,程序如果碰到魔鬼就退出。
                            throw new Exception("魔鬼来了,程序退出");
                        }
                        out.collect(word);
                    }
                }
            });
            //3.2 使用map进行转换,转换成(单词,1)
            SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
            //3.3使用keyBy进行单词分组
            KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            });
            //3.4 使用reduce(sum)进行聚合操作,sum:就是根据第一个元素(Integer)进行sum操作
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);
            //4.数据输出
            result.print();
            //5.启动流式任务
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    5.4Checkpoint配置

    修改flink-conf.yaml文件

    execution.checkpointing.interval: 5000
    #设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
    execution.checkpointing.mode: EXACTLY_ONCE
    state.backend: hashmap
    #设置checkpoint的存储方式
    state.checkpoint-storage: filesystem
    #设置checkpoint的存储位置
    state.checkpoints.dir: hdfs://node1:8020/checkpoints
    #设置savepoint的存储位置
    state.savepoints.dir: hdfs://node1:8020/checkpoints
    #设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
    execution.checkpointing.timeout: 600000
    #设置两次checkpoint之间的最小时间间隔
    execution.checkpointing.min-pause: 500
    #设置并发checkpoint的数目
    execution.checkpointing.max-concurrent-checkpoints: 1
    #开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
    state.checkpoints.num-retained: 3
    #默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
    清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
    #ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
    #RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
    #DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    
    #------------------------------------
    
    # 设置固定延迟策略
    restart-strategy: fixed-delay
    # 尝试重启次数
    restart-strategy.fixed-delay.attempts: 3
    # 两次连续重启的间隔时间
    restart-strategy.fixed-delay.delay: 3 s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    fixed-delay重启策略

    提交命令:

    #1.启动HDFS
    #2.把jar包上传到Linux
    #3.配置Flink的Checkpoint和重启策略
    #4.提交任务
    	cd $FLINK_HOME
    	bin/flink run -c test.RestartStrategy /root/original-gz_flinkbase-1.0-SNAPSHOT.jar
    #5.在socket中数据单词
    nc -lk 9999
    hadoop
    hive
    flink
    evil
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    运行结果:
    在这里插入图片描述

    6.官方推荐的配置

    在这里插入图片描述

  • 相关阅读:
    行业专网对比公网,优势在哪儿?能满足什么特定要求?
    C#语言进阶(二)—事件 第二篇(.net标准事件模型)
    Python与MySQL交互
    Spring Cloud实战 |分布式系统的流量控制、熔断降级组件Sentinel如何使用
    Rust GUI 库的状态
    TS泛型的使用
    uniapp实现时间选择器
    使用WPF开发自定义用户控件,以及实现相关自定义事件的处理
    PAT 1011 World Cup Betting
    hevc vps解析
  • 原文地址:https://blog.csdn.net/weixin_43753599/article/details/138140587