• 尚硅谷Flink(四)处理函数


    目录

    🦍处理函数

    🐒基本处理函数 

    🐒按键分区处理函数(KeyedProcessFunction) 

    🐵定时器(Timer)和定时服务(TimerService) 

    // 1、事件时间的案例

    // 2、处理时间的案例

    // 3、获取 process的 当前watermark

    🐒侧输出流

    🦍状态管理

    🐒什么是状态

    🐒按键分区状态

    🐵值状态(ValueState) 

    🐵列表状态(ListState) 

    🐵Map 状态(MapState) 

    🐵归约状态(ReducingState) 

    🐵聚合状态(AggregatingState) 

    🐵状态生存时间(TTL)  

    🐒*算子状态

    🐒状态后端

    🐵状态后端的分类(HashMapStateBackend/RocksDB) 

    🐵如何选择正确的状态后端 

    🐵状态后端的配置 

    🦍容错机制 

    🐒检查点(Checkpoint) 

    🐵检查点的保存

    🐵恢复状态

    🐵检查点算法

    🐵配置

    🐵保存点

    🐒状态一致性 

    🐒端到端精确一次(End-To-End Exactly-Once) 

    🐵输入端保证 

    🐵输出端保证 

    🐵kafka

    🦍处理函数

    之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream 进行转换的,所以可以统称为DataStream API。 
    在 Flink 更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。 

    🐒基本处理函数 

    我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。 

    处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。 

    Flink 提供了8 个不同的ProcessFunction: 
    (1)ProcessFunction 
    最基本的处理函数,基于DataStream 直接调用.process()时作为参数传入。 

    • .processElement() 
    • 用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。 
    • .onTimer() 
    • 这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。 
    • 注意:在 Flink 中,只有“按键分区流”KeyedStream 才支持设置定时器的操作。 

    (2)KeyedProcessFunction 
    对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream。 
    (3)ProcessWindowFunction 
    开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream 调用.process()时作为参数传入。 
    (4)ProcessAllWindowFunction 
    同样是开窗之后的处理函数,基于AllWindowedStream 调用.process()时作为参数传入。 
    (5)CoProcessFunction 
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。 
    (6)ProcessJoinFunction 
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。 
    (7)BroadcastProcessFunction 
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。 
    (8)KeyedBroadcastProcessFunction 
    按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个KeyedStream 与广播流(BroadcastStream)做连接之后的产物。

    🐒按键分区处理函数(KeyedProcessFunction) 

    在上节中提到,只有在KeyedStream 中才支持使用TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。 

    🐵定时器(Timer)和定时服务(TimerService) 

    // 1、事件时间的案例

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. env.setParallelism(1);
    4. SingleOutputStreamOperator sensorDS = env
    5. .socketTextStream("localhost", 7777)
    6. .map(new MapFunction() {
    7. @Override
    8. public WaterSensor map(String value) throws Exception {
    9. String[] split = value.split(",");
    10. return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
    11. }
    12. })
    13. .assignTimestampsAndWatermarks(
    14. WatermarkStrategy
    15. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    16. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    17. );
    18. KeyedStream sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    19. // TODO Process:keyed
    20. SingleOutputStreamOperator process = sensorKS.process(
    21. // key 输入类型 输出类型
    22. new KeyedProcessFunction() {
    23. /**
    24. * 来一条数据调用一次
    25. * @param value
    26. * @param ctx
    27. * @param out
    28. * @throws Exception
    29. */
    30. @Override
    31. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    32. //获取当前数据的key
    33. String currentKey = ctx.getCurrentKey();
    34. // TODO 1.定时器注册
    35. TimerService timerService = ctx.timerService();
    36. // 1、事件时间的案例
    37. Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
    38. timerService.registerEventTimeTimer(5000L);
    39. System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");
    40. }
    41. /**
    42. * TODO 2.时间进展到定时器注册的时间,调用该方法
    43. * @param timestamp 当前时间进展,就是定时器被触发时的时间
    44. * @param ctx 上下文
    45. * @param out 采集器
    46. * @throws Exception
    47. */
    48. @Override
    49. public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    50. super.onTimer(timestamp, ctx, out);
    51. String currentKey = ctx.getCurrentKey();
    52. System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
    53. }
    54. }
    55. );
    56. process.print();
    57. env.execute();
    58. }

    ☢在以上代码的输出中,我定义的定时器是5s,为什么时间戳到了9秒才开始触发定时器

            在 Flink 中,定时器的触发是基于水印(Watermark)和事件时间的进展的。在你的代码中,你设置了一个事件时间的定时器,其触发时间是5秒。但触发时间是相对于事件时间的进展而言的,而不是绝对的时间点。

            定时器的触发受到水印的影响。水印用于表示事件时间的进展,以告知 Flink 什么时候认为事件已经到达了一定的事件时间。水印通常由数据源或处理算子生成,用于控制事件时间进展,以便定时器能够在合适的时间触发。

    在你的代码中,你使用了以下的水印策略:

    1. WatermarkStrategy
    2. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    3. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)

    // 2、处理时间的案例

    1. long currentTs = timerService.currentProcessingTime();
    2. timerService.registerProcessingTimeTimer(currentTs + 5000L);
    3. System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后后后后的定时器");

    // 3、获取 process的 当前watermark

    1. long currentWatermark = timerService.currentWatermark();
    2. System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);

    🐒侧输出流

    处理函数还有另外一个特有功能,就是将自定义的数据放入“侧输出流”(side output)输出。

    我们之前讲到的绝大多数转换算子,输出的都是单一流,流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。 

    具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了。 

    1. OutputTag outputTag = new OutputTag("side-output"){};
    2. SingleOutputStreamOperator longStream = stream.process(new ProcessFunction() {
    3. @Override
    4. public void processElement( Integer value, Context ctx, Collector out) throws Exception {
    5. // 转换成 Long,输出到主流中
    6. out.collect(Long.valueOf(value));
    7. // 转换成 String,输出到侧输出流中
    8. ctx.output(outputTag, "side-output: " + String.valueOf(value));
    9. }
    10. });

    🦍状态管理

    🐒什么是状态

    在Flink中,算子任务可以分为无状态和有状态两种情况。
    无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。我们之前讲到的基本转换算子,如map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。

    有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state)。我们之前讲到的算子中,聚合算子、窗口算子都属于有状态的算子。

    状态有两种:托管状态(Managed State)和原始状态(Raw State)。

    通常我们采用 Flink 托管状态来实现需求。 

    • 托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可;
    • 而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。 

    又可以将托管状态分为两类:算子状态和按键分区状态。

    算子状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。

    按键分区状态是根据输入流中定义的键(key )来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就keyBy之后才可以使用。(每个并行度自己维护一个状态)

    🐒按键分区状态

    🐵值状态(ValueState) 

    顾名思义,状态中只保存一个“值”(value)。

    在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下: 

    public ValueStateDescriptor(String name, Class typeClass) { 
        super(name, typeClass, null); 

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env =
    3. StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. // TODO 1.定义状态
    17. ValueState lastVcState;
    18. @Override
    19. public void open(Configuration parameters) throws Exception {
    20. super.open(parameters);
    21. // TODO 2.在open 方法中,初始化状态
    22. // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
    23. lastVcState = getRuntimeContext()
    24. .getState(new ValueStateDescriptor("lastVcState", Types.INT));
    25. }
    26. @Override
    27. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    28. // lastVcState.value();
    29. // 取出 本组值状态 的数据
    30. // lastVcState.update(); // 更新 本组值状态 的数据
    31. // lastVcState.clear(); // 清除 本组值状态 的数据
    32. // 1. 取出上一条数据的水位值(Integer 默认值是null,判断)
    33. int lastVc = lastVcState.value() ==
    34. null ? 0 : lastVcState.value();
    35. // 2. 求差值的绝对值,判断是否超过 10
    36. Integer vc = value.getVc();
    37. if (Math.abs(vc - lastVc) > 10) {
    38. out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
    39. }
    40. // 3. 更新状态里的水位值
    41. lastVcState.update(vc);
    42. }
    43. }
    44. )
    45. .print();
    46. env.execute();
    47. }

    🐵列表状态(ListState) 

    将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的List 非常相似。

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env =
    3. StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. ListState vcListState;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. super.open(parameters);
    20. vcListState =
    21. getRuntimeContext().getListState(new ListStateDescriptor("vcListState", Types.INT));
    22. }
    23. @Override
    24. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    25. // 1.来一条,存到list 状态里
    26. vcListState.add(value.getVc());
    27. // 2.从list 状态拿出来(Iterable), 拷贝到一个List 中,排序, 只留 3 个最大的
    28. Iterable vcListIt = vcListState.get();
    29. // 2.1 拷贝到List 中
    30. List vcList = new ArrayList<>();
    31. for (Integer vc : vcListIt) {
    32. vcList.add(vc);
    33. }
    34. // 2.2 对List 进行降序排序
    35. vcList.sort((o1, o2) -> o2 - o1);
    36. // 2.3 只保留最大的 3 个(list 中的个数一定是连续变大,一超过 3 就立即清理即可)
    37. if (vcList.size() > 3) {
    38. // 将最后一个元素清除(第 4 个)
    39. vcList.remove(3);
    40. }
    41. out.collect("传感器id 为" + value.getId() + ",最大的3 个水位值=" + vcList.toString());
    42. // 3.更新list 状态
    43. vcListState.update(vcList);
    44. // vcListState.get(); //取出 list 状态 本组的数据,是一个Iterable
    45. // vcListState.add(); //向 list 状态 本组 添加一个元素
    46. // vcListState.addAll(); //向 list 状态 本组 添加多个元素
    47. // vcListState.update(); //更新 list 状态 本组数据(覆盖)
    48. // vcListState.clear(); //清空List 状态 本组数据
    49. }
    50. }
    51. )
    52. .print();
    53. env.execute();
    54. }

    🐵Map 状态(MapState) 

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.state.ListState;
    6. import org.apache.flink.api.common.state.ListStateDescriptor;
    7. import org.apache.flink.api.common.state.MapState;
    8. import org.apache.flink.api.common.state.MapStateDescriptor;
    9. import org.apache.flink.api.common.typeinfo.Types;
    10. import org.apache.flink.configuration.Configuration;
    11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    13. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    14. import org.apache.flink.util.Collector;
    15. import java.time.Duration;
    16. import java.util.ArrayList;
    17. import java.util.List;
    18. import java.util.Map;
    19. /**
    20. * TODO 统计每种传感器每种水位值出现的次数
    21. *
    22. * @author cjp
    23. * @version 1.0
    24. */
    25. public class KeyedMapStateDemo {
    26. public static void main(String[] args) throws Exception {
    27. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    28. env.setParallelism(1);
    29. SingleOutputStreamOperator sensorDS = env
    30. .socketTextStream("hadoop102", 7777)
    31. .map(new WaterSensorMapFunction())
    32. .assignTimestampsAndWatermarks(
    33. WatermarkStrategy
    34. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    35. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    36. );
    37. sensorDS.keyBy(r -> r.getId())
    38. .process(
    39. new KeyedProcessFunction() {
    40. MapState vcCountMapState;
    41. @Override
    42. public void open(Configuration parameters) throws Exception {
    43. super.open(parameters);
    44. vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor("vcCountMapState", Types.INT, Types.INT));
    45. }
    46. @Override
    47. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    48. // 1.判断是否存在vc对应的key
    49. Integer vc = value.getVc();
    50. if (vcCountMapState.contains(vc)) {
    51. // 1.1 如果包含这个vc的key,直接对value+1
    52. Integer count = vcCountMapState.get(vc);
    53. vcCountMapState.put(vc, ++count);
    54. } else {
    55. // 1.2 如果不包含这个vc的key,初始化put进去
    56. vcCountMapState.put(vc, 1);
    57. }
    58. // 2.遍历Map状态,输出每个k-v的值
    59. StringBuilder outStr = new StringBuilder();
    60. outStr.append("======================================\n");
    61. outStr.append("传感器id为" + value.getId() + "\n");
    62. for (Map.Entry vcCount : vcCountMapState.entries()) {
    63. outStr.append(vcCount.toString() + "\n");
    64. }
    65. outStr.append("======================================\n");
    66. out.collect(outStr.toString());
    67. // vcCountMapState.get(); // 对本组的Map状态,根据key,获取value
    68. // vcCountMapState.contains(); // 对本组的Map状态,判断key是否存在
    69. // vcCountMapState.put(, ); // 对本组的Map状态,添加一个 键值对
    70. // vcCountMapState.putAll(); // 对本组的Map状态,添加多个 键值对
    71. // vcCountMapState.entries(); // 对本组的Map状态,获取所有键值对
    72. // vcCountMapState.keys(); // 对本组的Map状态,获取所有键
    73. // vcCountMapState.values(); // 对本组的Map状态,获取所有值
    74. // vcCountMapState.remove(); // 对本组的Map状态,根据指定key,移除键值对
    75. // vcCountMapState.isEmpty(); // 对本组的Map状态,判断是否为空
    76. // vcCountMapState.iterator(); // 对本组的Map状态,获取迭代器
    77. // vcCountMapState.clear(); // 对本组的Map状态,清空
    78. }
    79. }
    80. )
    81. .print();
    82. env.execute();
    83. }
    84. }

    🐵归约状态(ReducingState) 

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.functions.ReduceFunction;
    6. import org.apache.flink.api.common.state.MapState;
    7. import org.apache.flink.api.common.state.MapStateDescriptor;
    8. import org.apache.flink.api.common.state.ReducingState;
    9. import org.apache.flink.api.common.state.ReducingStateDescriptor;
    10. import org.apache.flink.api.common.typeinfo.Types;
    11. import org.apache.flink.configuration.Configuration;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.time.Duration;
    17. import java.util.Map;
    18. /**
    19. * TODO 计算每种传感器的水位和
    20. *
    21. * @author cjp
    22. * @version 1.0
    23. */
    24. public class KeyedReducingStateDemo {
    25. public static void main(String[] args) throws Exception {
    26. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    27. env.setParallelism(1);
    28. SingleOutputStreamOperator sensorDS = env
    29. .socketTextStream("hadoop102", 7777)
    30. .map(new WaterSensorMapFunction())
    31. .assignTimestampsAndWatermarks(
    32. WatermarkStrategy
    33. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    34. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    35. );
    36. sensorDS.keyBy(r -> r.getId())
    37. .process(
    38. new KeyedProcessFunction() {
    39. ReducingState vcSumReducingState;
    40. @Override
    41. public void open(Configuration parameters) throws Exception {
    42. super.open(parameters);
    43. vcSumReducingState = getRuntimeContext()
    44. .getReducingState(
    45. new ReducingStateDescriptor(
    46. "vcSumReducingState",
    47. new ReduceFunction() {
    48. @Override
    49. public Integer reduce(Integer value1, Integer value2) throws Exception {
    50. return value1 + value2;
    51. }
    52. },
    53. Types.INT
    54. )
    55. );
    56. }
    57. @Override
    58. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    59. // 来一条数据,添加到 reducing状态里
    60. vcSumReducingState.add(value.getVc());
    61. Integer vcSum = vcSumReducingState.get();
    62. out.collect("传感器id为" + value.getId() + ",水位值总和=" + vcSum);
    63. // vcSumReducingState.get(); // 对本组的Reducing状态,获取结果
    64. // vcSumReducingState.add(); // 对本组的Reducing状态,添加数据
    65. // vcSumReducingState.clear(); // 对本组的Reducing状态,清空数据
    66. }
    67. }
    68. )
    69. .print();
    70. env.execute();
    71. }
    72. }

    🐵聚合状态(AggregatingState) 

    与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。 

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.functions.AggregateFunction;
    6. import org.apache.flink.api.common.functions.ReduceFunction;
    7. import org.apache.flink.api.common.state.AggregatingState;
    8. import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    9. import org.apache.flink.api.common.state.ReducingState;
    10. import org.apache.flink.api.common.state.ReducingStateDescriptor;
    11. import org.apache.flink.api.common.typeinfo.Types;
    12. import org.apache.flink.api.java.tuple.Tuple2;
    13. import org.apache.flink.configuration.Configuration;
    14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    16. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    17. import org.apache.flink.util.Collector;
    18. import java.time.Duration;
    19. /**
    20. * TODO 计算每种传感器的平均水位
    21. *
    22. * @author cjp
    23. * @version 1.0
    24. */
    25. public class KeyedAggregatingStateDemo {
    26. public static void main(String[] args) throws Exception {
    27. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    28. env.setParallelism(1);
    29. SingleOutputStreamOperator sensorDS = env
    30. .socketTextStream("hadoop102", 7777)
    31. .map(new WaterSensorMapFunction())
    32. .assignTimestampsAndWatermarks(
    33. WatermarkStrategy
    34. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    35. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    36. );
    37. sensorDS.keyBy(r -> r.getId())
    38. .process(
    39. new KeyedProcessFunction() {
    40. AggregatingState vcAvgAggregatingState;
    41. @Override
    42. public void open(Configuration parameters) throws Exception {
    43. super.open(parameters);
    44. vcAvgAggregatingState = getRuntimeContext()
    45. .getAggregatingState(
    46. new AggregatingStateDescriptor, Double>(
    47. "vcAvgAggregatingState",
    48. new AggregateFunction, Double>() {
    49. @Override
    50. public Tuple2 createAccumulator() {
    51. return Tuple2.of(0, 0);
    52. }
    53. @Override
    54. public Tuple2 add(Integer value, Tuple2 accumulator) {
    55. return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
    56. }
    57. @Override
    58. public Double getResult(Tuple2 accumulator) {
    59. return accumulator.f0 * 1D / accumulator.f1;
    60. }
    61. @Override
    62. public Tuple2 merge(Tuple2 a, Tuple2 b) {
    63. // return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    64. return null;
    65. }
    66. },
    67. Types.TUPLE(Types.INT, Types.INT))
    68. );
    69. }
    70. @Override
    71. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    72. // 将 水位值 添加到 聚合状态中
    73. vcAvgAggregatingState.add(value.getVc());
    74. // 从 聚合状态中 获取结果
    75. Double vcAvg = vcAvgAggregatingState.get();
    76. out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);
    77. // vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
    78. // vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合
    79. // vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据
    80. }
    81. }
    82. )
    83. .print();
    84. env.execute();
    85. }
    86. }

    🐵状态生存时间(TTL)  

    在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”,当状态在内存中存在的时间超出这个值时,就将它清除。 

    状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。 

    StateTtlConfig ttlConfig = StateTtlConfig 
        .newBuilder(Time.seconds(10)) 
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
        .build(); 
     
    ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("my state",String.class); 
     
    stateDescriptor.enableTimeToLive(ttlConfig); 

    ⚫ .newBuilder() 
    状态TTL 配置的构造器方法,必须调用,返回一个Builder 之后再调用.build()方法就可以
    得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。 
    ⚫ .setUpdateType() 
    设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite
    表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。 
    ⚫ .setStateVisibility() 
    设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。 

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    6. import org.apache.flink.api.common.state.StateTtlConfig;
    7. import org.apache.flink.api.common.state.ValueState;
    8. import org.apache.flink.api.common.state.ValueStateDescriptor;
    9. import org.apache.flink.api.common.time.Time;
    10. import org.apache.flink.api.common.typeinfo.Types;
    11. import org.apache.flink.configuration.Configuration;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.time.Duration;
    17. /**
    18. * TODO
    19. *
    20. * @author cjp
    21. * @version 1.0
    22. */
    23. public class StateTTLDemo {
    24. public static void main(String[] args) throws Exception {
    25. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    26. env.setParallelism(1);
    27. SingleOutputStreamOperator sensorDS = env
    28. .socketTextStream("hadoop102", 7777)
    29. .map(new WaterSensorMapFunction())
    30. .assignTimestampsAndWatermarks(
    31. WatermarkStrategy
    32. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    33. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    34. );
    35. sensorDS.keyBy(r -> r.getId())
    36. .process(
    37. new KeyedProcessFunction() {
    38. ValueState lastVcState;
    39. @Override
    40. public void open(Configuration parameters) throws Exception {
    41. super.open(parameters);
    42. // TODO 1.创建 StateTtlConfig
    43. StateTtlConfig stateTtlConfig = StateTtlConfig
    44. .newBuilder(Time.seconds(5)) // 过期时间5s
    45. // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间
    46. .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间
    47. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
    48. .build();
    49. // TODO 2.状态描述器 启用 TTL
    50. ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
    51. stateDescriptor.enableTimeToLive(stateTtlConfig);
    52. this.lastVcState = getRuntimeContext().getState(stateDescriptor);
    53. }
    54. @Override
    55. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    56. // 先获取状态值,打印 ==》 读取状态
    57. Integer lastVc = lastVcState.value();
    58. out.collect("key=" + value.getId() + ",状态值=" + lastVc);
    59. // 如果水位大于10,更新状态值 ===》 写入状态
    60. if (value.getVc() > 10) {
    61. lastVcState.update(value.getVc());
    62. }
    63. }
    64. }
    65. )
    66. .print();
    67. env.execute();
    68. }
    69. }

    🐒*算子状态

    算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。 

    当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。 

    算 子状 态也 支持 不同的 结构 类型 ,主 要有三 种:ListState、UnionListState 和BroadcastState。 

    🐒状态后端

    在Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置。 

    🐵状态后端的分类(HashMapStateBackend/RocksDB) 

    Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。系统默认的状态后端是HashMapStateBackend。 

    (1)哈希表状态后端(HashMapStateBackend) 
    HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager 的JVM 堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。 

    2)内嵌RocksDB 状态后端(EmbeddedRocksDBStateBackend) 
            RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB 默认存储在 TaskManager 的本地数据目录里。 

            RocksDB 的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。 

            EmbeddedRocksDBStateBackend 始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。 

    🐵如何选择正确的状态后端 

    HashMap 和RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里。 
            HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。 
            而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。 

    🐵状态后端的配置 

            在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。 

    (1)配置默认的状态后端 
    在 flink-conf.yaml 中,可以使用state.backend 来配置默认状态后端。 

    • 配置项的可能值为 hashmap,这样配置的就是 HashMapStateBackend;
    • 如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。

    # 默认状态后端 
    state.backend: hashmap 
     
    # 存放检查点的文件路径 
    state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints 

    (2)为每个作业(Per-job/Application)单独配置状态后端 
    通过执行环境设置,HashMapStateBackend。 

    env.setStateBackend(new HashMapStateBackend()); 

    env.setStateBackend(new EmbeddedRocksDBStateBackend()); 

    需要注意,如果想在IDE 中使用EmbeddedRocksDBStateBackend,需要为Flink 项目添加
    依赖: 
     
        org.apache.flink 
        flink-statebackend-rocksdb 
        ${flink.version} 
     

    🦍容错机制 

    在 Flink 中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。

    🐒检查点(Checkpoint) 

    在流处理中,我们可以用存档读档的思路,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。

    所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点

    🐵检查点的保存

    1)周期性的触发保存 
            “随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以在Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置。 

    2)保存的时间点 
    我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。 

    如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka 就是满足这些要求的一个最好的例子。 

    3)保存的具体流程 
    检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。

    🐵恢复状态

    🐵检查点算法

    采用了基于 Chandy-Lamport 算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点

    检查点分界线(Barrier) 

    借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。 这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)。 

    在JobManager中有一个“检查点协调器”,专门用来协调处理检查点的相关工作。检查点协调器会定期向TaskManager发出指令,要求保存检查点(带着检查点ID);TaskManager会让所有的Source任务把自己的偏移量(算子状态)保存起来,并将带有检查点ID的分界线插入到当前的数据流中,然后像正常的数据一样像下游传递;之后Source任务就可以继续读入新的数据了。

    分布式快照算法(Barrier 对齐的精准一次) 

    分布式快照算法(Barrier 对齐的至少一次) 

    分布式快照算法(非 Barrier 对齐的精准一次) 

    🐵配置

    检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置

    默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

    env.enableCheckpointing(1000);   // 每隔1 秒启动一次检查点保存 

    默认情况下,检查点存储在JobManager 的堆内存中。而对于大状态的持久化保存,Flink 也提供了在其他存储位置进行保存的接口。 

    具体可以通过调用检查点配置的.setCheckpointStorage()来配置,需要传入一个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存和文件系统。

    // 配置存储检查点到 JobManager 堆内存 
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); 
     
    // 配置存储检查点到文件系统 
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoint
    s")); 

    不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。 (*目前标记为实验性功能)

    EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); 

     如果数据源是有界的,就可能出现部分 Task 已经处理完所有数据,变成 finished 状态,不继续工作。从 Flink 1.14 开始,这些 finished 状态的Task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它: 

    Configuration config = new Configuration(); 
    config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

    🐵保存点

    除了检查点外,Flink 还提供了另一个非常独特的镜像保存功能——保存点(savepoint)。 
    从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据

    而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。 

    需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子 ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定: 

    DataStream stream = env 

        .addSource(new StatefulSource()).uid("source-id") 
        .map(new StatefulMapper()).uid("mapper-id") 
        .print(); 

    对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致 ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。 

    要在命令行中为运行的作业创建一个保存点镜像,只需要执行: 

    bin/flink savepoint :jobId [:targetDirectory]

    这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。 

    除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点: 
    bin/flink stop --savepointPath [:targetDirectory] :jobId 

    对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定: 

    state.savepoints.dir: hdfs:///flink/savepoints 

    现在要从保存点重启

    bin/flink run -s :savepointPath [:runArgs] 

    🐒状态一致性 

    一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估。 

    一般说来,状态一致性有三种级别: 
    ⚫ 最多一次(At-Most-Once) 
    ⚫ 至少一次(At-Least-Once) 
    ⚫ 精确一次(Exactly-Once) 

    所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。

    这个完整应用的一致性,就叫做“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。

    🐒端到端精确一次(End-To-End Exactly-Once) 

    端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。 

    🐵输入端保证 

    输入端主要指的就是Flink 读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了,例如 socket 文本流。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已不能重发了,这就会导致数据丢失。所以就只能保证at-most-once的一致性语义,相当于没有保证。 

    常见的做法就对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是 Kafka。

    🐵输出端保证 

    有了 Flink 的检查点机制,以及可重放数据的外部数据源,我们已经能做到 at-least-once了。但是想要实现 exactly-once 却有更大的困难:数据有可能重复写入外部系统。

    幂等(Idempotent)写入 
    所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。 

    事务(Transactional)写入 
    如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。 
    输出端最大的问题,就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。 

    (1)预写日志(write-ahead-log,WAL) 

    (2)两阶段提交(two-phase-commit,2PC) 

    🐵kafka

    也就是说,我们写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。

  • 相关阅读:
    前端面试题之性能优化篇
    项目实战:并发下保证接口的幂等性
    RAII技术学习
    C++ 信号处理
    2022年湖北省中国专利奖申报条件以及认定流程(附奖项设置解析)
    Java学习笔记(二十一)
    【动态规划】—— 背包问题
    线程安全问题及其解决
    JavaScript和TypeScript的特点
    篇(16)-Asp.Net Core入门实战-权限管理之用户创建与关联角色(ViewModel再用与模型验证二)
  • 原文地址:https://blog.csdn.net/qq_58551342/article/details/133858605