在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项
首先看一下官方的例子:
/**
* 实现 'AsyncFunction' 用于发送请求和设置回调。
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** 能够利用回调函数并发发送请求的数据库客户端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 发送异步请求,接收 future 结果
final Future<String> result = client.query(key);
// 设置客户端完成请求后要执行的回调函数
// 回调函数只是简单地把结果发给 future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// 显示地处理异常。
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 创建初始 DataStream
DataStream<String> stream = ...;
// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等