• Flink from Getting Started to Giving Up - Stream API - Join Implementations (Multi-Stream Operations)


    Preface

    If I can't remember it, I might as well write it down; otherwise I feel that sooner or later I'll turn into someone who is all talk.

    When doing OLAP analysis on static tables in a database, a two-table join is a very common operation. Likewise, in streaming jobs it is sometimes necessary to join two streams to obtain richer information. The Flink DataStream API offers three operators for dual-stream joins: join(), coGroup(), and intervalJoin(). Beyond those there are two further multi-stream operators, connect() and union(), both of which are also covered below.

    • join()
    • coGroup()
    • intervalJoin()
    • connect()

    For the complete code, see:
    the GitHub repository
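
    All examples below obtain the execution environment and the two input streams from a small JoinSource helper in that repository. The helper itself is not shown in this post; a minimal sketch of what it might look like follows (the ports, the comma-separated line format, and the ClickLog/OrderLog constructors are assumptions, not the repository's actual code). For the interval join example the job must additionally assign event timestamps and watermarks; see that section.

    package com.flink.datastream.join;

    import com.flink.datastream.join.entity.ClickLog;
    import com.flink.datastream.join.entity.OrderLog;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JoinSource {
        public static StreamExecutionEnvironment env() {
            return StreamExecutionEnvironment.getExecutionEnvironment();
        }

        // Click stream from a socket, one "goodId,sessionId" record per line (port and format assumed)
        public static DataStream<ClickLog> socketSource(StreamExecutionEnvironment env) {
            return env.socketTextStream("localhost", 9998)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new ClickLog(fields[0], fields[1]);
                    });
        }

        // Order stream from a second socket, one "goodId,goodName" record per line (port and format assumed)
        public static DataStream<OrderLog> socketAnotherSource(StreamExecutionEnvironment env) {
            return env.socketTextStream("localhost", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new OrderLog(fields[0], fields[1]);
                    });
        }
    }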

    join

    The join() operator provides "window join" semantics: an inner join on a specified key, scoped by a (tumbling/sliding/session) window, and supporting both processing time and event time. The following example joins the two streams on the goods ID within a 10-second tumbling window and pulls the goods-name field from the order stream.

    Code first:

    package com.flink.datastream.join;
    
    import com.flink.datastream.join.entity.ClickLog;
    import com.flink.datastream.join.entity.OrderLog;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.util.Arrays;
    
    /**
     * @author DeveloperZJQ
     * @since 2022-11-7
     * join() semantics: window join
     */
    public class WindowJoin {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = JoinSource.env();
            DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
            clickLogDataStream.print("clickLog:");
            DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);
            orderLogDataStream.print("orderLog:");
    
            // Inner join on goodId, scoped by 10-second tumbling processing-time windows
            DataStream<String> joinStream = clickLogDataStream
                    .join(orderLogDataStream)
                    .where(ClickLog::getGoodId)   // key selector for the left (click) stream
                    .equalTo(OrderLog::getGoodId) // key selector for the right (order) stream
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .apply(
                            (JoinFunction<ClickLog, OrderLog, String>)
                                    (accessRecord, orderRecord) -> StringUtils.join(Arrays.asList(
                                            accessRecord.getGoodId(),
                                            orderRecord.getGoodName()
                                    ), '\t'));
    
            joinStream.print().setParallelism(1);
    
            env.execute(WindowJoin.class.getSimpleName());
        }
    }
    

    coGroup

    The coGroup() operator can be used to implement a left/right outer join. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: for each key and window it receives all elements from both sides, so it can match left and/or right records with arbitrary user logic and emit whatever output is needed. The example below implements a left outer join.

    Code first:

    package com.flink.datastream.join;

    import com.flink.datastream.join.entity.ClickLog;
    import com.flink.datastream.join.entity.OrderLog;
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    /**
     * coGroup
     *
     * @author DeveloperZJQ
     * @since 2022-11-10
     */
    public class CoGroupsJoin {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = JoinSource.env();
            DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
            DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);

            // An anonymous class is used instead of a lambda: the generic Tuple2 output type
            // is erased from lambdas, which makes Flink's type extraction fail at runtime.
            DataStream<Tuple2<String, String>> coGroupStream = clickLogDataStream
                    .coGroup(orderLogDataStream)
                    .where(ClickLog::getGoodId)
                    .equalTo(OrderLog::getGoodId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .apply(new CoGroupFunction<ClickLog, OrderLog, Tuple2<String, String>>() {
                        @Override
                        public void coGroup(Iterable<ClickLog> accessRecords, Iterable<OrderLog> orderRecords, Collector<Tuple2<String, String>> collector) {
                            for (ClickLog accessRecord : accessRecords) {
                                boolean isMatched = false;
                                for (OrderLog orderRecord : orderRecords) {
                                    // the right stream has matching records
                                    collector.collect(new Tuple2<>(accessRecord.getGoodId(), orderRecord.getGoodName()));
                                    isMatched = true;
                                }
                                if (!isMatched) {
                                    // no matching record in the right stream: emit an empty string,
                                    // since Tuple fields must not be null when serialized
                                    collector.collect(new Tuple2<>(accessRecord.getGoodId(), ""));
                                }
                            }
                        }
                    });

            coGroupStream.print().setParallelism(1);

            env.execute(CoGroupsJoin.class.getSimpleName());
        }
    }
    
    

    intervalJoin

    Both join() and coGroup() are window-based. In some cases, however, the two streams do not move at the same pace. For example, an order record may be written long after the click that triggered the purchase; if a window is used to scope the join, the match is easily missed. Flink therefore also provides "interval join" semantics, which joins records on a specified key whenever the right-stream timestamp falls within a time interval relative to the left-stream timestamp, i.e.:
    right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
    For example, with lowerBound = -30s and upperBound = +30s, a click at 12:00:00 matches any order for the same key stamped between 11:59:30 and 12:00:30.

    An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the offset interval, and only event time is supported.
    Example code follows. Note that before running it, assignTimestampsAndWatermarks() must be applied to each of the two streams to extract event timestamps and generate watermarks.
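
    A typical way to do that is sketched below; getTimestamp() is a hypothetical accessor assumed to return the event time in epoch milliseconds, and the 5-second out-of-orderness bound is likewise an assumption.

    // import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    // import java.time.Duration;
    DataStream<ClickLog> clickWithTs = clickLogDataStream.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    // tolerate events arriving up to 5 seconds out of order
                    .<ClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    // getTimestamp() is an assumed accessor returning epoch millis
                    .withTimestampAssigner((click, recordTs) -> click.getTimestamp()));
    DataStream<OrderLog> orderWithTs = orderLogDataStream.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    .<OrderLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((order, recordTs) -> order.getTimestamp()));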

    Unlike a window join, an interval join is an operation on two KeyedStreams, and the between() method must be called to specify the lower and upper bounds of the offset interval. To make either bound open (exclusive), call upperBoundExclusive()/lowerBoundExclusive(), as in the fragment below.
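
    For instance, to exclude matches that land exactly on the bounds (a fragment; it continues with the same process() call as in the full example that follows):

    clickLogDataStream
            .keyBy(ClickLog::getGoodId)
            .intervalJoin(orderLogDataStream.keyBy(OrderLog::getGoodId))
            .between(Time.seconds(-30), Time.seconds(30))
            .lowerBoundExclusive()  // drop matches at exactly left.timestamp - 30s
            .upperBoundExclusive()  // drop matches at exactly left.timestamp + 30s
            // ... .process(...) as in the full example below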

    Code first:

    package com.flink.datastream.join;
    
    import com.flink.datastream.join.entity.ClickLog;
    import com.flink.datastream.join.entity.OrderLog;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    /**
     * interval join
     *
     * @author DeveloperZJQ
     * @since 2022-11-10
     */
    public class IntervalJoin {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = JoinSource.env();
            DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
            DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);
    
            SingleOutputStreamOperator<String> intervalStream = clickLogDataStream
                    .keyBy(ClickLog::getGoodId)
                    .intervalJoin(orderLogDataStream.keyBy(OrderLog::getGoodId))
                    .between(Time.seconds(-30), Time.seconds(30)) // right.timestamp in [left.timestamp - 30s, left.timestamp + 30s]
                    .process(new ProcessJoinFunction<>() {
                        @Override
                        public void processElement(ClickLog accessRecord, OrderLog orderRecord, Context context, Collector<String> collector) throws Exception {
                            collector.collect(StringUtils.join(Arrays.asList(
                                    accessRecord.getGoodId(),
                                    orderRecord.getGoodName()
                            ), '\t'));
                        }
                    });
            intervalStream.print().setParallelism(1);
    
            env.execute(IntervalJoin.class.getSimpleName());
        }
    }
    
    

    connect

    Unlike the three join operators above, connect() merges two streams that may have different element types into a ConnectedStreams, with no key or window involved; a CoMapFunction (or CoFlatMapFunction) then processes each side with its own method: map1 for the first stream and map2 for the second. Code first:

    package com.flink.datastream.join;
    
    import com.flink.datastream.join.entity.ClickLog;
    import com.flink.datastream.join.entity.OrderLog;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    
    /**
     * connect
     *
     * @author DeveloperZJQ
     * @since 2022/11/12
     */
    public class ConnectStream {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = JoinSource.env();
            DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
            clickLogDataStream.print("clickLog:");
            DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);
            orderLogDataStream.print("orderLog:");
    
            // connect() keeps both element types; CoMapFunction handles each input with its own method
            SingleOutputStreamOperator<Tuple2<String, String>> connectStream = clickLogDataStream.connect(orderLogDataStream).map(new CoMapFunction<ClickLog, OrderLog, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map1(ClickLog clickLog) {
                    return Tuple2.of(clickLog.getGoodId(), clickLog.getSessionId());
                }
    
                @Override
                public Tuple2<String, String> map2(OrderLog orderLog) {
                    return Tuple2.of(orderLog.getGoodId(), orderLog.getGoodName());
                }
            });
    
            connectStream.print().setParallelism(1);
    
            env.execute(ConnectStream.class.getSimpleName());
        }
    }
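
    union

    The intro also mentioned union(). Unlike connect(), union() merges two (or more) streams of the same element type into a single DataStream; elements simply interleave, and no keys, windows, or pairing are involved. A minimal sketch, reusing the streams from the examples above and normalizing both sides to Tuple2 first (the returns() hints are needed because the generic tuple type is erased from lambdas; Types is org.apache.flink.api.common.typeinfo.Types):

    // Normalize both streams to a common type, since union() requires identical element types
    DataStream<Tuple2<String, String>> clicks = clickLogDataStream
            .map(click -> Tuple2.of(click.getGoodId(), click.getSessionId()))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));
    DataStream<Tuple2<String, String>> orders = orderLogDataStream
            .map(order -> Tuple2.of(order.getGoodId(), order.getGoodName()))
            .returns(Types.TUPLE(Types.STRING, Types.STRING));

    // One merged stream; print the interleaved elements
    DataStream<Tuple2<String, String>> unionStream = clicks.union(orders);
    unionStream.print().setParallelism(1);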
    
    


    If you'd like to dig deeper into any of this, leave a comment and I'll find time to reply.

  • Original article: https://blog.csdn.net/u010772882/article/details/127493177