• 【Apache Flink】实现有状态函数


    在RuntimeContext 中声明键值分区状态

    Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:

    1. ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。

    2. ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。

    3. ReducingStateAggregatingState: 这两种状态都用于合并元素,通常在窗口操作中使用。

      • ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。

      • AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。

    4. MapState: 这种状态类型存储一个key-value映射。

    要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。

    这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。

    Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等

    我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。

    在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。

    举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态

    然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:

    public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {
      
      // 定义键控状态
      private transient ValueState<Long> scoreState;
    
      @Override
      public void open(Configuration params) throws Exception {
        ValueStateDescriptor<Long> descriptor = 
            new ValueStateDescriptor<>(
              "playerScore", // 状态的名称
              TypeInformation.of(new TypeHint<Long>() {}),
              0L); // 默认值
        scoreState = getRuntimeContext().getState(descriptor);
      }
    
      @Override
      public Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {
        // update the state
        long currentScore = scoreState.value();
        currentScore += gameEvent.getScore();
        scoreState.update(currentScore);
        // return the updated score
        return new Tuple2<>(gameEvent.getPlayerId(), currentScore);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这个例子中,PlayerScoreFunction 接收 GameEvent 流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时,根据事件中的分数增量用 scoreState.update(currentScore) 更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。

    通过ListCheckPonitend 接口实现算子列表状态

    算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。

    算子状态的维护主要包括以下步骤:

    1. 定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。

    2. 读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。

    3. 保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。

    4. 状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。

    维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。

    在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunctionListCheckpointed 接口的抽象类型继承,然后实现 snapshotStaterestoreState 方法,以完成状态恢复。

    具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:

    我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过 snapshotState 方法进行状态的快照并返回。当故障发生后,Flink 会调用 restoreState 方法将状态恢复回来。

    如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。

    public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {
    
      private List<Integer> bufferElements;
    
      public ListStateFunction(){
        this.bufferElements = new ArrayList<>();
      }
    
      @Override
      public Integer map(String value) throws Exception {
        int parsedValue = Integer.parseInt(value);
        bufferElements.add(parsedValue);
        return bufferElements.size();
      }
    
      // 每次 checkpoint 时,将缓存的元素进行快照
      @Override
      public List<Integer> snapshotState(long checkpointId, long timestamp)  {
        return this.bufferElements;
      }
    
      // 从存储中恢复状态
      @Override
      public void restoreState(List<Integer> state) {
        this.bufferElements.addAll(state);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文,两者在功能上是相似的,但 CheckpointedFunction 提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。

    使用CheckpointedFunction接口

    Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。

    h使用CheckpointedFunction的例子:

    public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {
    
      private transient ValueState<Long> counter;
    
      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
        ValueStateDescriptor<Long> descriptor = 
          new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));
        counter = getRuntimeContext().getState(descriptor);
      }
    
      @Override
      public Long map(Long value) throws Exception {
        Long currentCount = counter.value();
        Long newCount = currentCount == null ? 1L : currentCount + 1;
        counter.update(newCount);
        return newCount;
      }
    
      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
        counter.clear();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。

    在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑,比如将状态存储至远端。

    接收检查点完成通知

    在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。

    如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:

    函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。

    触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。

    public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {
    
        private transient ListState<Long> checkpointedState;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Long> descriptor = 
                new ListStateDescriptor<>("state", Long.class);
            checkpointedState = getRuntimeContext().getListState(descriptor);
        }
    
        @Override
        public Long map(Long value) throws Exception {
            checkpointedState.add(value);
            return value;
        }
    
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // 监听到检查点成功完成的通知,此处可以进行相关逻辑处理
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    参考文档

  • 相关阅读:
    JAVA多线程基础篇--守护线程(Daemon Thread)
    云网融合赋能智慧转型,“天翼云管 ”开启贴身云管家时代
    VScode 使用ESlint检查代码
    基于Java毕业设计羽毛球馆场地管理系统源码+系统+mysql+lw文档+部署软件
    【入门篇】ClickHouse 的安装与配置
    Linux常用命令
    android 动画中插值器Interpolator详解
    “判断性别”Demo需求分析和初步设计(中)
    Ceph 分布式文件系统 搭建及使用
    Java SE 19 虚拟线程
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/134087238