• Flink状态


    8.1 Flink中的状态

    8.1.1 概述

     

    状态的分类

    1)托管状态(Managed State)和原始状态(Raw State)

    Flink的状态有两种:托管状态(Managed State)原始状态(Raw State)托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

    通常我们采用Flink托管状态来实现需求。

    2)算子状态(Operator State)和按键分区状态(Keyed State)

    接下来我们的重点就是托管状态(Managed State)。

    我们知道在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

    而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。在这种情况下,状态的访问方式又会有所不同。

    基于这样的想法,我们又可以将托管状态分为两类:算子状态按键分区状态。

     

     

    另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State。比如RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。从这个角度讲,Flink中所有的算子都可以是有状态的

    无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

    8.2 按键分区状态(Keyed State)

    按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。

    需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

    8.2.1 值状态(ValueState)

    顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

    public interface ValueState extends State {

        T value() throws IOException;

        void update(T value) throws IOException;

    }

    这里的T是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState

    我们可以在代码中读写值状态,实现对于状态的访问和更新。

    1. T value():获取当前状态的值;
    2. update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。

    在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

    public ValueStateDescriptor(String name, Class typeClass) {

        super(name, typeClass, null);

    }

    这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。

    案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。

    1. public class KeyedValueStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. // TODO 1.定义状态
    17. ValueState lastVcState;
    18. @Override
    19. public void open(Configuration parameters) throws Exception {
    20. super.open(parameters);
    21. // TODO 2.在open方法中,初始化状态
    22. // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
    23. lastVcState = getRuntimeContext().getState(new ValueStateDescriptor("lastVcState", Types.INT));
    24. }
    25. @Override
    26. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    27. // lastVcState.value(); // 取出 本组 值状态 的数据
    28. // lastVcState.update(); // 更新 本组 值状态 的数据
    29. // lastVcState.clear(); // 清除 本组 值状态 的数据
    30. // 1. 取出上一条数据的水位值(Integer默认值是null,判断)
    31. int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
    32. // 2. 求差值的绝对值,判断是否超过10
    33. Integer vc = value.getVc();
    34. if (Math.abs(vc - lastVc) > 10) {
    35. out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
    36. }
    37. // 3. 更新状态里的水位值
    38. lastVcState.update(vc);
    39. }
    40. }
    41. )
    42. .print();
    43. env.execute();
    44. }
    45. }

     

    8.2.2 列表状态(ListState)

    将需要保存的数据,以列表(List)的形式组织起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

    1. Iterable get():获取当前的列表状态,返回的是一个可迭代类型Iterable
    2. update(List values):传入一个列表values,直接对状态进行覆盖;
    3. add(T value):在状态列表中添加一个元素value;
    4. addAll(List values):向列表中添加多个元素,以列表values形式传入。

    类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

    案例:针对每种传感器输出最高的3个水位值

    1. public class KeyedListStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. ListState vcListState;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. super.open(parameters);
    20. vcListState = getRuntimeContext().getListState(new ListStateDescriptor("vcListState", Types.INT));
    21. }
    22. @Override
    23. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    24. // 1.来一条,存到list状态里
    25. vcListState.add(value.getVc());
    26. // 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
    27. Iterable vcListIt = vcListState.get();
    28. // 2.1 拷贝到List中
    29. List vcList = new ArrayList<>();
    30. for (Integer vc : vcListIt) {
    31. vcList.add(vc);
    32. }
    33. // 2.2 对List进行降序排序
    34. vcList.sort((o1, o2) -> o2 - o1);
    35. // 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)
    36. if (vcList.size() > 3) {
    37. // 将最后一个元素清除(第4个)
    38. vcList.remove(3);
    39. }
    40. out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());
    41. // 3.更新list状态
    42. vcListState.update(vcList);
    43. // vcListState.get(); //取出 list状态 本组的数据,是一个Iterable
    44. // vcListState.add(); // 向 list状态 本组 添加一个元素
    45. // vcListState.addAll(); // 向 list状态 本组 添加多个元素
    46. // vcListState.update(); // 更新 list状态 本组数据(覆盖)
    47. // vcListState.clear(); // 清空List状态 本组数据
    48. }
    49. }
    50. )
    51. .print();
    52. env.execute();
    53. }
    54. }

    8.2.3 Map状态(MapState)

    把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似。

    1. UV get(UK key):传入一个key作为参数,查询对应的value值;
    2. put(UK key, UV value):传入一个键值对,更新key对应的value值;
    3. putAll(Map map):将传入的映射map中所有的键值对,全部添加到映射状态中;
    4. remove(UK key):将指定key对应的键值对删除;
    5. boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。

    另外,MapState也提供了获取整个映射相关信息的方法;

    1. Iterable> entries():获取映射状态中所有的键值对;
    2. Iterable keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
    3. Iterable values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
    4. boolean isEmpty():判断映射是否为空,返回一个boolean值。

    案例需求:统计每种传感器每种水位值出现的次数。

    1. public class KeyedMapStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. MapState vcCountMapState;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. super.open(parameters);
    20. vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor("vcCountMapState", Types.INT, Types.INT));
    21. }
    22. @Override
    23. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    24. // 1.判断是否存在vc对应的key
    25. Integer vc = value.getVc();
    26. if (vcCountMapState.contains(vc)) {
    27. // 1.1 如果包含这个vc的key,直接对value+1
    28. Integer count = vcCountMapState.get(vc);
    29. vcCountMapState.put(vc, ++count);
    30. } else {
    31. // 1.2 如果不包含这个vc的key,初始化put进去
    32. vcCountMapState.put(vc, 1);
    33. }
    34. // 2.遍历Map状态,输出每个k-v的值
    35. StringBuilder outStr = new StringBuilder();
    36. outStr.append("======================================\n");
    37. outStr.append("传感器id为" + value.getId() + "\n");
    38. for (Map.Entry vcCount : vcCountMapState.entries()) {
    39. outStr.append(vcCount.toString() + "\n");
    40. }
    41. outStr.append("======================================\n");
    42. out.collect(outStr.toString());
    43. // vcCountMapState.get(); // 对本组的Map状态,根据key,获取value
    44. // vcCountMapState.contains(); // 对本组的Map状态,判断key是否存在
    45. // vcCountMapState.put(, ); // 对本组的Map状态,添加一个 键值对
    46. // vcCountMapState.putAll(); // 对本组的Map状态,添加多个 键值对
    47. // vcCountMapState.entries(); // 对本组的Map状态,获取所有键值对
    48. // vcCountMapState.keys(); // 对本组的Map状态,获取所有键
    49. // vcCountMapState.values(); // 对本组的Map状态,获取所有值
    50. // vcCountMapState.remove(); // 对本组的Map状态,根据指定key,移除键值对
    51. // vcCountMapState.isEmpty(); // 对本组的Map状态,判断是否为空
    52. // vcCountMapState.iterator(); // 对本组的Map状态,获取迭代器
    53. // vcCountMapState.clear(); // 对本组的Map状态,清空
    54. }
    55. }
    56. )
    57. .print();
    58. env.execute();
    59. }
    60. }

    8.2.4 归约状态(ReducingState)

    类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法类似于ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

    归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。

    public ReducingStateDescriptor(

        String name, ReduceFunction reduceFunction, Class typeClass) {...}

    这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。

    案例:计算每种传感器的水位和

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.functions.ReduceFunction;
    6. import org.apache.flink.api.common.state.MapState;
    7. import org.apache.flink.api.common.state.MapStateDescriptor;
    8. import org.apache.flink.api.common.state.ReducingState;
    9. import org.apache.flink.api.common.state.ReducingStateDescriptor;
    10. import org.apache.flink.api.common.typeinfo.Types;
    11. import org.apache.flink.configuration.Configuration;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.time.Duration;
    17. import java.util.Map;
    18. /**
    19. * TODO 计算每种传感器的水位和
    20. *
    21. * @author cjp
    22. * @version 1.0
    23. */
    24. public class KeyedReducingStateDemo {
    25. public static void main(String[] args) throws Exception {
    26. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    27. env.setParallelism(1);
    28. SingleOutputStreamOperator sensorDS = env
    29. .socketTextStream("hadoop102", 7777)
    30. .map(new WaterSensorMapFunction())
    31. .assignTimestampsAndWatermarks(
    32. WatermarkStrategy
    33. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    34. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    35. );
    36. sensorDS.keyBy(r -> r.getId())
    37. .process(
    38. new KeyedProcessFunction() {
    39. ReducingState vcSumReducingState;
    40. @Override
    41. public void open(Configuration parameters) throws Exception {
    42. super.open(parameters);
    43. vcSumReducingState = getRuntimeContext()
    44. .getReducingState(
    45. new ReducingStateDescriptor(
    46. "vcSumReducingState",
    47. new ReduceFunction() {
    48. @Override
    49. public Integer reduce(Integer value1, Integer value2) throws Exception {
    50. return value1 + value2;
    51. }
    52. },
    53. Types.INT
    54. )
    55. );
    56. }
    57. @Override
    58. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    59. // 来一条数据,添加到 reducing状态里
    60. vcSumReducingState.add(value.getVc());
    61. Integer vcSum = vcSumReducingState.get();
    62. out.collect("传感器id为" + value.getId() + ",水位值总和=" + vcSum);
    63. // vcSumReducingState.get(); // 对本组的Reducing状态,获取结果
    64. // vcSumReducingState.add(); // 对本组的Reducing状态,添加数据
    65. // vcSumReducingState.clear(); // 对本组的Reducing状态,清空数据
    66. }
    67. }
    68. )
    69. .print();
    70. env.execute();
    71. }
    72. }

    8.2.5 聚合状态(AggregatingState)

    与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

    同样地,AggregatingState接口调用方法也与ReducingState相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction进行聚合并更新状态。

    案例需求:计算每种传感器的平均水位

    1. public class KeyedAggregatingStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. AggregatingState vcAvgAggregatingState;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. super.open(parameters);
    20. vcAvgAggregatingState = getRuntimeContext()
    21. .getAggregatingState(
    22. new AggregatingStateDescriptor, Double>(
    23. "vcAvgAggregatingState",
    24. new AggregateFunction, Double>() {
    25. @Override
    26. public Tuple2 createAccumulator() {
    27. return Tuple2.of(0, 0);
    28. }
    29. @Override
    30. public Tuple2 add(Integer value, Tuple2 accumulator) {
    31. return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
    32. }
    33. @Override
    34. public Double getResult(Tuple2 accumulator) {
    35. return accumulator.f0 * 1D / accumulator.f1;
    36. }
    37. @Override
    38. public Tuple2 merge(Tuple2 a, Tuple2 b) {
    39. // return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    40. return null;
    41. }
    42. },
    43. Types.TUPLE(Types.INT, Types.INT))
    44. );
    45. }
    46. @Override
    47. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    48. // 将 水位值 添加到 聚合状态中
    49. vcAvgAggregatingState.add(value.getVc());
    50. // 从 聚合状态中 获取结果
    51. Double vcAvg = vcAvgAggregatingState.get();
    52. out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);
    53. // vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
    54. // vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合
    55. // vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据
    56. }
    57. }
    58. )
    59. .print();
    60. env.execute();
    61. }
    62. }

     

    8.2.6 状态生存时间(TTL)

    在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

    具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

    配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

    StateTtlConfig ttlConfig = StateTtlConfig

        .newBuilder(Time.seconds(10))

        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

        .build();

    ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("my state", String.class);

    stateDescriptor.enableTimeToLive(ttlConfig);

    这里用到了几个配置项:

    1. .newBuilder()

    状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间

    1. .setUpdateType()

    设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。

    1. .setStateVisibility()

    设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

    除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间

    1. public class StateTTLDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction())
    8. .assignTimestampsAndWatermarks(
    9. WatermarkStrategy
    10. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    11. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    12. );
    13. sensorDS.keyBy(r -> r.getId())
    14. .process(
    15. new KeyedProcessFunction() {
    16. ValueState lastVcState;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. super.open(parameters);
    20. // TODO 1.创建 StateTtlConfig
    21. StateTtlConfig stateTtlConfig = StateTtlConfig
    22. .newBuilder(Time.seconds(5)) // 过期时间5s
    23. // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间
    24. .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间
    25. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
    26. .build();
    27. // TODO 2.状态描述器 启用 TTL
    28. ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
    29. stateDescriptor.enableTimeToLive(stateTtlConfig);
    30. this.lastVcState = getRuntimeContext().getState(stateDescriptor);
    31. }
    32. @Override
    33. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    34. // 先获取状态值,打印 ==》 读取状态
    35. Integer lastVc = lastVcState.value();
    36. out.collect("key=" + value.getId() + ",状态值=" + lastVc);
    37. // 如果水位大于10,更新状态值 ===》 写入状态
    38. if (value.getVc() > 10) {
    39. lastVcState.update(value.getVc());
    40. }
    41. }
    42. }
    43. )
    44. .print();
    45. env.execute();
    46. }
    47. }

    算子状态(Operator State)

    算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。

    算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。

    当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

    算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

    8.3.1 列表状态(ListState)

    与Keyed State中的ListState一样,将状态表示为一组数据的列表。

    与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

    当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。

    算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

    案例实操:在map算子中计算数据的个数。

    1. public class OperatorListStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new MyCountMapFunction())
    8. .print();
    9. env.execute();
    10. }
    11. // TODO 1.实现 CheckpointedFunction 接口
    12. public static class MyCountMapFunction implements MapFunction, CheckpointedFunction {
    13. private Long count = 0L;
    14. private ListState state;
    15. @Override
    16. public Long map(String value) throws Exception {
    17. return ++count;
    18. }
    19. /**
    20. * TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用
    21. *
    22. * @param context
    23. * @throws Exception
    24. */
    25. @Override
    26. public void snapshotState(FunctionSnapshotContext context) throws Exception {
    27. System.out.println("snapshotState...");
    28. // 2.1 清空算子状态
    29. state.clear();
    30. // 2.2 将 本地变量 添加到 算子状态 中
    31. state.add(count);
    32. }
    33. /**
    34. * TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
    35. *
    36. * @param context
    37. * @throws Exception
    38. */
    39. @Override
    40. public void initializeState(FunctionInitializationContext context) throws Exception {
    41. System.out.println("initializeState...");
    42. // 3.1 从 上下文 初始化 算子状态
    43. state = context
    44. .getOperatorStateStore()
    45. .getListState(new ListStateDescriptor("state", Types.LONG));
    46. // 3.2 从 算子状态中 把数据 拷贝到 本地变量
    47. if (context.isRestored()) {
    48. for (Long c : state.get()) {
    49. count += c;
    50. }
    51. }
    52. }
    53. }
    54. }

    8.3.2 联合列表状态(UnionListState)

    与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

    UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

    使用方式同ListState,区别在如下标红部分:

    state = context

                  .getOperatorStateStore()

                  .getUnionListState(new ListStateDescriptor("union-state", Types.LONG));

    8.3.3 广播状态(BroadcastState)

    有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

    因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

    案例实操:水位超过指定的阈值发送告警,阈值可以动态修改

    1. public class OperatorBroadcastStateDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. // 数据流
    6. SingleOutputStreamOperator sensorDS = env
    7. .socketTextStream("hadoop102", 7777)
    8. .map(new WaterSensorMapFunction());
    9. // 配置流(用来广播配置)
    10. DataStreamSource configDS = env.socketTextStream("hadoop102", 8888);
    11. // TODO 1. 将 配置流 广播
    12. MapStateDescriptor broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
    13. BroadcastStream configBS = configDS.broadcast(broadcastMapState);
    14. // TODO 2.把 数据流 和 广播后的配置流 connect
    15. BroadcastConnectedStream sensorBCS = sensorDS.connect(configBS);
    16. // TODO 3.调用 process
    17. sensorBCS
    18. .process(
    19. new BroadcastProcessFunction() {
    20. /**
    21. * 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改
    22. * @param value
    23. * @param ctx
    24. * @param out
    25. * @throws Exception
    26. */
    27. @Override
    28. public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector out) throws Exception {
    29. // TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
    30. ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(broadcastMapState);
    31. Integer threshold = broadcastState.get("threshold");
    32. // 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
    33. threshold = (threshold == null ? 0 : threshold);
    34. if (value.getVc() > threshold) {
    35. out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
    36. }
    37. }
    38. /**
    39. * 广播后的配置流的处理方法: 只有广播流才能修改 广播状态
    40. * @param value
    41. * @param ctx
    42. * @param out
    43. * @throws Exception
    44. */
    45. @Override
    46. public void processBroadcastElement(String value, Context ctx, Collector out) throws Exception {
    47. // TODO 4. 通过上下文获取广播状态,往里面写数据
    48. BroadcastState broadcastState = ctx.getBroadcastState(broadcastMapState);
    49. broadcastState.put("threshold", Integer.valueOf(value));
    50. }
    51. }
    52. )
    53. .print();
    54. env.execute();
    55. }
    56. }

    8.4 状态后端(State Backends)

    在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置

    8.4.1 状态后端的分类(HashMapStateBackend/RocksDB

    状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。

    (1)哈希表状态后端(HashMapStateBackend)

    HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

    (2)内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

    RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里

    RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

    EmbeddedRocksDBStateBackend始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

    8.4.2 如何选择正确的状态后端

    HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。

    HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

    而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

    8.4.3 状态后端的配置

    在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

    (1)配置默认的状态后端

    在flink-conf.yaml中,可以使用state.backend来配置默认状态后端

    配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。

    下面是一个配置HashMapStateBackend的例子:

    # 默认状态后端

    state.backend: hashmap

    # 存放检查点的文件路径

    state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

    这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录。

    (2)为每个作业(Per-job/Application)单独配置状态后端

    通过执行环境设置,HashMapStateBackend

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new HashMapStateBackend());

    通过执行环境设置,EmbeddedRocksDBStateBackend。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStateBackend(new EmbeddedRocksDBStateBackend());

    需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

        org.apache.flink

        flink-statebackend-rocksdb

        ${flink.version}

    而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink),所以只要我们的代码中没有使用RocksDB的相关内容,就不需要引入这个依赖。

    1. package com.atguigu.state;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    5. import org.apache.flink.api.common.state.ValueState;
    6. import org.apache.flink.api.common.state.ValueStateDescriptor;
    7. import org.apache.flink.api.common.typeinfo.Types;
    8. import org.apache.flink.configuration.Configuration;
    9. import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    10. import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    13. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    14. import org.apache.flink.util.Collector;
    15. import java.time.Duration;
    16. /**
    17. * TODO
    18. *
    19. * @author cjp
    20. * @version 1.0
    21. */
    22. public class StateBackendDemo {
    23. public static void main(String[] args) throws Exception {
    24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    25. env.setParallelism(1);
    26. /**
    27. * TODO 状态后端
    28. * 1、负责管理 本地状态
    29. * 2、 hashmap
    30. * 存在 TM的 JVM的堆内存, 读写快,缺点是存不了太多(受限与TaskManager的内存)
    31. * rocksdb
    32. * 存在 TM所在节点的rocksdb数据库,存到磁盘中, 写--序列化,读--反序列化
    33. * 读写相对慢一些,可以存很大的状态
    34. *
    35. * 3、配置方式
    36. * 1)配置文件 默认值 flink-conf.yaml
    37. * 2)代码中指定
    38. * 3)提交参数指定
    39. * flink run-application -t yarn-application
    40. * -p 3
    41. * -Dstate.backend.type=rocksdb
    42. * -c 全类名
    43. * jar包
    44. */
    45. // 1. 使用 hashmap状态后端
    46. HashMapStateBackend hashMapStateBackend = new HashMapStateBackend();
    47. env.setStateBackend(hashMapStateBackend);
    48. // 2. 使用 rocksdb状态后端
    49. EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
    50. env.setStateBackend(embeddedRocksDBStateBackend);
    51. SingleOutputStreamOperator sensorDS = env
    52. .socketTextStream("hadoop102", 7777)
    53. .map(new WaterSensorMapFunction())
    54. .assignTimestampsAndWatermarks(
    55. WatermarkStrategy
    56. .forBoundedOutOfOrderness(Duration.ofSeconds(3))
    57. .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
    58. );
    59. sensorDS.keyBy(r -> r.getId())
    60. .process(
    61. new KeyedProcessFunction() {
    62. ValueState lastVcState;
    63. @Override
    64. public void open(Configuration parameters) throws Exception {
    65. super.open(parameters);
    66. lastVcState = getRuntimeContext().getState(new ValueStateDescriptor("lastVcState", Types.INT));
    67. }
    68. @Override
    69. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    70. int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
    71. Integer vc = value.getVc();
    72. if (Math.abs(vc - lastVc) > 10) {
    73. out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
    74. }
    75. lastVcState.update(vc);
    76. }
    77. }
    78. )
    79. .print();
    80. env.execute();
    81. }
    82. }

  • 相关阅读:
    10 数组和指针
    嵌入式linux下sqlite3数据库操作
    多租户架构
    python机器学习——决策树
    【unity插件】UGUI的粒子效果(UI粒子)—— Particle Effect For UGUI (UI Particle)
    11_跳表(Skip List)
    NUMA架构详解
    待看12313132
    C++ 并发编程实战 第十章 并行算法函数
    【力扣每日一题】2023.10.22 做菜顺序
  • 原文地址:https://blog.csdn.net/yuzheh521/article/details/133185193