• Flink使用AsyncDataStream异步处理数据


    转载请标明出处:
    https://bigmaning.blog.csdn.net/article/details/125455916
    本文出自:【BigManing的博客】

    场景需求

    一共有20条数据,每条数据处理需要2秒,分别使用同步、异步方式处理,一共需要多久才能处理完呢?

    接下来进行简单的验证,如有疑惑,欢迎指正。

    同步方式

    使用map算子模拟同步

    代码

    	/**
         * 同步方式  
         */
        private static void syncHandleData() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.generateSequence(1, 20)
                    .map(aLong -> {
                        return handleData(aLong);
                    });
            env.execute("test");
        }
    
        /**
         * 模拟业务 耗时2s操作
         *
         */
        private static Long handleData(Long aLong) throws InterruptedException {
            String dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());
            System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 开始 " + Thread.currentThread().getName());
            // 模拟 耗时2s处理
            Thread.sleep(2000);
            dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());
            System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 结束");
            return aLong;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    结果输出

    从以下结果可以得出结论:

    1. 每个任务都是按照顺序执行的
    2. 每个任务都是耗时2s()
    3. 总耗时为 20*2 = 40s
    2022-06-25T09:53:39====> 处理第1个任务... 开始 Thread-7
    2022-06-25T09:53:41====> 处理第1个任务... 结束
    2022-06-25T09:53:41====> 处理第2个任务... 开始 Thread-7
    2022-06-25T09:53:43====> 处理第2个任务... 结束
    2022-06-25T09:53:43====> 处理第3个任务... 开始 Thread-7
    2022-06-25T09:53:45====> 处理第3个任务... 结束
    2022-06-25T09:53:45====> 处理第4个任务... 开始 Thread-7
    2022-06-25T09:53:47====> 处理第4个任务... 结束
    2022-06-25T09:53:47====> 处理第5个任务... 开始 Thread-7
    2022-06-25T09:53:49====> 处理第5个任务... 结束
    2022-06-25T09:53:49====> 处理第6个任务... 开始 Thread-7
    2022-06-25T09:53:51====> 处理第6个任务... 结束
    2022-06-25T09:53:51====> 处理第7个任务... 开始 Thread-7
    2022-06-25T09:53:53====> 处理第7个任务... 结束
    2022-06-25T09:53:53====> 处理第8个任务... 开始 Thread-7
    2022-06-25T09:53:55====> 处理第8个任务... 结束
    2022-06-25T09:53:55====> 处理第9个任务... 开始 Thread-7
    2022-06-25T09:53:57====> 处理第9个任务... 结束
    2022-06-25T09:53:57====> 处理第10个任务... 开始 Thread-7
    2022-06-25T09:53:59====> 处理第10个任务... 结束
    2022-06-25T09:53:59====> 处理第11个任务... 开始 Thread-7
    2022-06-25T09:54:01====> 处理第11个任务... 结束
    2022-06-25T09:54:01====> 处理第12个任务... 开始 Thread-7
    2022-06-25T09:54:03====> 处理第12个任务... 结束
    2022-06-25T09:54:03====> 处理第13个任务... 开始 Thread-7
    2022-06-25T09:54:05====> 处理第13个任务... 结束
    2022-06-25T09:54:05====> 处理第14个任务... 开始 Thread-7
    2022-06-25T09:54:07====> 处理第14个任务... 结束
    2022-06-25T09:54:07====> 处理第15个任务... 开始 Thread-7
    2022-06-25T09:54:09====> 处理第15个任务... 结束
    2022-06-25T09:54:09====> 处理第16个任务... 开始 Thread-7
    2022-06-25T09:54:11====> 处理第16个任务... 结束
    2022-06-25T09:54:11====> 处理第17个任务... 开始 Thread-7
    2022-06-25T09:54:13====> 处理第17个任务... 结束
    2022-06-25T09:54:13====> 处理第18个任务... 开始 Thread-7
    2022-06-25T09:54:15====> 处理第18个任务... 结束
    2022-06-25T09:54:15====> 处理第19个任务... 开始 Thread-7
    2022-06-25T09:54:17====> 处理第19个任务... 结束
    2022-06-25T09:54:17====> 处理第20个任务... 开始 Thread-7
    2022-06-25T09:54:19====> 处理第20个任务... 结束
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    异步方式

    使用AsyncDataStream数据流,因为我们不需要关注顺序,所以使用unorderedWait方法。如果你需要关注顺序,可以使用orderedWait方法。

    代码

    
        /**
         * 异步方式
         */
        private static void asyncHandleData() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Long> longDataStreamSource = env.generateSequence(1, 20);
            // 不用关心顺序,异步处理完直接发送到下游
            AsyncDataStream.unorderedWait(longDataStreamSource, new MyRichAsyncFunction(), 1, TimeUnit.MINUTES, 5);
            env.execute("test");
        }
    
        /**
         * RichAsyncFunction 异步处理Function 实现类
         */
        static class MyRichAsyncFunction extends RichAsyncFunction<Long, Long> {
    
            private ThreadPoolExecutor threadPoolExecutor;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 初始化操作
                threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
            }
    
            @Override
            public void close() throws Exception {
                super.close();
                // 善后操作
                threadPoolExecutor.shutdown();
            }
    
            @Override
            public void timeout(Long input, ResultFuture<Long> resultFuture) throws Exception {
                // 超时操作
                resultFuture.completeExceptionally(new TimeoutException());
            }
    
            @Override
            public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) throws Exception {
                // 异步客户端处理
                CompletableFuture.runAsync(() -> {
                    try {
                        handleData(aLong);
                    } catch (InterruptedException e) {
                        resultFuture.completeExceptionally(e);
                        return;
                    }
                    resultFuture.complete(Collections.singleton(aLong));
                }, threadPoolExecutor);
            }
    
        }
        
        /**
         * 模拟 耗时2s操作
         */
        private static Long handleData(Long aLong) throws InterruptedException {
            String dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());
            System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 开始 " + Thread.currentThread().getName());
            // 模拟 耗时2s处理
            Thread.sleep(2000);
            dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());
            System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 结束");
            return aLong;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    结果输出

    从以下结果可以得出结论:

    1. 任务是异步并发处理的
    2. 由于是AsyncDataStream.unorderedWait,所以顺序是无序的
    3. 总耗时为 8s (2022-06-25T10:23:09 - 2022-06-25T10:23:01)
    2022-06-25T10:23:01====> 处理第4个任务... 开始 pool-3-thread-4
    2022-06-25T10:23:01====> 处理第2个任务... 开始 pool-3-thread-2
    2022-06-25T10:23:01====> 处理第5个任务... 开始 pool-3-thread-5
    2022-06-25T10:23:01====> 处理第3个任务... 开始 pool-3-thread-3
    2022-06-25T10:23:01====> 处理第1个任务... 开始 pool-3-thread-1
    2022-06-25T10:23:03====> 处理第4个任务... 结束
    2022-06-25T10:23:03====> 处理第2个任务... 结束
    2022-06-25T10:23:03====> 处理第1个任务... 结束
    2022-06-25T10:23:03====> 处理第3个任务... 结束
    2022-06-25T10:23:03====> 处理第5个任务... 结束
    2022-06-25T10:23:03====> 处理第6个任务... 开始 pool-3-thread-4
    2022-06-25T10:23:03====> 处理第7个任务... 开始 pool-3-thread-2
    2022-06-25T10:23:03====> 处理第8个任务... 开始 pool-3-thread-1
    2022-06-25T10:23:03====> 处理第9个任务... 开始 pool-3-thread-3
    2022-06-25T10:23:03====> 处理第10个任务... 开始 pool-3-thread-5
    2022-06-25T10:23:05====> 处理第6个任务... 结束
    2022-06-25T10:23:05====> 处理第7个任务... 结束
    2022-06-25T10:23:05====> 处理第8个任务... 结束
    2022-06-25T10:23:05====> 处理第9个任务... 结束
    2022-06-25T10:23:05====> 处理第11个任务... 开始 pool-3-thread-4
    2022-06-25T10:23:05====> 处理第10个任务... 结束
    2022-06-25T10:23:05====> 处理第12个任务... 开始 pool-3-thread-1
    2022-06-25T10:23:05====> 处理第13个任务... 开始 pool-3-thread-2
    2022-06-25T10:23:05====> 处理第14个任务... 开始 pool-3-thread-3
    2022-06-25T10:23:05====> 处理第15个任务... 开始 pool-3-thread-5
    2022-06-25T10:23:07====> 处理第13个任务... 结束
    2022-06-25T10:23:07====> 处理第15个任务... 结束
    2022-06-25T10:23:07====> 处理第11个任务... 结束
    2022-06-25T10:23:07====> 处理第12个任务... 结束
    2022-06-25T10:23:07====> 处理第14个任务... 结束
    2022-06-25T10:23:07====> 处理第16个任务... 开始 pool-3-thread-3
    2022-06-25T10:23:07====> 处理第17个任务... 开始 pool-3-thread-5
    2022-06-25T10:23:07====> 处理第18个任务... 开始 pool-3-thread-4
    2022-06-25T10:23:07====> 处理第19个任务... 开始 pool-3-thread-1
    2022-06-25T10:23:07====> 处理第20个任务... 开始 pool-3-thread-2
    2022-06-25T10:23:09====> 处理第18个任务... 结束
    2022-06-25T10:23:09====> 处理第17个任务... 结束
    2022-06-25T10:23:09====> 处理第16个任务... 结束
    2022-06-25T10:23:09====> 处理第19个任务... 结束
    2022-06-25T10:23:09====> 处理第20个任务... 结束
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    延伸

    使用orderedWait会有什么样的表现呢?

    代码修改如下:

    
        /**
         * 异步方式
         */
        private static void asyncHandleData() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Long> longDataStreamSource = env.generateSequence(1, 20);
            // 关心顺序,使用orderedWait, 异步处理完 并且是位于队列第一个 才发送到下游
            AsyncDataStream.orderedWait(longDataStreamSource, new MyRichAsyncFunction(), 1, TimeUnit.MINUTES, 5)//     
                    //  验证 orderedWait   
                    .print("打印结果:");
            env.execute("test");
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    运行结果:

    2022-06-25T10:39:57====> 处理第2个任务... 开始 pool-3-thread-2
    2022-06-25T10:39:57====> 处理第4个任务... 开始 pool-3-thread-4
    2022-06-25T10:39:57====> 处理第5个任务... 开始 pool-3-thread-5
    2022-06-25T10:39:57====> 处理第3个任务... 开始 pool-3-thread-3
    2022-06-25T10:39:57====> 处理第1个任务... 开始 pool-3-thread-1
    2022-06-25T10:39:59====> 处理第4个任务... 结束
    2022-06-25T10:39:59====> 处理第1个任务... 结束
    2022-06-25T10:39:59====> 处理第2个任务... 结束
    2022-06-25T10:39:59====> 处理第3个任务... 结束
    2022-06-25T10:39:59====> 处理第5个任务... 结束
    打印结果:> 1
    打印结果:> 2
    打印结果:> 3
    打印结果:> 4
    打印结果:> 5
    2022-06-25T10:39:59====> 处理第6个任务... 开始 pool-3-thread-1
    2022-06-25T10:39:59====> 处理第7个任务... 开始 pool-3-thread-2
    2022-06-25T10:39:59====> 处理第8个任务... 开始 pool-3-thread-4
    2022-06-25T10:39:59====> 处理第9个任务... 开始 pool-3-thread-3
    2022-06-25T10:39:59====> 处理第10个任务... 开始 pool-3-thread-5
    2022-06-25T10:40:01====> 处理第9个任务... 结束
    2022-06-25T10:40:01====> 处理第6个任务... 结束
    2022-06-25T10:40:01====> 处理第8个任务... 结束
    2022-06-25T10:40:01====> 处理第7个任务... 结束
    2022-06-25T10:40:01====> 处理第10个任务... 结束
    打印结果:> 6
    打印结果:> 7
    打印结果:> 8
    打印结果:> 9
    打印结果:> 10
    2022-06-25T10:40:01====> 处理第11个任务... 开始 pool-3-thread-3
    2022-06-25T10:40:01====> 处理第12个任务... 开始 pool-3-thread-1
    2022-06-25T10:40:01====> 处理第13个任务... 开始 pool-3-thread-4
    2022-06-25T10:40:01====> 处理第14个任务... 开始 pool-3-thread-2
    2022-06-25T10:40:01====> 处理第15个任务... 开始 pool-3-thread-5
    2022-06-25T10:40:03====> 处理第11个任务... 结束
    2022-06-25T10:40:03====> 处理第12个任务... 结束
    2022-06-25T10:40:03====> 处理第13个任务... 结束
    打印结果:> 11
    打印结果:> 12
    打印结果:> 13
    2022-06-25T10:40:03====> 处理第14个任务... 结束
    2022-06-25T10:40:03====> 处理第15个任务... 结束
    2022-06-25T10:40:03====> 处理第16个任务... 开始 pool-3-thread-2
    打印结果:> 14
    2022-06-25T10:40:03====> 处理第17个任务... 开始 pool-3-thread-1
    打印结果:> 15
    2022-06-25T10:40:03====> 处理第18个任务... 开始 pool-3-thread-4
    2022-06-25T10:40:03====> 处理第19个任务... 开始 pool-3-thread-3
    2022-06-25T10:40:03====> 处理第20个任务... 开始 pool-3-thread-5
    2022-06-25T10:40:05====> 处理第16个任务... 结束
    打印结果:> 16
    2022-06-25T10:40:05====> 处理第17个任务... 结束
    2022-06-25T10:40:05====> 处理第19个任务... 结束
    2022-06-25T10:40:05====> 处理第18个任务... 结束
    打印结果:> 17
    2022-06-25T10:40:05====> 处理第20个任务... 结束
    打印结果:> 18
    打印结果:> 19
    打印结果:> 20
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
  • 相关阅读:
    人工智能导论
    腾讯mini项目-【指标监控服务重构-会议记录】2023-08-04
    记录一个 Hudi HBase 依赖冲突问题及解决方案
    MVC、MVP、MVVM区别
    (附源码)ssm财务管理系统 毕业设计 282251
    DC3算法相关题目
    二,几何相交-5,BO算法实现--(3)事件和操作
    SIP没有摘机消息可以通话吗
    后端接口性能优化分析-程序结构优化
    Qt编程-QTableView冻结行或冻结列或冻结局部单元格
  • 原文地址:https://blog.csdn.net/qq_27818541/article/details/125455916