• 大数据-玩转数据-Flink状态编程(上)


    一、Flink状态编程

    有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
    SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
    Flink的状态管理是它的优势之一。

    二、什么是状态

    在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。

    流式计算分为无状态计算和有状态计算两种情况。
    无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。

    在简单聚合、窗口聚合、处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时能正确地恢复,这就需要一套完整的管理机制来处理所有状态。

    三、为什么需要管理状态

    下面的几个场景都需要使用流处理的状态功能:
    去重: 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
    检测: 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
    聚合: 对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
    更新机器学习模型: 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

    四、Flink中的状态分类

    Managed State
    状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩
    状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等
    使用场景 绝大数Flink算子。

    Raw State
    状态管理方式 用户自己管理
    状态数据结构 字节数组: byte[]
    使用场景 所有算子

    从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
    在我们平时的使用中Managed State已经足够我们使用。

    在这里插入图片描述
    对Managed State继续细分,它又有2种类型
    Operator State(算子状态)
    Keyed State(键控状态)

    Operator State
    适用用算子类型: 可用于所有算子: 常用于source, sink,
    例如:FlinkKafkaConsumer
    状态分配:一个算子的子任务对应一个状态
    创建和访问方式: 实现CheckpointedFunction或ListCheckpointed(已经过时)接口
    横向扩展 :并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量
    支持的数据结构: ListState,UnionListStste和BroadCastState

    Keyed State
    适用用算子类型: 只能用于用于KeyedStream上的算子
    状态分配 :一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
    创建和访问方式:重写RichFunction, 通过里面的RuntimeContext访问w
    横向扩展 :并发改变, State随着Key在实例间迁移
    支持的数据结构:ValueState, ListState,MapState ReduceState, AggregatingState

    五、算子状态的使用

    Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。

    注意: 算子子任务之间的状态不能互相访问
    Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

    Flink为算子状态提供三种基本数据结构:
    列表状态(List state),将状态表示为一组数据的列表

    联合列表状态(Union list state),也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。

    广播状态(Broadcast state)
    是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

    三种状态的实现
    在这里插入图片描述

    六、键控状态的使用

    键控状态是根据输入数据流中定义的键(key)来维护和访问的,只能用于KeyedStream(keyBy算子处理之后)。相同key的所有数据都会访问相同的状态。
    键控状态支持的数据类型
    在这里插入图片描述
    注意:
    a)所有的类型都有clear(), 清空当前key的状态
    b)这些状态对象仅用于用户与状态进行交互.
    c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
    d)从状态获取的值与输入元素的key相关

    七、状态后端

    状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
    状态后端主要负责两件事:
    本地(taskmanager)的状态管理
    将检查点(checkpoint)状态写入远程存储

    状态后端的分类及配置
    状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
    在这里插入图片描述
    MemoryStateBackend
    内存级别的状态后端(默认),
    存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
    特点:快速, 低延迟, 但不稳定
    使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
    FsStateBackend
    存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
    特点: 拥有内存级别的本地访问速度, 和更好的容错保证
    使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
    RocksDBStateBackend
    将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
    存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
    使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境

    八、案例列表状态

    package com.lyh.flink09;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.List;
    
    public class state_programe1_s {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            env.socketTextStream("hadoop100",9999)
                    .map(new MyMapFunctin())
                    .print();
            env.execute();
    
        }
        public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {
            private Long count = 0L;
            private ListState<Long> state;
            @Override
            public Long map(String value) throws Exception {
                count++;
                return count;
            }
            // 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                System.out.println("initialize.....");
                state = context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<Long>("state",Long.class));
    
                    for (Long c : state.get()) {
                        count += c;
                    }
                }
    
            // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                System.out.println("snapshot.....");
                state.clear();
                state.add(count);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    九、案例广播状态

    package com.lyh.flink09;
    
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class state_broad1_s {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .getExecutionEnvironment()
                    .setParallelism(3);
            DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);
            DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);
    
    
            MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
            // 广播流
            BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);
            dataStream
                    .connect(broadcastStream)
                    .process(new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                            // 从广播状态中取值, 不同的值做不同的业务
                            ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                            if ("1".equals(state.get("switch"))) {
                                out.collect("切换到1号配置....");
                            } else if ("0".equals(state.get("switch"))) {
                                out.collect("切换到0号配置....");
                            } else {
                                out.collect("切换到其他配置....");
                            }
                        }
    
                        @Override
                        public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                            BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                            // 把值放入广播状态
                            state.put("switch", value);
                        }
                    })
                    .print();
    
            env.execute();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 相关阅读:
    【附源码】计算机毕业设计JAVA智能社区管理系统
    java语言概述
    计算机中的逻辑运算详解(与、或、非、同或、异或)
    UML中类之间的六种主要关系
    DAY3-深度学习100例-卷积神经网络(CNN)服装图像分类
    简单网页制作代码 HTML+CSS+JavaScript香港美食(8页)
    Linux下安装Nginx
    【面试突击算法第一天】剑指offer + Leetcode Hot100
    开发框架DevExpress XAF - Entity Framework Core 8支持.NET 8性能基准
    剑指 Offer 28. 对称的二叉树
  • 原文地址:https://blog.csdn.net/s_unbo/article/details/132642473