有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。
Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现
Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少
public class TestFlinkOperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
//定义一个有状态的map算子,用于统计输入数据个数
DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
//定义有状态的 map 操作
//实现 ListCheckpointed 接口,泛型为状态数据类型
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
//定义一个本地变量作为状态
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count++;
return count;
}
//对状态做快照
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
//容错恢复状态
@Override
public void restoreState(List<Integer> state) throws Exception {
for(Integer num : state) {
count += num;
}
}
}
}
Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
ValueState
,将状态表示为单个的值,值的类型为 T
ValueState.value()
:获取状态值ValueState.update(T value)
:添加或更新状态值ValueState.clear()
:清空操作ListState
,将状态表示为一组数据的列表,列表里的元素的数据类型为 T
ListState.add(T value)
:追加状态值ListState.addAll(List values)
:追加状态值列表ListState.get()
:获取状态值的 Iterable
ListState.update(List values)
:更新状态值列表ListState.clear()
:清空操作MapState
,将状态表示为一组 Key-Value 对
MapState.get(UK key)
:获取状态值MapState.put(UK key , UV value)
:添加或更新状态值MapState.contains(UK key)
:判断状态值是否存在MapState.remove(UK key)
:删除状态值MapState.clear()
:清空操作ReducingState
和 AggregatingState
,将状态表示为一个用于聚合操作的列表
ReducingState.add()
:聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理ReducingState.clear()
:清空操作,AggregatingState 同理/**
按键分区状态的使用步骤:
1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function
2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
/*
需求:自定义有状态的map算子,按sensor_id统计个数
*/
//使用按键分区状态必须先进行keyBy
DataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
resultStream.print();
env.execute();
}
//使用继承富函数类的方式自定义MapFunction
public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
//定义一个值状态属性
private ValueState<Integer> myValueState;
//在open方法中实例化值状态
@Override
public void open(Configuration parameters) throws Exception {
myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));
}
@Override
public Integer map(SensorReading value) throws Exception {
//获取状态值
Integer count = myValueState.value();
if(count == null) {
count = 0;
}
count++;
//更新状态值
myValueState.update(count);
return count;
}
}
}
Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复
/**
需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警
//报警信息:sensor_id,前一次温度值,当前温度值
DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));
warningStream.print();
env.execute();
}
//使用继承富函数类的方式自定义FlatMapFunction
public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
//定义温度差阈值属性
private Double threshold;
//定义值状态属性,保存上一次的温度值
private ValueState<Double> lastTempState;
public TempChangeWarning(Double threshold) {
this.threshold = threshold;
}
//在open方法中实例化值状态
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
}
//重写flatMap方法
@Override
public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
//获取上一次温度状态值
Double lastTemp = lastTempState.value();
//如果状态值不为null,则进行差值判断
if(lastTemp != null) {
Double diff = Math.abs(lastTemp - value.getTemperature());
//差值超过阈值,则输出报警信息
if(diff >= threshold) {
out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
}
}
//更新状态值
lastTempState.update(value.getTemperature());
}
//在close方法中清空状态
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}
State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。
FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。
RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-statebackend-rocksdb_2.12artifactId>
<version>1.10.1version>
dependency>
进入 flink 安装目录下的 conf 目录,打开 flink-conf.yaml
文件
cd /opt/module/flink/conf
vim flink-conf.yaml
在文件中的 Fault tolerance and checkpointing
部分进行配置
#Fault tolerance and checkpointing
#============================================================
state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb
#state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
jobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
在代码中为每个作业单独配置状态后端
public class TestStatebackend {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//配置状态后端
//1.MemoryStateBackend
env.setStateBackend(new MemoryStateBackend());
//2.FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://......"));
//3.RocksDBStateBackend,需要先引入依赖
env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.print();
env.execute();
}
}