• 大数据-玩转数据-Flink状态后端(下)


    一、状态后端

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。

    状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。

    当使用checkpoint时,状态(state)会被持久化到checkpoint上,以防止数据的丢失并确保发生故障时能够完全恢复。状态是通过什么方式在哪里持久化,取决于使用的状态后端。

    状态后端主要负责两件事:本地(taskmanager)的状态管理,将检查点(checkpoint)状态写入远程存储。

    二、状态后端分类

    Flink提供了3种状态后端,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,作为一个可插入的组件,没有固定的配置,根据需要进行选择。注意:如果什么都不配置,系统默认的是MemoryStateBackend。

    三、MemoryStateBackend

    存储方式:本地状态存储在TaskManager的内存中,checkpoint 存储在JobManager的内存中。
    特点:快速,低延迟, 但不稳定。
    使用场景:1. 本地测试 ;2. 几乎无状态的作业(ETL) ;3. JobManager不容易挂, 或者挂了影响不大;4. 不推荐在生产环境下使用。

    四、FsStateBackend

    存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中。
    特点:拥有内存级别的本地访问速度, 和更好的容错保证。
    使用场景:1. 常规使用状态的作业,例如分钟级别窗口聚合, join等; 2. 需要开启HA的作业 ;3. 可以应用在生产环境中。

    五、RocksDBStateBackend

    将所有的状态序列化之后,存入本地的RocksDB数据库中。(一种NoSql数据库, KV形式存储)
    存储方式:1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘); 2. Checkpoint在外部文件系统(hdfs)中。
    使用场景:1. 超大状态的作业,例如天级的窗口聚合 ;2. 需要开启HA的作业; 3. 对读写状态性能要求不高的作业 ;4. 可以使用在生产环境。

    六、状态后端文件配置

    在flink-conf.yaml文件中设置默认的全局后端
    在这里插入图片描述
    老的写法:

    memory
    state.backend: jobmanager

    fs
    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

    rocksdb
    state.backend: rocksdb
    state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

    新的写法:
    state.backend: hashmap 或 rocksdb
    state.checkpoints.dir: 文件目录 或 jobmanager

    七、代码配置

    可以在代码中单独为这个Job设置状态后端。
    memory

    env.setStateBackend(new MemoryStateBackend());
    
    • 1

    fs

    env.setStateBackend(new FsStateBackend("hdfs://hadoop100:8020/flink/checkpoints/fs"));
    
    • 1

    rocksdb
    如果要使用RocksDBBackend, 需要先引入依赖:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    env.setStateBackend(new  RocksDBStateBackend("hdfs://hadoop100:8020/flink/checkpoints/rocksdb"));
    
    • 1
  • 相关阅读:
    教小白白Hue安装部署
    Clickhouse通过命令导入导出文件(在Linux命令窗口)
    vue3的api解读-VUE Reactivity
    IP协议从0到1
    将conda虚拟环境添加到JNB
    excel提取单元格中的数字
    Java配置25-搭建Jenkins服务器
    LabVIEW专栏八、类
    三分钟学会数据结构顺序表
    C# CAD 框选pdf输出
  • 原文地址:https://blog.csdn.net/s_unbo/article/details/132788518