前面的文章中介绍过Operator State
,这里介绍一下Keyed State
.
在使用Operator State
时必须要实现CheckpointFunction
接口,而Keyed State
则不需要,在使用keyBy(...)
分组分组后,调用的函数必须是实现RichFuntion
接口的函数才可以使用Keyed State
.同样使用Keyed State
也必须开启Checkpoint
.
Socket
数据源中的字符串进行拼接socket
命令:nc -lk 8888
public class FlinkKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1,便于观察
env.setParallelism(1);
// 开启Checkpoint, 8秒一个周期并开启一次性语义
env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
// 指定checkpoint持久化路径
env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));
// 获取Socket数据源
DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
// 将数据进行分组,将分组key给一个常量值
SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")
// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等
.map(new RichMapFunction<String, String>() {
ListState<String> listState;
// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器
@Override
public void open(Configuration parameters) throws Exception {
// 获取上下文
RuntimeContext ctx = getRuntimeContext();
// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等
listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));
}
// 在map方法中正常编写业务逻辑
@Override
public String map(String s) throws Exception {
// 模拟Task失败
if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {
throw new Exception("Task 异常");
}
// 将数据添加到状态存储器中
listState.add(s);
Iterable<String> strings = listState.get();
StringBuilder builder = new StringBuilder();
for (String string : strings) {
builder.append(string);
}
return builder.toString();
}
});
map.print();
env.execute("Keyed State");
}
}
API
的使用大概就这些内容,不过在使用Keyed Sate
时首先要对keyBy
的特性有所了解,才能得到最终想要的结果数据,如使用keyBy
时上下游之间的数据分发模式
、所设置的默认并行度
、上下游算子的并行度
是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.