If I can't remember it, I'd better write it down; otherwise, sooner or later, I'll be all talk and no substance.
When doing OLAP analysis over static tables in a database, a two-table join is an extremely common operation. Likewise, a streaming job sometimes needs to join two streams to obtain richer information. The Flink DataStream API offers three operators for dual-stream joins: 1. join(); 2. coGroup(); 3. intervalJoin(). Beyond those, there are two stream-merging operators, connect() and union().
For the complete code, see the GitHub repository.
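All of the examples below depend on two POJOs, ClickLog and OrderLog, plus a JoinSource helper that builds the environment and the two socket sources; none of these are shown in this post (they live in the repository). A minimal sketch of the two POJOs, with field names guessed from the getters used in the examples, might look like this:

```java
// Hypothetical POJOs matching the getters used in the examples below.
// The real classes live in com.flink.datastream.join.entity; field names
// beyond goodId/sessionId/goodName are my assumption.
class ClickLog {
    private String goodId;    // goods ID, the join key
    private String sessionId; // session in which the click happened
    private long timestamp;   // event time in epoch milliseconds

    public String getGoodId() { return goodId; }
    public void setGoodId(String goodId) { this.goodId = goodId; }
    public String getSessionId() { return sessionId; }
    public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

class OrderLog {
    private String goodId;   // goods ID, the join key
    private String goodName; // goods name pulled out by the joins
    private long timestamp;  // event time in epoch milliseconds

    public String getGoodId() { return goodId; }
    public void setGoodId(String goodId) { this.goodId = goodId; }
    public String getGoodName() { return goodName; }
    public void setGoodName(String goodName) { this.goodName = goodName; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
```

Note that for Flink's serializer to treat these as POJO types, the real classes must be public with a no-argument constructor.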
The join() operator provides "window join" semantics: it performs an inner join on a specified key within a (tumbling/sliding/session) window, and supports both processing time and event time. The following example uses a 10-second tumbling window to join the two streams on goods ID and pull the goods-name field out of the order stream.
The code:
package com.flink.datastream.join;

import com.flink.datastream.join.entity.ClickLog;
import com.flink.datastream.join.entity.OrderLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

/**
 * join() provides window join semantics.
 *
 * @author DeveloperZJQ
 * @since 2022-11-7
 */
public class WindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = JoinSource.env();

        DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
        clickLogDataStream.print("clickLog:");
        DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);
        orderLogDataStream.print("orderLog:");

        DataStream<String> joinStream = clickLogDataStream
                .join(orderLogDataStream)
                .where(ClickLog::getGoodId)
                .equalTo(OrderLog::getGoodId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply((JoinFunction<ClickLog, OrderLog, String>) (accessRecord, orderRecord) ->
                        StringUtils.join(Arrays.asList(
                                accessRecord.getGoodId(),
                                orderRecord.getGoodName()
                        ), '\t'));

        joinStream.print().setParallelism(1);
        env.execute(WindowJoin.class.getSimpleName());
    }
}
The coGroup() operator can be used to implement a left/right outer join. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it matches records from the left and/or right stream according to user-defined logic and can emit whatever output that logic dictates.
The code:
package com.flink.datastream.join;

import com.flink.datastream.join.entity.ClickLog;
import com.flink.datastream.join.entity.OrderLog;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * coGroup
 *
 * @author DeveloperZJQ
 * @since 2022-11-10
 */
public class CoGroupsJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = JoinSource.env();

        DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
        DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);

        DataStream<Tuple2<String, String>> coGroupStream = clickLogDataStream
                .coGroup(orderLogDataStream)
                .where(ClickLog::getGoodId)
                .equalTo(OrderLog::getGoodId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // an anonymous class rather than a lambda, so that Flink's type
                // extractor can see the generic Collector/Tuple2 parameters
                .apply(new CoGroupFunction<ClickLog, OrderLog, Tuple2<String, String>>() {
                    @Override
                    public void coGroup(Iterable<ClickLog> accessRecords,
                                        Iterable<OrderLog> orderRecords,
                                        Collector<Tuple2<String, String>> collector) {
                        for (ClickLog accessRecord : accessRecords) {
                            boolean isMatched = false;
                            for (OrderLog orderRecord : orderRecords) {
                                // the right stream has a matching record
                                collector.collect(new Tuple2<>(accessRecord.getGoodId(), orderRecord.getGoodName()));
                                isMatched = true;
                            }
                            if (!isMatched) {
                                // the right stream has no matching record: left outer join
                                collector.collect(new Tuple2<>(accessRecord.getGoodId(), null));
                            }
                        }
                    }
                });

        coGroupStream.print().setParallelism(1);
        env.execute(CoGroupsJoin.class.getSimpleName());
    }
}
Both join() and coGroup() do their matching inside a window. In some situations, though, the two streams do not advance in lockstep. For example, an order record may be written long after the purchase click occurs; if a window is used to scope the match, the two records easily land in different windows and fail to join. Flink therefore also provides "interval join" semantics, which joins on a specified key whenever the right stream's timestamp falls within a time interval relative to the left stream's, namely:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the interval, and only event time is supported.
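For instance, with lowerBound = -30 s and upperBound = 30 s (the values used in the example below), a click stamped at t = 100 s joins exactly those orders whose timestamps fall in [70 s, 130 s]. The membership test can be sketched in plain Java:

```java
// Plain-Java sketch of the interval join membership test (times in seconds).
class IntervalBound {
    static final long LOWER = -30; // lowerBound passed to between()
    static final long UPPER = 30;  // upperBound passed to between()

    // true iff orderTs lies in [clickTs + LOWER, clickTs + UPPER];
    // both bounds are inclusive, which is between()'s default behavior
    static boolean matches(long clickTs, long orderTs) {
        return orderTs >= clickTs + LOWER && orderTs <= clickTs + UPPER;
    }
}
```

So a click at t = 100 matches orders at t = 70 and t = 130, but not one at t = 69; calling upperBoundExclusive() would additionally exclude t = 130.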
The example code follows. Note that before running it, assignTimestampsAndWatermarks() must be applied to each of the two streams to extract event timestamps and generate watermarks. Unlike a window join, an interval join is an operation on two KeyedStreams, and the between() method sets the lower and upper bounds of the offset interval. Both bounds are inclusive by default; to make either one exclusive, call upperBoundExclusive()/lowerBoundExclusive().
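The watermark assignment itself is not shown in the example. A minimal sketch of what it could look like, assuming ClickLog exposes an epoch-millisecond getTimestamp() (my assumption; the real field may differ) and tolerating 5 seconds of out-of-orderness:

```java
// Fragment only: runs inside the Flink job above, not standalone.
// Assumes ClickLog.getTimestamp() returns epoch milliseconds.
DataStream<ClickLog> clickWithTs = clickLogDataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<ClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(
                        // the cast disambiguates the overloaded withTimestampAssigner()
                        (SerializableTimestampAssigner<ClickLog>)
                                (log, recordTs) -> log.getTimestamp()));
// apply the same strategy to orderLogDataStream before keyBy()/intervalJoin()
```

The relevant imports are org.apache.flink.api.common.eventtime.WatermarkStrategy, org.apache.flink.api.common.eventtime.SerializableTimestampAssigner, and java.time.Duration.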
The code:
package com.flink.datastream.join;

import com.flink.datastream.join.entity.ClickLog;
import com.flink.datastream.join.entity.OrderLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * interval join
 *
 * @author DeveloperZJQ
 * @since 2022-11-10
 */
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = JoinSource.env();

        DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
        DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);

        SingleOutputStreamOperator<String> intervalStream = clickLogDataStream
                .keyBy(ClickLog::getGoodId)
                .intervalJoin(orderLogDataStream.keyBy(OrderLog::getGoodId))
                .between(Time.seconds(-30), Time.seconds(30))
                .process(new ProcessJoinFunction<>() {
                    @Override
                    public void processElement(ClickLog accessRecord, OrderLog orderRecord,
                                               Context context, Collector<String> collector) throws Exception {
                        collector.collect(StringUtils.join(Arrays.asList(
                                accessRecord.getGoodId(),
                                orderRecord.getGoodName()
                        ), '\t'));
                    }
                });

        intervalStream.print().setParallelism(1);
        env.execute(IntervalJoin.class.getSimpleName());
    }
}
Finally, connect() merges two streams, which may carry different types, into a ConnectedStreams. It performs no key matching or windowing; instead, a CoMapFunction (or CoProcessFunction) processes elements from the two streams in its two callbacks and can share state between them. The related union() operator simply concatenates two or more streams of the same type. The code:
package com.flink.datastream.join;

import com.flink.datastream.join.entity.ClickLog;
import com.flink.datastream.join.entity.OrderLog;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * connect
 *
 * @author DeveloperZJQ
 * @since 2022/11/12
 */
public class ConnectStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = JoinSource.env();

        DataStream<ClickLog> clickLogDataStream = JoinSource.socketSource(env);
        clickLogDataStream.print("clickLog:");
        DataStream<OrderLog> orderLogDataStream = JoinSource.socketAnotherSource(env);
        orderLogDataStream.print("orderLog:");

        SingleOutputStreamOperator<Tuple2<String, String>> connectStream = clickLogDataStream
                .connect(orderLogDataStream)
                .map(new CoMapFunction<ClickLog, OrderLog, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map1(ClickLog clickLog) {
                        // element from the left (click) stream
                        return Tuple2.of(clickLog.getGoodId(), clickLog.getSessionId());
                    }

                    @Override
                    public Tuple2<String, String> map2(OrderLog orderLog) {
                        // element from the right (order) stream
                        return Tuple2.of(orderLog.getGoodId(), orderLog.getGoodName());
                    }
                });

        connectStream.print().setParallelism(1);
        // fixed: the original mistakenly passed WindowJoin.class here
        env.execute(ConnectStream.class.getSimpleName());
    }
}
If you'd like to dig deeper into any of this, leave a comment and I'll make time to reply.