• 【Flink入门修炼】2-2 Flink State 状态


    • 什么是状态?状态有什么作用?
    • 如果你来设计,对于一个流式服务,如何根据不断输入的数据计算呢?
    • 又如何做故障恢复呢?

    一、为什么要管理状态#

    流计算不像批计算,数据是持续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都保存下来,然后再和新数据合并计算。效率低下不说,内存也扛不住。
    另外,如果程序出现故障重启,没有之前计算过的状态保存,那么也就无法再继续计算了。

    因此,就需要一个东西来记录各个算子之前已经计算过值的结果,当有新数据来的时候,直接在这个结果上计算更新。这个就是状态

    常见的流处理状态功能如下:

    • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
    • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
    • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
    • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

    二、state 简介#

    Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
    image.png

    状态的保存:
    需要考虑的问题:

    • container 异常后,状态不丢
    • 状态可能越来越大

    因此,状态不能直接放在内存中,以上两点问题都无法保证。
    需要有一个外部持久化存储方式,常见的如放到 HDFS 中。(此部分读者感兴趣可自行搜索资料探索)

    image.png

    一)Managed State 和 Raw State#

    • Managed State 是由 Flink 管理的。Flink帮忙存储、恢复和优化。
    • Raw State 是开发者自己管理的,需要自己序列化(较少用到)。

    在 Flink 中推荐用户使用Managed State管理状态数据 ,主要原因是 Managed State 能够更好地支持状态数据的重平衡以及更加完善的内存管理。

    Managed State Raw State
    状态管理方式 Flink Runtime 管理,自动存储,自动恢复,内存管理方式上优化明显 用户自己管理,需要用户自己序列化
    状态数据结构 已知的数据结构 value , list ,map flink不知道你存的是什么结构,都转换为二进制字节数据
    使用场景 大多数场景适用 需要满足特殊业务,自定义operator时使用,flink满足不了你的需求时候,使用复杂

    下文将重点介绍Managed State。

    二)Keyed State 和 Operator State#

    Managed State 又有两种类型:Keyed State 和 Operator State。

    keyed state operator state
    适用场景 只能应用在 KeyedSteam 上 可以用于所有的算子
    State 处理方式 每个 key 对应一个 state,一个 operator 处理多个 key ,会访问相应的多个 state 一个 operator 对应一个 state
    并发改变 并发改变时,state随着key在实例间迁移 并发改变时需要你选择分配方式,内置:1.均匀分配 2.所有state合并后再分发给每个实例
    访问方式 通过RuntimeContext访问,需要operator是一个richFunction 需要你实现CheckPointedFunction或ListCheckPointed接口
    支持数据结构 ValuedState, ListState, Reducing State, Aggregating State, MapState, FoldingState(1.4弃用) 只支持 listState

    Keyed State#

    简单来说,通过 keyBy 分组的就会用到 Keyed State。就是按照分组来的状态。(Keyed State 是Operator State的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应ー个Operator和 Key 的组合。)
    image.png

    Keyed State可以通过 Key Groups 进行管理,主要用于当算子并行度发生变化时,自动重新分布Keyed State数据 。分配代码如下:

    // KeyGroupRangeAssignment.java
        public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    		return MathUtils.murmurHash(keyHash) % maxParallelism;
    	}
    

    Operator State#

    Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
    例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。
    image.png

    image.png

    在开发中,需要保存的状态也有不同的数据结构,那么 Flink 也提供了相应的类。
    如上图所示:

    • ValueState[T] 保存单一变量状态
    • MapState[K, V] 同 java map,保存 kv 型状态
    • ListState[T] 数组类型状态
    • ReducingState[T] 单一状态,将原状态和新状态合并后再更新
    • AggregatingState[IN, OUT] 同样是合并更新,只不过前后数据类型可以不一样

    四、实践#

    实现一个简单的计数窗口。
    输入数据是一个元组 Tuple2.of(1L, 3L),把元组的第一个元素当作 key(在示例中都 key 都是 “1”),第二个元素当 value。
    该函数将出现的次数以及总和存储在 ValueState 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

    public class CountWindowAverage extends RichFlatMapFunction, Tuple2> {
    
        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState> sum;
    
        @Override
        public void flatMap(Tuple2 input, Collector> out) throws Exception {
    
            // access the state value
            Tuple2 currentSum = sum.value();
    
            // update the count
            currentSum.f0 += 1;
    
            // add the second field of the input value
            currentSum.f1 += input.f1;
    
            // update the state
            sum.update(currentSum);
    
            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }
    
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
    
    // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(value -> value.f0)
            .flatMap(new CountWindowAverage())
            .print();
    
    // the printed output will be (1,4) and (1,5)
    

    四、小结#

    本节我们介绍了 Flink 状态,是用于流式计算中中间数据存储和故障恢复的。
    Flink 状态分为 Raw State 和 Manage State,其中 Manage State 中又包含 Keyed State 和 Operator State。最重要的是 Keyed State 要重点理解和掌握。
    在编程开发过程中,针对不同的数据结构,Flink 提供了对应的 State 类。并提供了一个 state demo 代码供学习。


    参考文章:
    七、Flink入门--状态管理_flink流式任务如何保证7*24小时运行-CSDN博客
    Flink状态管理详解:Keyed State和Operator List State深度解析
    爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)-腾讯云开发者社区-腾讯云(较详细)
    Flink 笔记二 Flink的State--状态原理及原理剖析_flink key state是每个key对应一个state还是每个分区对应一个state-CSDN博客(源码剖析)
    Flink 状态管理详解(State TTL、Operator state、Keyed state)-腾讯云开发者社区-腾讯云
    Flink 源码阅读笔记(10)- State 管理

  • 相关阅读:
    翻页-时钟
    Nreal for Unity SDK 发布安卓参数设置
    学习不同概率分布(二项分布、泊松分布等)概念及基础语法
    Linux之进度条
    抖音小店爆款制造指南:打造抖音爆款商品的八大技巧
    算法与诗数据结构 --- 查找 --- 线性表的查找
    Unity C# 网络学习(十)——UnityWebRequest(一)
    MySQL ——多表连接查询
    分布式调度 Elastic-job
    Halcon—3D测量算法的那点数学公式和代码实现
  • 原文地址:https://www.cnblogs.com/shuofxz/p/18057385