• Flink之常用处理函数


    处理函数

    概述

    处理函数(Processing Function)是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理,允许开发人员编写自定义逻辑以执行各种操作,如转换、聚合、筛选、连接等,并在处理后生成输出数据流。

    对于数据流,都可以直接调用.process()方法进行自定义处理,传入的参数就叫作处理函数,也可以把它划分为转换算子。

    基本处理函数

    ProcessFunction是最基本的处理函数,基于DataStream直接调用.process()时作为参数传入

    ProcessFunction介绍

    ProcessFunction是一个抽象类,它继承AbstractRichFunction,有两个泛型类型参数:

    1.输入的数据类型
    
    2.处理完成之后输出数据类型
    
    • 1
    • 2
    • 3

    内部单独定义了两个方法:

    1.必须要实现的抽象方法.processElement()
    
    2.一个非抽象方法.onTimer()
    
    • 1
    • 2
    • 3

    ProcessFunction类如下:

    /**
     * 处理流元素的函数
     *
     * 对于输入流中的每个元素,调用processElement(Object,ProcessFunction.Context,Collector) 可以产生零个或多个元素作为输出
     * 还可以通过提供的ProcessFunction.Context查询时间和设置计时器
     *
     * 对于触发计时器,将调用onTimer(long,ProcessFunction.OnTimerContext,Collector) 可以再次产生零个或多个元素作为输出,并注册其他计时器
     *
     * @param  输入元素的类型
     * @param  输出元素的类型
     */
    @PublicEvolving
    public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * 处理输入流中的一个元素,对于流中的每个元素都会调用一次
         *
         * 可以使用输出零个或多个元素收集器参数,并使用更新内部状态或设置计时器ProcessFunction.Context参数
         *
         * @param value 输入值,类型与流中数据类型一致
         * @param ctx ProcessFunction的内部抽象类Context,表示当前运行的上下文,可以获取当前时间戳,用于查询时间和注册定时器的定时服务
         * @param out 用于返回结果值的收集器,与out.collect()方法向下游发数据类似
         */
        public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
        /**
         * 当使用设置计时器时调用TimerService
         * 
         * 只有在注册好的定时器触发的时候才会调用,而定时器是通过定时服务TimerService来注册的
         * 
         * 事件时间语义下就是由水位线watermark来触发
         * 
         * 也可以自定义数据按照时间分组、定时触发计算输出结果,实现类似窗口window的功能
         *
         * @param timestamp 触发计时器的时间戳,指设定好的触发时间
         * @param ctx 上下文
         * @param out 用于返回结果值的收集器
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    使用示例

    基本处理函数ProcessFunction的使用与基本的转换操作类似,直接基于DataStream调用.process()方法,传入一个ProcessFunction作为参数,用来定义处理逻辑。

    具体举例使用示例如下:

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6));
            /**
             * 创建OutputTag对象
             * 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)
             */
            OutputTag<Integer> evenTag = new OutputTag<>("even", Types.INT);
            OutputTag<Integer> oddTag = new OutputTag<>("odd", Types.INT);
    
            // 使用process算子
            SingleOutputStreamOperator<Integer> process = stream.process(
                    new ProcessFunction<Integer, Integer>() {
                        @Override
                        public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                            if (value > 0) {
                                if (value % 2 == 0) {
                                    // 偶数放到侧输出流evenTag中
                                    // 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据
                                    ctx.output(evenTag, value);
                                } else if (value % 2 == 1) {
                                    // 奇数放到侧输出流oddTag中
                                    ctx.output(oddTag, value);
                                }
                            } else {
                                // 负数 数据,放到主流中
                                out.collect(value);
                            }
                        }
                    }
            );
    
            // 在主流中,根据标签 获取 侧输出流
            SideOutputDataStream<Integer> even = process.getSideOutput(evenTag);
            SideOutputDataStream<Integer> odd = process.getSideOutput(oddTag);
    
            // 打印主流
            process.printToErr("主流-负数-job");
            //打印 侧输出流
            even.print("偶数-job");
            odd.print("奇数-job");
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    奇数-job:1> 1
    偶数-job:2> 2
    奇数-job:1> 3
    偶数-job:2> 4
    奇数-job:1> 5
    主流-负数-job:2> -6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    按键分区处理函数

    KeyedProcessFunction对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,必须基于KeyedStream

    KeyedProcessFunction介绍

    KeyedProcessFunction与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。

    按键分区处理函数接口如下:

    public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    
        public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
    
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    定时器Timer和定时服务TimerService

    另外在KeyedStream中是支持使用定时服务TimerService,可以通过它访问流中的事件event、时间戳timestamp、水位线watermark,甚至可以注册定时事件。

    在onTimer()方法中可以实现定时处理的逻辑,而它触发的前提是之前曾经注册过定时器、并且现在已经到了触发时间。

    注册定时器的功能是通过上下文中提供的定时服务来实现的。

    // 获取定时服务
    TimerService timerService = ctx.timerService();
    
    • 1
    • 2

    TimerService是Flink关于时间和定时器的基础服务接口,对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器,具体方法如下:

    // 获取当前的处理时间
    long currentProcessingTime();
    // 获取当前的水位线(事件时间)
    long currentWatermark();
    
    // 注册处理时间定时器,当处理时间超过time时触发
    void registerProcessingTimeTimer(long time);
    // 注册事件时间定时器,当水位线超过time时触发
    void registerEventTimeTimer(long time);
    
    // 删除触发时间为time的处理时间定时器
    void deleteProcessingTimeTimer(long time);
    // 删除触发时间为time的处理时间定时器
    void deleteEventTimeTimer(long time);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意:

    尽管处理函数中都可以访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法

    使用示例

    直接基于keyBy之后的KeyedStream,直接调用.process()方法,传入KeyedProcessFunction的实现类参数

    必须实现processElement()抽象方法,用来处理流中的每一个数据
    
    必须实现非抽象方法onTimer(),用来定义定时器触发时的回调操作
    
    • 1
    • 2
    • 3
     public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从socket接收数据流
            SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                    // 将输入数据转换为Tuple2
                    .map(new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String value) throws Exception {
                            String[] split = value.split(",");
                            return Tuple2.of(split[0], Integer.valueOf(split[1]));
                        }
                    })
                    // 指定 watermark策略
                    .assignTimestampsAndWatermarks(
                            // 定义Watermark策略
                            WatermarkStrategy
                                    .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                    );
    
            // keyBy分区
            KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);
    
            // 按键分区处理函数
            SingleOutputStreamOperator<Integer> process = keyByStream.process(
                    new KeyedProcessFunction<String, Tuple2<String, Integer>, Integer>() {
                        /**
                         * 来一条数据调用一次
                         * @param value 当前数据
                         * @param ctx 上下文
                         * @param out 收集器
                         * @throws Exception
                         */
                        @Override
                        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Integer> out) throws Exception {
                            //获取当前数据的key
                            String currentKey = ctx.getCurrentKey();
    p();
                            // 获取定时服务
                            TimerService timerService = ctx.timerService();
    						// 数据中提取出来的事件时间
                            Long currentEventTime = ctx.timestam
    						// 注册事件时间定时器
                            timerService.registerEventTimeTimer(3000L);
                            System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentEventTime + " 注册一个3s定时器");
    
                        /**
                         * 时间进展到定时器注册的时间,调用该方法
                         * @param timestamp 定时器被触发时的时间
                         * @param ctx       上下文
                         * @param out       采集器
                         */
                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
                            super.onTimer(timestamp, ctx, out);
                            String currentKey = ctx.getCurrentKey();
                            System.out.println("key: " + currentKey + " 时间: " + timestamp + " 定时器触发");
                        }
                    }
            );
            process.print();
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    其他

    1.注册一个事件时间的定时器

    事件时间定时器,通过watermark来触发,即watermark >= 注册的时间
    
    水印watermark = 当前最大事件时间 - 等待时间 -1ms
    
    例子:等待3s,3s定时器,事件时间6s 则watermark = 6s - 3s -1ms = 2.99s,不会触发3s的定时器
    
    • 1
    • 2
    • 3
    • 4
    • 5
    // 数据中提取出来的事件时间
    Long currentEventTime = ctx.timestam
    // 注册事件时间定时器
    timerService.registerEventTimeTimer(3000L);
    System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentEventTime + " 注册一个3s定时器");
    
    • 1
    • 2
    • 3
    • 4
    • 5

    输入数据如下,当输入7时,水位线是7-3=4s-1ms=3.99s,即水位线超过定时器3s,执行触发回调操作

    nc -lk 8086
    key1,1
    key1,2
    key2,3
    key2,4
    key1,5
    key2,6
    key1,7
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    控制台输出:

    key: key1 当前数据: (key1,1) 当前时间: 1000 注册一个3s定时器
    key: key1 当前数据: (key1,2) 当前时间: 2000 注册一个3s定时器
    key: key2 当前数据: (key2,3) 当前时间: 3000 注册一个3s定时器
    key: key2 当前数据: (key2,4) 当前时间: 4000 注册一个3s定时器
    key: key1 当前数据: (key1,5) 当前时间: 5000 注册一个3s定时器
    key: key2 当前数据: (key2,6) 当前时间: 6000 注册一个3s定时器
    key: key1 当前数据: (key1,7) 当前时间: 7000 注册一个3s定时器
    key: key1 时间: 3000 定时器触发
    key: key2 时间: 3000 定时器触发
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意:

    TimerService会以键和时间戳为标准,对定时器进行去重,因此对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

    2.注册一个处理时间的定时器

    long currentTs = timerService.currentProcessingTime();
    timerService.registerProcessingTimeTimer(currentTs + 3000L);
     System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentTs + " 注册一个3s后的定时器");
    
    • 1
    • 2
    • 3

    输入测试数据如下:

    key1,1
    key2,2
    
    • 1
    • 2

    当注册一个处理时间的定时器,3s后定时器会触发操作

    key: key1 当前数据: (key1,1) 当前时间: 1688136512301 注册一个3s后的定时器
    key: key2 当前数据: (key2,2) 当前时间: 1688136514179 注册一个3s后的定时器
    key: key1 时间: 1688136515301 定时器触发
    key: key2 时间: 1688136517179 定时器触发
    
    • 1
    • 2
    • 3
    • 4

    3.获取process当前watermark

    long currentWatermark = timerService.currentWatermark();
    System.out.println("当前数据: " + value + " 当前watermark: " + currentWatermark);
    
    • 1
    • 2
    key1,1
    key1,2
    key1,3
    
    • 1
    • 2
    • 3

    结论:每次process处理,watermark是指上一条数据的事件时间-等待时间,例如:3-2-1ms=-1001

    当前数据=(key1,1),当前watermark=-9223372036854775808
    当前数据=(key1,2),当前watermark=-2001
    当前数据=(key1,3),当前watermark=-1001
    
    • 1
    • 2
    • 3

    4.删除一个处理时间定时器

                            // 注册处理时间定时器
                            long currentTs = timerService.currentProcessingTime();
                            long timer = currentTs + 3000;
                            timerService.registerProcessingTimeTimer(timer);
                            System.out.println("key: " + currentKey + " 当前数据: " + value + " 当前时间: " + currentTs + " 注册一个3s后的定时器");
    
                            // 在3000毫秒后删除处理时间定时器
                            if("key1".equals(currentKey)){
                                timerService.deleteProcessingTimeTimer(timer)
                            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输入测试数据:

    key1,1
    key2,2
    
    • 1
    • 2

    控制台输出结果:

    key: key1 当前数据: (key1,1) 当前时间: 1688138104565 注册一个3s后的定时器
    key: key2 当前数据: (key2,2) 当前时间: 1688138106441 注册一个3s后的定时器
    key: key2 时间: 1688138109441 定时器触发
    
    • 1
    • 2
    • 3

    窗口处理函数

    窗口处理函数就是一种典型的全窗口函数,它是基于WindowedStream直接调用.process()方法

    窗口处理函数有2个:

    1.ProcessWindowFunction:

    开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入,必须是keyBy的数据流

    2.ProcessAllWindowFunction:

    同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入,必须是非keyBy的数据流

    ProcessWindowFunction介绍

    ProcessWindowFunction既是处理函数又是全窗口函数,具体接口如下:

          /**
         * ProcessWindowFunction它有四个类型参数:
    
         * @param  数据流中窗口任务的输入数据类型
         * @param  窗口任务进行计算之后的输出数据类型
         * @param  数据中键key的类型
         * @param   窗口的类型,是Window的子类型。一般情况下我们定义时间窗口,W就是TimeWindow
         */
    
        public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    
            /**
             * 处理数据的核心方法process()方法
             *
             * @param key 窗口做统计计算基于的键,也就是之前keyBy用来分区的字段
             * @param context 当前窗口进行计算的上下文,它的类型就是ProcessWindowFunction内部定义的抽象类Context
             * @param elements 窗口收集到用来计算的所有数据,这是一个可迭代的集合类型
             * @param out 用来发送数据输出计算结果的收集器,类型为Collector
             * @throws Exception
             */
            public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    
            /**
             * 主要是进行窗口的清理工作
             * 如果自定义了窗口状态,那么必须在clear()方法中进行显式地清除,避免内存溢出
             * @param context 当前窗口进行计算的上下文
             * @throws Exception
             */
            public void clear(Context context) throws Exception {}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    ProcessAllWindowFunction介绍

    ProcessAllWindowFunction的用法类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法

    stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
        .process(new MyProcessAllWindowFunction())
    
    • 1
    • 2

    使用示例

    以使用ProcessWindowFunction为例说明:

      public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 从socket接收数据流
            SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8086);
    
    
            // 将输入数据转换为(key, value)元组
            DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2 map(String s) throws Exception {
                    int number = Integer.parseInt(s);
                    String key = number % 2 == 0 ? "key1" : "key2";
                    Tuple2 tuple2 = new Tuple2(key, number);
                    return tuple2;
                }
            }).returns(Types.TUPLE(Types.STRING, Types.INT));
    
            // 将数据流按键分组,并定义滚动窗口(处理时间窗口)
            DataStream<String> resultStream = dataStream
                    .keyBy(tuple -> tuple.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .process(new MyProcessWindowFunction());
    
            resultStream.print();
    
            env.execute("ProcessWindowFunction Example");
        }
    
        public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) {
                int sum = 0;
                for (Tuple2<String, Integer> element : elements) {
                    sum += element.f1;
                }
    
                out.collect("Key: " + key + ", Window: " + context.window() + ", Sum: " + sum);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    流的合并处理函数

    CoProcessFunction是合并connect两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入

    CoProcessFunction介绍

    调用.process()时,传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法

    CoProcessFunction类具体结构如下:

    /**
     * 用于同时处理两个连接的流
     * 它允许定义自定义处理逻辑,以处理来自两个不同输入流的事件并生成输出
     *
     * @param  第一个输入流的元素类型
     * @param  第二个输入流的元素类型
     * @param  输出元素的类型
     */
    public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
    
        /**
         * 处理第一个输入流的元素
         *
         * @param value 第一个输入流的元素
         * @param ctx 用于访问上下文信息,例如事件时间和状态的Context对象
         * @param out 用于发射输出元素的Collector对象
         * @throws Exception 处理时可能抛出的异常
         */
        public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
    
        /**
         * 处理第二个输入流的元素
         *
         * @param value 第二个输入流的元素
         * @param ctx 用于访问上下文信息,可以使用Context对象来访问事件时间、水位线和状态等上下文信息
         * @param out 用于发射输出元素的Collector对象
         * @throws Exception 处理时可能抛出的异常
         */
        public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    
        /**
         * 当定时器触发时调用的方法。可以重写这个方法来执行基于时间的操作
         *
         * @param timestamp 触发定时器的时间戳
         * @param ctx 用于访问上下文信息,如事件时间和状态的OnTimerContext对象
         * @param out 用于发射输出元素的Collector对象
         * @throws Exception 处理时可能抛出的异常
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    使用示例

    假设有两个输入流,将这两个流合并计算得到每个key对应的合计,并输出结果流

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("key1", 1), Tuple2.of("key2", 4), Tuple2.of("key1", 2));
            DataStreamSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("key1", 3), Tuple2.of("key2", 5), Tuple2.of("key2", 6));
            ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connect = source1.connect(source2);
    
            // 进行keyby操作,将key相同数据放到一起
            ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
    
            /**
             * 对2个流中相同key的值求和
             */
            SingleOutputStreamOperator<String> process = connectKeyby.process(
                    new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                        Map<String, Integer> map = new HashMap<>();
    
                        /**
                         * 第一条流的处理逻辑
                         * @param value 第一条流的数据
                         * @param ctx   上下文
                         * @param out   采集器
                         * @throws Exception
                         */
                        @Override
                        public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                            String key = value.f0;
                            if (!map.containsKey(key)) {
                                // 如果key不存在,则将值直接put进map
                                map.put(key, value.f1);
                            } else {
                                // key存在,则计算:获取上一次put的值 + 本次的值
                                Integer total = map.get(key) + value.f1;
                                map.put(key, total);
                            }
    
                            out.collect("processElement1  key = " + key + " value = " + value + "total = " + map.get(key));
                        }
    
                        /**
                         * 第二条流的处理逻辑
                         * @param value 第二条流的数据
                         * @param ctx   上下文
                         * @param out   采集器
                         * @throws Exception
                         */
                        @Override
                        public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                            String key = value.f0;
                            if (!map.containsKey(key)) {
                                // 如果key不存在,则将值直接put进map
                                map.put(key, value.f1);
                            } else {
                                // key存在,则计算:获取上一次put的值 + 本次的值
                                Integer total = map.get(key) + value.f1;
                                map.put(key, total);
                            }
                            out.collect("processElement2  key = " + key + " value = " + value + "total = " + map.get(key));
                        }
                    }
            );
    
            process.print();
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    3> processElement1  key = key2 value = (key2,4)total = 4
    4> processElement1  key = key1 value = (key1,1)total = 1
    4> processElement2  key = key1 value = (key1,3)total = 4
    4> processElement1  key = key1 value = (key1,2)total = 6
    3> processElement2  key = key2 value = (key2,5)total = 9
    3> processElement2  key = key2 value = (key2,6)total = 15
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    流的联结处理函数

    JoinFunction 和 ProcessJoinFunction 是 Flink 中用于执行窗口连接操作的两个不同接口

    窗口联结 JoinFunction

    Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。

    JoinFunction接口如下:

        /**
         * 联接通过在指定的键上联接两个数据集的元素来组合它们,每对连接元素都调用此函数
         * 
         * 默认情况下,连接严格遵循SQL中 “inner join” 的语义
         *
         * @param  第一个输入中元素的类型
         * @param  第二个输入中元素的类型
         * @param  结果元素的类型
         */
        public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
    
            /**
             * join方法,每对联接的元素调用一次
             *
             * @param first 来自第一个输入的元素
             * @param second 来自第二个输入的元素
             * @return 生成的元素
             */
            OUT join(IN1 first, IN2 second) throws Exception;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    具体语法格式如下:

    /**
     * 1.调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams
     * 2.通过.where()和.equalTo()方法指定两条流中联结的key。注意:两者相同的元素,如果在同一窗口中,才可以匹配起来
     * 3.通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算
     */
    stream1.join(stream2)
            // where()参数是KeySelector键选择器,用来指定第一条流中的key
            .where(<KeySelector>)
            // equalTo()传入KeySelector则指定第二条流中的key
            .equalTo(<KeySelector>)
            // window()传入窗口分配器
            .window(<WindowAssigner>)
            // apply()看作实现一个特殊的窗口函数,只能调用.apply()。传入JoinFunction是一个函数类接口,使用时需要实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据。
            .apply(<JoinFunction>)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    示例如下:

     public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 生成数据源1
            DataStreamSource<Tuple2<String, Integer>> streamSource1 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4));
            // 定义 使用 Watermark策略
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = streamSource1
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));
    
    
            // 生成数据源2
            DataStreamSource<Tuple2<String, Integer>> streamSource2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4), Tuple2.of("d", 5), Tuple2.of("e", 6));
            // 定义 使用 Watermark策略
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = streamSource2
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));
    
            /**
             * 根据keyby的key进行匹配关联
             *
             * 注意:落在同一个时间窗口范围内才能匹配
             */
            DataStream<String> join = stream1.join(stream2)
                    // stream1的keyby
                    .where(r1 -> r1.f0)
                    // stream2的keyby
                    .equalTo(r2 -> r2.f0)
                    // 传入窗口分配器
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    // 传入JoinFunction函数类接口,实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据
                    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                        /**
                         * 关联上的数据,调用join方法
                         * @param first  stream1的数据
                         * @param second stream2的数据
                         */
                        @Override
                        public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                            return "stream1 数据: " + first + " 关联 stream2 数据: " + second;
                        }
                    });
    
            join.print();
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    执行结果如下:

    stream1 数据: (a,1) 关联 stream2 数据: (a,1)
    stream1 数据: (a,1) 关联 stream2 数据: (a,2)
    stream1 数据: (a,2) 关联 stream2 数据: (a,1)
    stream1 数据: (a,2) 关联 stream2 数据: (a,2)
    stream1 数据: (c,4) 关联 stream2 数据: (c,4)
    stream1 数据: (b,3) 关联 stream2 数据: (b,3)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    间隔联结 ProcessJoinFunction

    Interval Join即间隔联结,它是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

    ProcessJoinFunction接口情况如下 :

    /**
     * 处理两个连接流的关联操作的抽象类
     * 该类允许定义自定义的处理逻辑,以在连接两个流时处理匹配的元素
     *
     * @param  第一个输入流的元素类型
     * @param  第二个输入流的元素类型
     * @param  输出元素的类型
     */
    public interface ProcessJoinFunction<IN1, IN2, OUT> {
    
        /**
         * 处理连接两个流的元素
         *
         * @param left  第一个输入流的元素
         * @param right 第二个输入流的元素
         * @param ctx   用于访问上下文信息的 Context 对象
         * @param out   用于发射输出元素的 Collector 对象
         * @throws Exception 处理时可能抛出的异常
         */
        void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    间隔联结使用语法如下:

    // 第一条流进行KeyedStream
    stream1
        .keyBy(<KeySelector>)
        // 得到KeyedStream之后,调用.intervalJoin()合并两条流,传入一个KeyedStream参数,两者key类型应该一致,最终得到一个IntervalJoin类型
        .intervalJoin(stream2.keyBy(<KeySelector>))
        // 通过.between()方法指定间隔的上下界
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        // 调用.process()方法,定义对匹配数据对的处理操作,传入一个处理函数
        .process (new ProcessJoinFunction<Integer, Integer, String(){
                @Override
                public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
                    out.collect(left + "," + right);
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    使用示例如下:

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 生成数据源1
            DataStreamSource<Tuple2<String, Integer>> streamSource1 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4));
            // 定义 使用 Watermark策略
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = streamSource1
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));
    
    
            // 生成数据源2
            DataStreamSource<Tuple2<String, Integer>> streamSource2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4), Tuple2.of("d", 5), Tuple2.of("e", 6));
            // 定义 使用 Watermark策略
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = streamSource2
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));
    
    
            // 对2条流分别做keyby,key就是关联条件
            KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = stream1.keyBy(r1 -> r1.f0);
            KeyedStream<Tuple2<String, Integer>, String> keyedStream2 = stream2.keyBy(r2 -> r2.f0);
    
    
            // 执行间隔联结
            keyedStream1.intervalJoin(keyedStream2)
                    .between(Time.seconds(-2), Time.seconds(2))
                    .process(
                            new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                                /**
                                 * 当两条流数据匹配上时调用这个方法
                                 * @param left  stream1的数据
                                 * @param right stream2的数据
                                 * @param ctx   上下文
                                 * @param out   采集器
                                 * @throws Exception
                                 */
                                @Override
                                public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, Context ctx, Collector<String> out) throws Exception {
                                    // 关联的数据
                                    out.collect("stream1 数据: " + left + " 关联 stream2 数据: " + right);
                                }
                            })
                    .print();
    
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    stream1 数据: (a,1) 关联 stream2 数据: (a,1)
    stream1 数据: (a,1) 关联 stream2 数据: (a,2)
    stream1 数据: (a,2) 关联 stream2 数据: (a,2)
    stream1 数据: (a,2) 关联 stream2 数据: (a,1)
    stream1 数据: (b,3) 关联 stream2 数据: (b,3)
    stream1 数据: (c,4) 关联 stream2 数据: (c,4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    迟到数据的处理

    窗口间隔联结处理函数可以实现对迟到数据的处理

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource1 = env.socketTextStream("112.74.96.150", 8086)
                    .map(new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String value) throws Exception {
                            String[] split = value.split(",");
                            return Tuple2.of(split[0], Integer.valueOf(split[1]));
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                    );
    
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource2 = env.socketTextStream("112.74.96.150", 8087)
                    .map(new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String value) throws Exception {
                            String[] split = value.split(",");
                            return Tuple2.of(split[0], Integer.valueOf(split[1]));
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                    );
    
    
            // 对2条流分别做keyby,key就是关联条件
            KeyedStream<Tuple2<String, Integer>, String> keyedStream1 = streamSource1.keyBy(r1 -> r1.f0);
            KeyedStream<Tuple2<String, Integer>, String> keyedStream2 = streamSource2.keyBy(r2 -> r2.f0);
    
            // 定义 标记操作符的侧面输出
            OutputTag<Tuple2<String, Integer>> keyedStream1OutputTag = new OutputTag<>("keyedStream1", Types.TUPLE(Types.STRING, Types.INT));
            OutputTag<Tuple2<String, Integer>> keyedStream2OutputTag = new OutputTag<>("keyedStream2", Types.TUPLE(Types.STRING, Types.INT));
    
            // 执行间隔联结
            SingleOutputStreamOperator<String> process = keyedStream1.intervalJoin(keyedStream2)
                    // 指定间隔的上界、下界的偏移,负号代表时间往前,正号代表时间往后
                    // 若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据
                    .between(Time.seconds(-2), Time.seconds(2))
                    // 将streamSource1迟到数据,放入侧输出流
                    .sideOutputLeftLateData(keyedStream1OutputTag)
                    // 将streamSource2迟到数据,放入侧输出流
                    .sideOutputRightLateData(keyedStream2OutputTag)
                    // 对匹配数据对的处理操作 只能处理 join上的数据
                    .process(
                            new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                                /**
                                 * 当两条流数据匹配上时调用这个方法
                                 * @param left  stream1的数据
                                 * @param right stream2的数据
                                 * @param ctx   上下文
                                 * @param out   采集器
                                 * @throws Exception
                                 */
                                @Override
                                public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, Context ctx, Collector<String> out) throws Exception {
                                    // 进入这个方法,是关联上的数据
                                    out.collect("stream1 数据: " + left + " 关联 stream2 数据: " + right);
                                }
                            });
    
            process.print("主流");
            process.getSideOutput(keyedStream1OutputTag).printToErr("streamSource1迟到数据");
            process.getSideOutput(keyedStream2OutputTag).printToErr("streamSource2迟到数据");
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    1.2条流数据匹配
    若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据

     nc -lk 8086
    key1,5
    
    • 1
    • 2
    nc -lk 8087
    key1,3
    key1,7
    key1,8
    
    • 1
    • 2
    • 3
    • 4
    主流> stream1 数据: (key1,5) 关联 stream2 数据: (key1,3)
    主流> stream1 数据: (key1,5) 关联 stream2 数据: (key1,7)
    
    • 1
    • 2

    2.keyedStream2迟到数据
    此时,keyedStream1中水位线是5-3=2,keyedStream2中水位线是8-3=5,多并行度下水位线取最小,即取水位线2

    在keyedStream2输入事件时间1

    nc -lk 8087
    key1,3
    key1,7
    key1,8
    key1,1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    事件时间1 < 水位线2,且事件时间1被keyedStream1的事件时间5的上界5-2=3与下界5+2=7不包含,即数据不匹配且streamSource2数据迟到

    streamSource2迟到数据> (key1,1)
    
    • 1

    3.keyedStream1迟到数据

    keyedStream1输入事件时间7

    nc -lk 8086
    key1,5
    key1,7
    
    • 1
    • 2
    • 3

    此时匹配到streamSource2中的8、7

    主流> stream1 数据: (key1,7) 关联 stream2 数据: (key1,8)
    主流> stream1 数据: (key1,7) 关联 stream2 数据: (key1,7)
    
    • 1
    • 2

    此时,keyedStream1的水位线是7-3=4,keyedStream2的水位线是8-3=5,多并行度下水位线取最小,即取水位线4

    keyedStream1输入事件时间3

     nc -lk 8086
    key1,5
    key1,7
    key1,3
    
    • 1
    • 2
    • 3
    • 4

    事件时间3 < 水位线4,且事件时间3被keyedStream2的事件时间3的上界3-2=1与下界3+2=5包含,即数据匹配且streamSource1数据迟到

    streamSource1迟到数据> (key1,3)
    
    • 1

    广播流处理函数

    用于连接一个主数据流和多个广播数据流。可以实现processElement 方法来处理主数据流的每个元素,同时可以处理广播数据流,通常用于数据广播和连接。

    广播流处理函数有2个:

    1.BroadcastProcessFunction:

    广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接之后的产物

    2.KeyedBroadcastProcessFunction:

    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个KeyedStream与广播流做连接

    KeyedBroadcastProcessFunction

    /**
     * @param   输入键控流的键类型
     * @param  键控 (非广播) 端的输入类型
     * @param  广播端的输入类型
     * @param  运算符的输出类型
     */
    public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>
            extends BaseBroadcastProcessFunction {
    
        private static final long serialVersionUID = -2584726797564976453L;
    
        /**
         * (非广播) 的键控流中的每个元素调用此方法
         *
         * @param value 流元素
         * @param ctx   允许查询元素的时间戳、查询当前处理/事件时间以及以只读访问迭代广播状态
         * @param out   将结果元素发出
         */
        public abstract void processElement(
                final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    
        /**
         * 针对broadcast stream中的每个元素调用该方法
         *
         * @param value stream元素
         * @param ctx   上下文 许查询元素的时间戳、查询当前处理/事件时间和更新广播状态
         * @param out   将结果元素发射到
         */
        public abstract void processBroadcastElement(
                final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
    
        /**
         * 当使用TimerService设置的计时器触发时调用
         *
         * @param timestamp 触发计时器的时间戳
         * @param ctx       上下文
         * @param out       返回结果值的收集器
         */
        public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out)
                throws Exception {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    BroadcastProcessFunction

    BroadcastProcessFunction与KeyedBroadcastProcessFunction类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法

        public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    
            public abstract void processElement( final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    
            public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    使用示例

    以使用KeyedBroadcastProcessFunction为例说明:

    public class KeyedBroadcastProcessFunctionExample {
    
        /**
         * 主流 数据对象
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class MainRecord {
            private String key;
            private int value;
        }
    
        /**
         * 广播流 数据对象
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class BroadcastRecord {
            private String configKey;
            private int configValue;
        }
    
        /**
         * 结果 数据对象
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class ResultRecord {
            private String key;
            private int result;
        }
    
    
        // 使用给定的名称和给定的类型信息新建一个MapStateDescriptor
        static MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("broadcastState", String.class, Integer.class);
    
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 创建主数据流
            DataStream<MainRecord> mainStream = env.fromElements(
                    new MainRecord("A", 10),
                    new MainRecord("B", 20),
                    new MainRecord("A", 30)
            );
    
            // 创建广播数据流
            DataStream<BroadcastRecord> broadcastStream = env.fromElements(
                    new BroadcastRecord("config", 5)
            );
    
            // 将广播数据流转化为 BroadcastStream
            BroadcastStream<BroadcastRecord> broadcast = broadcastStream.broadcast(mapStateDescriptor);
    
            // 使用 KeyedBroadcastProcessFunction 连接主数据流和广播数据流
            DataStream<ResultRecord> resultStream = mainStream
                    .keyBy(new MainRecordKeySelector())
                    .connect(broadcast)
                    .process(new MyKeyedBroadcastProcessFunction());
    
            resultStream.print();
    
            env.execute("KeyedBroadcastProcessFunction Example");
        }
    
    
        /**
         * 使用提供的键对其运算符状态进行分区
         */
        public static class MainRecordKeySelector implements KeySelector<MainRecord, String> {
            @Override
            public String getKey(MainRecord mainRecord) {
                return mainRecord.getKey();
            }
        }
    
        /**
         *
         */
        public static class MyKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, MainRecord, BroadcastRecord, ResultRecord> {
    
            @Override
            public void processBroadcastElement(BroadcastRecord value, Context ctx, Collector<ResultRecord> out) throws Exception {
                // 通过上下文获取广播状态
                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                // 处理广播数据流中的每个元素,更新广播状态
                broadcastState.put(value.getConfigKey(), value.getConfigValue());
            }
    
            @Override
            public void processElement(MainRecord value, ReadOnlyContext ctx, Collector<ResultRecord> out) throws Exception {
                // 在 processElement 中访问广播状态
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    
                // 从广播状态中获取配置值
                Integer configValue = broadcastState.get("config");
                // 注意:刚启动时,可能是数据流的第1 2 3...条数据先来 不是广播流先来
                if (configValue == null) {
                    return;
                }
    
                System.out.println(String.format("主数据流的Key: %s, value: %s, 广播更新结果: %s", value.key, value.value, value.value + configValue));
                // 根据配置值和主数据流中的元素执行处理逻辑
                int result = value.getValue() + configValue;
    
                // 发出结果记录
                out.collect(new ResultRecord(value.getKey(), result));
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    主数据流的Key: A, value: 10, 广播更新结果: 15
    主数据流的Key: B, value: 20, 广播更新结果: 25
    2> KeyedBroadcastProcessFunctionExample.ResultRecord(key=B, result=25)
    7> KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=15)
    主数据流的Key: A, value: 30, 广播更新结果: 35
    7> KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=35)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    Vue使用markdown编辑器:
    一篇搞懂进阶集合使用技巧
    python 多进程windows报错 linux不报错 TypeError: cannot pickle ‘_thread.lock‘ object
    2D物理系统——物理材质 & 恒定力
    MySQL:事务1(锁与隔离级别)
    简述 Mock 接口测试
    JS基本数据类型中null和undefined区别及应用
    设计模式 - 行为型考点模式:责任链模式(概述 | 案例实现 | 优缺点 | 使用场景)
    P2458 [SDOI2006]保安站岗
    VUE 组件传值 $emit之 $event和arguments
  • 原文地址:https://blog.csdn.net/qq_38628046/article/details/131466269