• Flink系列之Flink中State设计详解与企业案例实践



    title: Flink系列


    二、Flink State 设计详解

    Flink 官网解释:Apache Flink® — Stateful Computations over Data Streams

    前课中 WordCount 的例子,可以得知:其实我们会发现,单词出现的次数有累计的效果。如果没有状态的管理,是不会有累计的效果的,所以 Flink 里面有 state 的概念。

    需求:统计路口每个小时里面的总车流量

    路口编号小时总的车流量
    路口010-12000
    路口021-21800
    路口032-31600
    路口043-41500

    State: 假设某个路口统计到 0:30 的时候,临时总车流量 就是当前这个 路口对应的 Task 的 状态

    Window操作:每隔 10s 统计过去 20s 的总车流量

    在这里插入图片描述

    计算平均值:1 =>(1,1) ; 2 => (2,3) ; 3=>(3,6) ;…

    ​ 通用的:n => (state.count +1 , state.sum + n) 最终输出的结果: state.sum / state.count

    ​ Flink 的一个重要特性就是有状态计算:Stateful Computations over Data Streams

    ​ State 简单说,就是 Flink Job 的 Task 在运行过程中,产生的一些状态数据。这些状态数据,会辅助 Task 执行某些有状态计算,同时也涉及到 Flink Job 的重启状态恢复。所以,保存和管理每个 Task 的状态是非常重要的一种机制。这也是 Flink 有别于其他分布式计算引擎的最重要的区别。

    ​ State 需要配合检查点 Checkpoint 机制来保证 Flink 作业失败后能正确地进行错误恢复。

    ​ Flink 中的状态分为两类,Keyed State 和 Operator State 。

    • Keyed State 是和具体的 Key 相绑定的,只能在 KeyedStream 上的函数和算子中使用。
    • Opeartor State 则是和 Operator 的一个特定的并行实例相绑定的,例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。

    ​ 由于 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个 key 都关联一个状态分区(state-partition)。

    ​ 从另一个角度来看,无论 Operator State 还是 Keyed State,都有两种形式,Managed State 和 Raw State。 Managed State 的数据结构由 Flink 进行托管,而Raw State 的数据结构对 Flink 是透明的。 Flink 的建议是尽量使用 Managed State,这样 Flink 可以在并行度改变等情况下重新分布状态,并且可以更好地进行内存管理。

    简单总结一下:

    state:一般指一个具体的 task/operator 的状态。State 可以被记录,在失败的情况下数据还可以恢复

    Flink 中有两种基本类型的 State:Keyed StateOperator State,他们两种都可以以两种形式存在:原始状态(raw state) 和 托管状态(managed state)。

    Keyed State:在做 keyBy 之后,每个 key 都会携带一个状态。这种状态,就是 key state 
    
    Operator State: 一个 Task 一个 State
    
    1、托管状态:由 Flink 框架管理的状态,我们通常使用的就是这种。
    
    2、原始状态:由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[] 来读写状态内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑它。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ZooKeeper: ZNode 的分类:持久类型 + 临时类型, 这两个类型的 ZNode 又可以划分成另外两种类型:带顺序编号的,不带顺序的

    Flink 的 State 类型,通过一张图来理解

    在这里插入图片描述

    关于上图的理解补充:

    1、假设 kafka 中有一个 Topic ,有 4 个分区
    
    2、Flink Application 应用程序的 Source Operator 的并行度,一般就会必然设置成 4 ,一般不会设置多,也不会设置少
    
    3、每个 Source Task 必然要去记录 当前这个 SourceTask 消费到 对应的那个 Topic Partition 的 offset
    
    	就是把所有的 Source Task 的状态 Operator State 进行持久化
    
    4、Flink + Kafka 的整合
    (1)记录 当前应用程序消费 Topic 的 每个分区的 offset
    (2)Flink 的 Applicatioin 的 Sink 操作必须输出的时候确保数据一致性(确保数据消费语义有且仅一次)
    	幂等输出
    	2PC 两阶段分布式事务提交
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    补充一个知识点:keyed state 记录的是每个 key 的状态
    Keyed State 托管状态有五种类型:

    1、ValueState 单个值(Integer, String, Tuple2, Student)
    2、ListState 多个值的(List)
    3、MapState key-value类型的值的
    4、ReducingState 聚合逻辑
    5、AggregatingState 聚合逻辑
    
    • 1
    • 2
    • 3
    • 4
    • 5

    并不是所有的计算,都是有状态的, 也有一些计算类型是,是无状态(KeyState)的: 比如大写 转 小写, 比如 ETL 。

    三、Flink Keyed State 企业案例实战

    Keyed State 主要有下面五种状态:

    ValueState

    ListState

    MapState

    ReducingState

    AggregatingState

    相关的实战案例课上直播编写代码。

    3.1 FlinkState案例之ValueState

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     * 需求:每 N个相同的key输出的他们的平均值,例如 N=4。
     */
    public class FlinkState_01_ValueState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(2);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of(1L, 1L),
                    Tuple2.of(1L, 2L),
                    Tuple2.of(2L, 3L),
                    Tuple2.of(2L, 4L),
                    Tuple2.of(1L, 5L),
                    Tuple2.of(1L, 6L),
                    Tuple2.of(2L, 7L),
                    Tuple2.of(2L, 8L)
            );
    
            //3、状态编程
            /**
             * 三种 State 来计算
             * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
             * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
             * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
             */
            //状态编程
            SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(new MyAverageWithValueState());
    
            //4、输出
            result.print();
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_01_ValueState");
        }
    
        /**
         * RichFlatMapFunction  两个泛型,一个输入,一个输出
         */
        private static class MyAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
            //定义一个成员变量
            private ValueState<Tuple2<Long, Long>> countAndSumState;
    
            //初始化的一些方法在这个里面
            @Override
            public void open(Configuration parameters) throws Exception {
                //下面其实就固定步骤: 先声明一个 XXStateDesc
                ValueStateDescriptor<Tuple2<Long, Long>> countAndSumStateDsc = new ValueStateDescriptor<>("countAndSumState", Types.TUPLE(Types.LONG, Types.LONG));
    
                countAndSumState = getRuntimeContext().getState(countAndSumStateDsc);
            }
    
            //这个里面实现具体的业务逻辑的。
            @Override
            public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
                //如果当前的这个key的ValueState不存在,则初始化一个。
                Tuple2<Long, Long> currentState = countAndSumState.value();
                if (currentState == null){
                    currentState = Tuple2.of(0L,0L);
                }
    
                //进行状态的处理,key出现的次数加1,key的值进行了累加
                currentState.f0 += 1;
                currentState.f1 += record.f1;
    
                //状态更新一下
                countAndSumState.update(currentState);
    
                //下面其实就是具体的业务逻辑了
                if (currentState.f0 == 4){
                    //执行计算,求平均值
                    float avg = (float)currentState.f1 / currentState.f0;
                    //输出出去
                    collector.collect(Tuple2.of(record.f0,avg));
                    //清空状态
                    countAndSumState.clear();
                }
    
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    3.2 FlinkState案例之ListState

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Collections;
    
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     */
    public class FlinkState_02_ListState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(2);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of(1L, 1L),
                    Tuple2.of(1L, 2L),
                    Tuple2.of(2L, 3L),
                    Tuple2.of(2L, 4L),
                    Tuple2.of(1L, 5L),
                    Tuple2.of(1L, 6L),
                    Tuple2.of(2L, 7L),
                    Tuple2.of(2L, 8L)
            );
    
            //3、状态编程
            /**
             * 三种 State 来计算
             * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
             * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
             * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
             */
            //状态编程
            SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(
                    new MyAverageWithListState());
    
            //4、输出
            result.print();
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_02_ListState");
        }
    
        /**
         * RichFlatMapFunction  两个泛型,一个输入,一个输出
         */
        private static class MyAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
            //定义一个成员变量
            private ListState<Tuple2<Long, Long>> listState;
    
            //初始化的一些方法在这个里面
            @Override
            public void open(Configuration parameters) throws Exception {
                //下面其实就固定步骤: 先声明一个 XXStateDescr
                ListStateDescriptor<Tuple2<Long, Long>> listStateDsc = new ListStateDescriptor<>(
                        "list_State", Types.TUPLE(Types.LONG, Types.LONG));
    
                listState = getRuntimeContext().getListState(listStateDsc);
            }
    
            //这个里面实现具体的业务逻辑的。
            @Override
            public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
                //如果当前的这个key的listState不存在,则初始化一个。
                Iterable<Tuple2<Long, Long>> dataIterator = listState.get();
                if (dataIterator == null){
                    listState.addAll(Collections.EMPTY_LIST);
                }
    
                //将接受到数据放到listState中,然后再去做判断。
                listState.add(record);
    
                //获取数据集的状态
                ArrayList<Tuple2<Long, Long>> dataList = Lists.newArrayList(listState.get());
    
                //下面其实就是具体的业务逻辑了,也就是判断是否达到统计的条件
                if (dataList.size() == 4){
                    //执行计算,求平均值
                    //先统计一个和
                    long total = 0;
                    for (Tuple2<Long, Long> data : dataList) {
                        total += data.f1;
                    }
                    float avg = (float) total / 4 ;
    
                    //输出出去
                    collector.collect(Tuple2.of(record.f0,avg));
                    //清空状态
                    listState.clear();
                }
    
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113

    3.3 FlinkState案例之MapState

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.UUID;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     * MapState 案例
     */
    public class FlinkState_03_MapState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(2);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of(1L, 1L),
                    Tuple2.of(1L, 2L),
                    Tuple2.of(2L, 3L),
                    Tuple2.of(2L, 4L),
                    Tuple2.of(1L, 5L),
                    Tuple2.of(1L, 6L),
                    Tuple2.of(2L, 7L),
                    Tuple2.of(2L, 8L)
            );
    
            //3、状态编程
            /**
             * 三种 State 来计算
             * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
             * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
             * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
             */
            //状态编程
            SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(
                    new MyAverageWithMapState()
            );
    
            //4、输出
            result.print();
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_03_MapState");
        }
    
        /**
         * RichFlatMapFunction  两个泛型,一个输入,一个输出
         */
        private static class MyAverageWithMapState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
            //定义一个成员变量
            private MapState<String, Long> mapState;
    
            //初始化的一些方法在这个里面
            @Override
            public void open(Configuration parameters) throws Exception {
                //下面其实就固定步骤: 先声明一个 XXStateDescr
                MapStateDescriptor<String, Long> mapStateDse = new MapStateDescriptor<>("map_State", String.class, Long.class);
    
                mapState = getRuntimeContext().getMapState(mapStateDse);
            }
    
            //这个里面实现具体的业务逻辑的。
            @Override
            public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
                //记录状态数据。
                mapState.put(UUID.randomUUID().toString(), record.f1);
    
                //执行判断逻辑
                Iterable<Long> values = mapState.values();
                ArrayList<Long> arrayList = Lists.newArrayList(values);
    
                //下面其实就是具体的业务逻辑了
                if (arrayList.size() == 4){
                    //执行计算,求平均值
                    //先统计一个和
                    long total = 0;
                    for (Long data : arrayList) {
                        total += data;
                    }
                    float avg = (float) total / 4 ;
    
                    //输出出去
                    collector.collect(Tuple2.of(record.f0,avg));
                    //清空状态
                    arrayList.clear();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    3.4 FlinkState案例之ReducingState

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     * ReducingState 案例
     * 自定义累加求和
     */
    public class FlinkState_04_ReducingState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(1);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of(1L, 1L),
                    Tuple2.of(1L, 2L),
                    Tuple2.of(2L, 3L),
                    Tuple2.of(2L, 4L),
                    Tuple2.of(1L, 5L),
                    Tuple2.of(1L, 6L),
                    Tuple2.of(2L, 7L),
                    Tuple2.of(2L, 8L)
            );
    
            //3、状态编程
            SingleOutputStreamOperator<Tuple2<Long, Long>> result = dataStreamSource.keyBy(0).flatMap(
                    new MySumByReducingStateFunction()
            );
    
            //4、输出
            result.print();
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_04_ReducingState");
        }
    
        /**
         * RichFlatMapFunction  两个泛型,一个输入,一个输出
         */
        private static class MySumByReducingStateFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
            //定义一个成员变量
            private ReducingState<Long> reducingState;
    
            //初始化的一些方法在这个里面
            @Override
            public void open(Configuration parameters) throws Exception {
                ReducingStateDescriptor<Long> reducingStateDse = new ReducingStateDescriptor<Long>("reduce_state",
                        new ReduceFunction<Long>() {
                    //初始化ReducingState的时候,定义了一个聚合函数
                            @Override
                            public Long reduce(Long value1, Long value2) throws Exception {
                                return value1 + value2;
                            }
                        },Long.class
                );
                reducingState = getRuntimeContext().getReducingState(reducingStateDse);
            }
    
            //这个里面实现具体的业务逻辑的。
            @Override
            public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Long>> collector) throws Exception {
                //将输入合并到 ReducingState 当中
                reducingState.add(record.f1);
    
                //输出
                collector.collect(Tuple2.of(record.f0,reducingState.get()));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    3.5 FlinkState案例之AggregatingState

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.AggregatingState;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     * AggregatingState案例
     * 输出每个key对应的value列表
     */
    public class FlinkState_05_AggregatingState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(1);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of(1L, 1L),
                    Tuple2.of(1L, 2L),
                    Tuple2.of(2L, 3L),
                    Tuple2.of(2L, 4L),
                    Tuple2.of(1L, 5L),
                    Tuple2.of(1L, 6L),
                    Tuple2.of(2L, 7L),
                    Tuple2.of(2L, 8L)
            );
    
            //3、状态编程
            SingleOutputStreamOperator<Tuple2<Long, String>> result = dataStreamSource.keyBy(0).flatMap(
                    new MySumByAggregatingStateFunction()
            );
    
            //4、输出
            result.print();
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_05_AggregatingState");
        }
    
        /**
         * RichFlatMapFunction  两个泛型,一个输入,一个输出
         */
        private static class MySumByAggregatingStateFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
            //定义一个成员变量
            private AggregatingState<Long,String> aggregatingState;
    
            //初始化的一些方法在这个里面
            @Override
            public void open(Configuration parameters) throws Exception {
                AggregatingStateDescriptor<Long, String, String> aggregatingStateDse = new AggregatingStateDescriptor<>("agg_state",
                        new AggregateFunction<Long, String, String>() {
                            //创建一个初始变量
                            @Override
                            public String createAccumulator() {
                                return "valueList:";
                            }
    
                            /**
                             * 进行 value的累加
                             * 例如传输过来的数据是 1 2 3
                             * 那么 结果的
                             * valueList: 1
                             * valueList: 1 2
                             * valueList: 1 2 3
                             * @param value   传递过来的值
                             * @param accumulator  字符串拼接结果
                             * @return
                             */
                            @Override
                            public String add(Long value, String accumulator) {
                                if (accumulator.equals("valueList:")) {
                                    return accumulator + value;
                                } else {
                                    return accumulator + "," + value;
                                }
                            }
    
                            /**
                             * 获取最终的结果
                             * @param accumulator
                             * @return
                             */
                            @Override
                            public String getResult(String accumulator) {
                                return accumulator;
                            }
    
                            /**
                             * 分区合并
                             * @param a
                             * @param b
                             * @return
                             */
                            @Override
                            public String merge(String a, String b) {
                                return a + "," + b;
                            }
                        }, String.class
                );
                aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDse);
            }
    
            //这个里面实现具体的业务逻辑的。
            @Override
            public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, String>> collector) throws Exception {
                //将输入合并到 ReducingState 当中
                aggregatingState.add(record.f1);
    
                //输出
                collector.collect(Tuple2.of(record.f0,aggregatingState.get()));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128

    四、Flink Operator State 企业案例实战

    4.1 OperatorState自定义输出案例

    package com.aa.flinkjava.state;
    
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.state
     *
     * OperatorState 案例
     * 自定义输出
     */
    public class FlinkState_01_OperatorState {
        public static void main(String[] args) throws Exception {
            //1、环境准备。获取编程入口对象
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            //executionEnvironment.setParallelism(2);
    
            //2、准备一些数据
            DataStreamSource<Tuple2<String, Integer>> dataStreamSource = executionEnvironment.fromElements(
                    Tuple2.of("zhangsan", 3),
                    Tuple2.of("lisi", 4),
                    Tuple2.of("wangwu", 5),
                    Tuple2.of("zhaoliu", 6),
                    Tuple2.of("sunqi", 7),
                    Tuple2.of("zhouba", 8),
                    Tuple2.of("wujiu", 9),
                    Tuple2.of("zhengshi", 10)
            );
    
            //4、输出
            dataStreamSource.addSink(new MyPrintSink(2)).setParallelism(1);
    
            //5、提交执行
            executionEnvironment.execute("FlinkState_01_OperatorState");
        }
    
        private static class MyPrintSink implements SinkFunction<Tuple2<String,Integer>>, CheckpointedFunction{
    
            private int recordNumber;
            private List<Tuple2<String,Integer>> bufferElements;
            private ListState<Tuple2<String,Integer>> listState;
    
            public MyPrintSink(int recordNumber) {
                this.recordNumber = recordNumber;
                this.bufferElements = new ArrayList<>();
            }
    
            /**
             * 对于数据的结合,拍摄state的快照
             * @param functionSnapshotContext
             * @throws Exception
             */
            @Override
            public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
                listState.clear();
                for (Tuple2<String, Integer> element : bufferElements) {
                    listState.add(element);
                }
            }
    
            /**
             * 初始化 state
             * @param context
             * @throws Exception
             */
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<>("MyPrintSink",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                        })
                );
    
                //初始化 listState
                ListState<Tuple2<String, Integer>> listState = context.getOperatorStateStore().getListState(listStateDescriptor);
    
                //状态恢复
                if (context.isRestored()){
                    for (Tuple2<String, Integer> element : listState.get()) {
                        bufferElements.add(element);
                    }
                }
            }
    
            /**
             * 帮数据给添加到 bufferElements 的数据集合中
             * @param value
             * @throws Exception
             */
            @Override
            public void invoke(Tuple2<String, Integer> value) throws Exception {
                bufferElements.add(value);
    
                if (bufferElements.size() == recordNumber){
                    System.out.println("输出格式为: " + bufferElements);
                    bufferElements.clear();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115


    声明:
            文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


    By luoyepiaoxue2014

    B站: https://space.bilibili.com/1523287361 点击打开链接
    微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

  • 相关阅读:
    [MySQL]三、MySQL字符集、字段类型和限制条件
    最新CLion + STM32 + CubeMX 开发环境搭建
    java基础运算符 之 逻辑运算符
    SpringMVC框架的详细解读
    SpringMVC 启动流程源码分析
    ES6:ES6 的内置对象扩展
    图解java.util.concurrent并发包源码系列——深入理解ConcurrentHashMap并发容器,看完薪水涨一千
    dosbox调试模式下0000:0000地址中内容被修改的原因
    基于Redis实现特殊的消息队列
    给Series、DataFrame的索引增加前缀:add_prefix()函数
  • 原文地址:https://blog.csdn.net/luoyepiaoxue2014/article/details/128079391