每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。
当使用checkpoint时,状态(state)会被持久化到checkpoint上,以防止数据的丢失并确保发生故障时能够完全恢复。状态是通过什么方式在哪里持久化,取决于使用的状态后端。
状态后端主要负责两件事:本地(taskmanager)的状态管理,将检查点(checkpoint)状态写入远程存储。
Flink提供了3种状态后端,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,作为一个可插入的组件,没有固定的配置,根据需要进行选择。注意:如果什么都不配置,系统默认的是MemoryStateBackend。
存储方式:本地状态存储在TaskManager的内存中,checkpoint 存储在JobManager的内存中。
特点:快速,低延迟, 但不稳定。
使用场景:1. 本地测试 ;2. 几乎无状态的作业(ETL) ;3. JobManager不容易挂, 或者挂了影响不大;4. 不推荐在生产环境下使用。
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中。
特点:拥有内存级别的本地访问速度, 和更好的容错保证。
使用场景:1. 常规使用状态的作业,例如分钟级别窗口聚合, join等; 2. 需要开启HA的作业 ;3. 可以应用在生产环境中。
将所有的状态序列化之后,存入本地的RocksDB数据库中。(一种NoSql数据库, KV形式存储)
存储方式:1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘); 2. Checkpoint在外部文件系统(hdfs)中。
使用场景:1. 超大状态的作业,例如天级的窗口聚合 ;2. 需要开启HA的作业; 3. 对读写状态性能要求不高的作业 ;4. 可以使用在生产环境。
在flink-conf.yaml文件中设置默认的全局后端
老的写法:
memory
state.backend: jobmanager
fs
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
rocksdb
state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
新的写法:
state.backend: hashmap 或 rocksdb
state.checkpoints.dir: 文件目录 或 jobmanager
可以在代码中单独为这个Job设置状态后端。
memory
env.setStateBackend(new MemoryStateBackend());
fs
env.setStateBackend(new FsStateBackend("hdfs://hadoop100:8020/flink/checkpoints/fs"));
rocksdb
如果要使用RocksDBBackend, 需要先引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:8020/flink/checkpoints/rocksdb"));