• flink中使用异步函数的几个注意事项


    背景

    flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项

    异步函数的使用

    首先看一下官方的例子:

    /**
     * 实现 'AsyncFunction' 用于发送请求和设置回调。
     */
    class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
    
        /** 能够利用回调函数并发发送请求的数据库客户端 */
        private transient DatabaseClient client;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            client = new DatabaseClient(host, post, credentials);
        }
    
        @Override
        public void close() throws Exception {
            client.close();
        }
    
        @Override
        public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
    
            // 发送异步请求,接收 future 结果
            final Future<String> result = client.query(key);
    
            // 设置客户端完成请求后要执行的回调函数
            // 回调函数只是简单地把结果发给 future
            CompletableFuture.supplyAsync(new Supplier<String>() {
    
                @Override
                public String get() {
                    try {
                        return result.get();
                    } catch (InterruptedException | ExecutionException e) {
                        // 显示地处理异常。
                        return null;
                    }
                }
            }).thenAccept( (String dbResult) -> {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
            });
        }
    }
    
    // 创建初始 DataStream
    DataStream<String> stream = ...;
    
    // 应用异步 I/O 转换操作
    DataStream<Tuple2<String, String>> resultStream =
        AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    注意事项如下:
    1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
    2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
    3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
    4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
    5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等

  • 相关阅读:
    3D建模就业前景如何?加班多吗?值不值得入行
    NTM中attr的用法
    离散数学复习:谓词逻辑
    JavaScript基础 JavaScript第五天 1. 对象
    如何设计日志内容
    关于Pod中进程在节点中的研究
    工作5年,没用过分布式锁,正常吗?
    Java高级之反射
    Beyond Compare 4对比工具注册
    R语言矩阵操作:根据值找到行号和列号
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133980909