• flink多流操作(connect cogroup union broadcast)


    1 分流操作

    SingleOutputStreamOperator<Student> mainStream = students.process(new ProcessFunction<Student, Student>() {
        @Override
        public void processElement(Student student, ProcessFunction<Student, Student>.Context ctx, Collector<Student> collector) throws Exception {
            if (student.getGender().equals("m")) {
                // 输出到测流
                ctx.output(maleOutputTag, student);
            } else if (student.getGender().equals("f")) {
                // 输出到测流
                ctx.output(femaleOutputTag, student.toString());
            } else {
                // 在主流中输出
                collector.collect(student);
            }
        }
    });
    
    SingleOutputStreamOperator<Student> side1 = mainStream.getSideOutput(maleOutputTag);
    SingleOutputStreamOperator<String> side2 = mainStream.getSideOutput(femaleOutputTag);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2 connect连接操作

    2.1 connect 连接(DataStream,DataStream→ConnectedStreams)

    connect 翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样 DataStream 连接成一个新 的 ConnectedStreams。需要注意的是,connect 方法与 union 方法不同,虽然调用 connect 方法将两个 流连接成一个新的 ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是 可以让两个流共享 State 状态。

    // 使用 fromElements 创建两个 DataStream
    DataStreamSource<String> word = env.fromElements("a", "b", "c", "d");
    DataStreamSource<Integer> num = env.fromElements(1, 3, 5, 7, 9);
    
    // 将两个 DataStream 连接到一起
    ConnectedStreams<String, Integer> connected = word.connect(num);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 coMap(ConnectedStreams → DataStream)

    对 ConnectedStreams 调用 map 方法时需要传入 CoMapFunction 函数;
    该接口需要指定 3 个泛型

    1. 第一个输入 DataStream 的数据类型
    2. 第二个输入 DataStream 的数据类型
    3. 返回结果的数据类型。
      该接口需要重写两个方法:
    4. map1 方法,是对第 1 个流进行 map 的处理逻辑。
    5. 2 map2 方法,是对 2 个流进行 map 的处理逻辑

    这两个方法必须是相同的返回值类型。

    //将两个 DataStream 连接到一起
    
    ConnectedStreams<String, Integer> wordAndNum = word.connect(num);
    
    // 对 ConnectedStreams 中两个流分别调用个不同逻辑的 map 方法
    DataStream<String> result = wordAndNum.map(new CoMapFunction<String, Integer, String>() {
        @Override
        public String map1(String value) throws Exception {
            // 第一个 map 方法是将第一个流的字符变大写
            return value.toUpperCase();
        }
    
        @Override
        public String map2(Integer value) throws Exception {
            // 第二个 map 方法将是第二个流的数字乘以 10 并转成 String
            return String.valueOf(value * 10);
        }
    });
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.3 coFlatMap(ConnectedStreams → DataStream)

    对 ConnectedStreams 调用 flatMap 方法。调用 flatMap 方法,传入的 Function 是 CoFlatMapFunction;
    这个接口要重写两个方法:

    1. flatMap1 方法,是对第 1 个流进行 flatMap 的处理逻辑;
    2. flatMap2 方法,是对 2 个流进行 flatMap 的处理逻辑;

    这两个方法都必须返回是相同的类型。

    // 使用 fromElements 创建两个 DataStream
    DataStreamSource<String> word = env.fromElements("a b c", "d e f");
    DataStreamSource<String> num = env.fromElements("1,2,3", "4,5,6");
    
    // 将两个 DataStream 连接到一起
    ConnectedStreams<String, String> connected = word.connect(num);
    
    // 对 ConnectedStreams 中两个流分别调用个不同逻辑的 flatMap 方法
    DataStream<String> result = connected.flatMap(new CoFlatMapFunction<String, String, String>() {
        @Override
        public void flatMap1(String value, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String w : words) {
                out.collect(w);
            }
        }
    
        @Override
        public void flatMap2(String value, Collector<String> out) throws Exception {
            String[] nums = value.split(",");
            for (String n : nums) {
                out.collect(n);
            }
        }
    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    3 union操作

    3.1 union 合并(DataStream * → DataStream)

    该方法可以将两个或者多个数据类型一致的 DataStream 合并成一个 DataStream。DataStream union(DataStream… streams)可以看出 DataStream 的 union 方法的参数为可变参数,即可以合并两 个或多个数据类型一致的 DataStream,connect 不要求两个流的类型一致,但union必须一致。

    下面的例子是使用 fromElements 生成两个 DataStream,一个是基数的,一个是偶数的,然后将两个 DataStream 合并成一个 DataStream。

    // 使用 fromElements 创建两个 DataStream
    DataStreamSource<Integer> odd = env.fromElements(1, 3, 5, 7, 9);
    DataStreamSource<Integer> even = env.fromElements(2, 4, 6, 8, 10);
    
    // 将两个 DataStream 合并到一起
    DataStream<Integer> result = odd.union(even);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4 coGroup 协同分组

    coGroup 本质上是join 算子的底层算子;功能类似;可以用cogroup来实现join left join full join的功能。 代码结构如下:

    DataStreamSource<String> stream1 = env.fromElements("1,aa,m,18", "2,bb,m,28", "3,cc,f,38");
    DataStreamSource<String> stream2 = env.fromElements("1:aa:m:18", "2:bb:m:28", "3:cc:f:38");
    
    DataStream<String> res = stream1
        .coGroup(stream2)
        .where(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .equalTo(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new CoGroupFunction<String, String, String>() {
            @Override
            public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                // 这里添加具体的 coGroup 处理逻辑
               // 这两个迭代器,是这5s的数据中的某一组,id = 1
            }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    4.1 coGroup 实现 left join操作

    package batch;
    
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    
    public class coGrouptest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    //        id name
            DataStreamSource<String> stream1 = env.socketTextStream("localhost", 9998);
    //        id age
            DataStreamSource<String> stream2 = env.socketTextStream("localhost", 9999);
    // nc -lp 9999
    // nc -lp 9998
            SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
    
            SingleOutputStreamOperator<Tuple2<String, String>> s2 = stream2.map(s -> {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
    
    
            DataStream<Tuple3<String, String, String>> out = s1.coGroup(s2).where(tp -> tp.f0)  //左的f0 id 字段
                    .equalTo(tp -> tp.f0)  //又的f0 id 字段
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                    .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<Tuple3<String, String, String>> out) throws Exception {
                            for (Tuple2<String, String> t1 : iterable) {
                                boolean t2isnull = false;
                                for (Tuple2<String, String> t2 : iterable1) {
                                    out.collect(new Tuple3<String, String, String>(t1.f0,t1.f1,t2.f1));
                                    t2isnull = true;
                                }
                                if(!t2isnull){
                                    out.collect(new Tuple3<String, String, String>(t1.f0,t1.f1,null));
                                }
                            }
    
                        }
                    });
            out.print();
    
            env.execute();
    
    
    }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    5 join

    用于关联两个流(类似于 sql 中 join),需要指定 join,需要在窗口中进行关联后的逻辑计算。
    只能支持inner join 不支持 左右和全连接

    stream.join(otherStream)
          .where(<KeySelector>)
          .equalTo(<KeySelector>)
          .window(<WindowAssigner>)
          .apply(<JoinFunction>);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    实例:

    SingleOutputStreamOperator<Student> s1;
    SingleOutputStreamOperator<StuInfo> s2;
    
    // join 两个流,此时并没有具体的计算逻辑
    JoinedStreams<Student, StuInfo> joined = s1.join(s2);
    
    // 对 join 流进行计算处理
    DataStream<String> stream = joined
            // where 流 1 的某字段 equalTo 流 2 的某字段
            .where(s -> s.getId()).equalTo(s -> s.getId())
            // join 实质上只能在窗口中进行
            .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
            // 对窗口中满足关联条件的数据进行计算
            .apply(new JoinFunction<Student, StuInfo, String>() {
                // 这边传入的两个流的两条数据,是能够满足关联条件的
                @Override
                public String join(Student first, StuInfo second) throws Exception {
                    // first: 左流数据 ; second: 右流数据
                    // 计算逻辑
                    // 返回结果
                    return null;
                }
            });
    
    // 对 join 流进行计算处理
    joined.where(s -> s.getId()).equalTo(s -> s.getId())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
            .apply(new FlatJoinFunction<Student, StuInfo, String>() {
                @Override
                public void join(Student first, StuInfo second, Collector<String> out) throws Exception {
                    out.collect();
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    6 broadcast 广播

    Broadcast State 是 Flink 1.5 引入的新特性。 在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到 另一个数据流的计算中 。
    在这里插入图片描述

    6.1 API 介绍 , 核心要点

    • 将需要广播出去的流,调用 broadcast 方法进行广播转换,得到广播流 BroadCastStream
    • 然后在主流上调用 connect 算子,来连接广播流(以实现广播状态的共享处理)
    • 在连接流上调用 process 算子,就会在同一个 ProcessFunciton 中提供两个方法分别对两个流进行 处理,并在这个 ProcessFunction 内实现“广播状态”的共享
    public class _16_BroadCast_Demo {
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            configuration.setInteger("rest.port", 8822);
            
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
            env.setParallelism(1);
            
            // id,eventId
            DataStreamSource<String> stream1 = env.socketTextStream("localhost", 9998);
            SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() { });
            
            // id,age,city
            DataStreamSource<String> stream2 = env.socketTextStream("localhost", 9999);
            SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
                String[] arr = s.split(",");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }).returns(new TypeHint<Tuple3<String, String, String>>() { });
            
            /**
             * 案例背景:
             * 流 1: 用户行为事件流(持续不断,同一个人也会反复出现,出现次数不定
             * 流 2: 用户维度信息(年龄,城市),同一个人的数据只会来一次,来的时间也不定 (作为广播流)
             * 需要加工流 1,把用户的维度信息填充好,利用广播流来实现
             */
            
            // 将字典数据所在流: s2 , 转成 广播流
            MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc =
                    new MapStateDescriptor<>("userInfoStateDesc", TypeInformation.of(String.class),
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
            BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);
            
            // 哪个流处理中需要用到广播状态数据,就要 去 连接 connect 这个广播流
            SingleOutputStreamOperator<String> connected = s1.connect(s2BroadcastStream)
                    .process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                        /**BroadcastState> broadcastState;*/
                        
                        /**
                         * 本方法,是用来处理 主流中的数据(每来一条,调用一次)
                         * @param element 左流(主流)中的一条数据
                         * @param ctx 上下文
                         * @param out 输出器
                         * @throws Exception
                         */
                        @Override
                        public void processElement(Tuple2<String, String> element,
                                                   BroadcastProcessFunction<Tuple2<String, String>,
                                                           Tuple3<String, String, String>, String>.ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            // 通过 ReadOnlyContext ctx 取到的广播状态对象,是一个 “只读 ” 的对象;
                            ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc);
                            
                            if (broadcastState != null) {
                                Tuple2<String, String> userInfo = broadcastState.get(element.f0);
    						out.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
    						} else { out.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
    						 }
    				 }
    				 /**** 
    				 * @param element 广播流中的一条数据 
    				 * @param ctx 上下文 
    				 * @param out 输出器 
    				 * @throws Exception 
    				 */ 
    				@Override 
    				public void processBroadcastElement(Tuple3<String, String, String> element, 
    				                                    BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context ctx, 
    				                                    Collector<String> out) throws Exception { 
    				    // 从上下文中,获取广播状态对象(可读可写的状态对象) 
    				    BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc); 
    				    // 然后将获得的这条广播流数据,拆分后,装入广播状态 
    				    broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2)); 
    				}
    				
    				resultStream.print(); 
    				env.execute(); 
    				}
    				}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
  • 相关阅读:
    Vue render 函数
    在portacle中获取EMACS Lisp帮助文档的方法(Win11)
    haproxy搭建web集群
    NIO通信实现
    开源AI漫画生成器-AI Comic Factory
    haas506 2.0开发教程-高级组件库-modem.voiceCall(仅支持2.2以上版本)
    牛客网刷题
    【CTO变形记】有序定无序—为什么越努力,越无力
    【Linux网络编程】epoll进阶之水平模式和边沿模式
    XSSFWorkbook Excel导出导入
  • 原文地址:https://blog.csdn.net/Direction_Wind/article/details/136185832