上节我们完成了如下的内容:
之前已经介绍过,但是为了防止大家没看到,这里再简单介绍一下。
Flink CEP(Complex Event Processing)是Apache Flink提供的一个扩展库,用于实时复杂事件处理。通过Flink CEP,开发者可以从流数据中识别出特定的事件模式。这在欺诈检测、网络安全、实时监控、物联网等场景中非常有用。
Flink CEP的核心是通过定义事件模式,从流中检测复杂事件序列。
具体来说,CEP允许用户定义事件模式,在事件流中检测符合该模式的事件序列,并对匹配到的结果进行处理。
开发Flink CEP应用的基本步骤包括:
定义事件流:创建一个DataStream,表示原始的事件流。
定义事件模式:使用Flink CEP的API定义事件模式,例如连续事件、迟到事件等。
将模式应用到流中:将定义好的模式应用到事件流上,生成模式流PatternStream。
提取匹配事件:使用select函数提取匹配模式的事件,并定义如何处理这些事件。
业务上需要找出24小时内,至少有5次有效交易(交易金额大于0)的用户。
数据源如下:
new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
package icu.wzk;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP example: finds users with at least five valid transactions
 * (price greater than 0) inside a 24-hour event-time window.
 *
 * <p>The job reads a fixed set of {@link CepActiveUserBean} elements, assigns
 * event-time timestamps and watermarks, keys the stream by username, applies
 * the CEP pattern and prints every match to standard output.
 */
public class FlinkCepActiveUser {

    /**
     * Builds and runs the job.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<CepActiveUserBean> data = env.fromElements(
                new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
                new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
                new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
                new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
                new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
                new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
                new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
                new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
        );

        SingleOutputStreamOperator<CepActiveUserBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
                    @Override
                    public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepActiveUserBean>() {
                            // Allowed lateness in milliseconds.
                            private final long maxOutOfOrderness = 500L;
                            // Highest timestamp seen so far. It must start at the low end of
                            // the range: starting at Long.MAX_VALUE would make Math.max a
                            // no-op and pin the watermark at Long.MAX_VALUE - 500, so every
                            // element seen after a periodic emit would count as late.
                            // The offset keeps the subtraction below from underflowing.
                            private long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

                            @Override
                            public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimes) -> element.getTimestamp())
                );

        // One pattern-matching state machine per user.
        KeyedStream<CepActiveUserBean, String> keyed = watermark
                .keyBy(new KeySelector<CepActiveUserBean, String>() {
                    @Override
                    public String getKey(CepActiveUserBean value) throws Exception {
                        return value.getUsername();
                    }
                });

        // A "valid" transaction has a price above zero; five or more of them
        // within 24 hours make up a match.
        Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
                .<CepActiveUserBean>begin("start")
                .where(new SimpleCondition<CepActiveUserBean>() {
                    @Override
                    public boolean filter(CepActiveUserBean value) throws Exception {
                        return value.getPrice() > 0;
                    }
                })
                .timesOrMore(5)
                .within(Time.hours(24));

        PatternStream<CepActiveUserBean> patternStream = CEP.pattern(keyed, pattern);

        SingleOutputStreamOperator<CepActiveUserBean> process = patternStream
                .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepActiveUserBean>> map, Context context, Collector<CepActiveUserBean> collector) throws Exception {
                        // The match is only logged; nothing is handed to the collector,
                        // so the downstream print() below emits no records.
                        System.out.println("map: " + map);
                    }
                });

        process.print();
        env.execute("FlinkCepActiveUser");
    }
}
/**
 * Transaction event used by the active-user CEP example: who traded, for how
 * much, and when.
 */
class CepActiveUserBean {

    /** Name of the user that made the transaction. */
    private String username;

    /** Transaction amount. */
    private Double price;

    /** Event timestamp; the sample data uses epoch milliseconds. */
    private Long timestamp;

    /**
     * Creates a fully populated event.
     *
     * @param username  name of the user
     * @param price     transaction amount
     * @param timestamp event timestamp
     */
    public CepActiveUserBean(String username, Double price, Long timestamp) {
        this.username = username;
        this.price = price;
        this.timestamp = timestamp;
    }

    /** @return the user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the new user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the transaction amount */
    public Double getPrice() {
        return this.price;
    }

    /** @param price the new transaction amount */
    public void setPrice(Double price) {
        this.price = price;
    }

    /** @return the event timestamp */
    public Long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the new event timestamp */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Renders the event as
     * {@code CepActiveUserBean{username='..', price=.., timestamp=..}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("CepActiveUserBean{");
        text.append("username='").append(username).append('\'');
        text.append(", price=").append(price);
        text.append(", timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}
map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}, CepActiveUserBean{username='100XX', price=200.0, timestamp=1597905236000}, CepActiveUserBean{username='100XX', price=300.0, timestamp=1597905237000}, CepActiveUserBean{username='100XX', price=400.0, timestamp=1597905238000}, CepActiveUserBean{username='100XX', price=500.0, timestamp=1597905239000}]}
Process finished with exit code 0
运行结果如下图所示:
找出下单后10分钟内没有支付的订单,数据源如下:
new TimeOutPayBean(1L, "create", 1597905234000L),
new TimeOutPayBean(1L, "pay", 1597905235000L),
new TimeOutPayBean(2L, "create", 1597905236000L),
new TimeOutPayBean(2L, "pay", 1597905237000L),
new TimeOutPayBean(3L, "create", 1597905239000L)
package icu.wzk;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
public class FlinkCepTimeOutPay {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<TimeOutPayBean> data = env.fromElements(
new TimeOutPayBean(1L, "create", 1597905234000L),
new TimeOutPayBean(1L, "pay", 1597905235000L),
new TimeOutPayBean(2L, "create", 1597905236000L),
new TimeOutPayBean(2L, "pay", 1597905237000L),
new TimeOutPayBean(3L, "create", 1597905239000L)
);
DataStream<TimeOutPayBean> watermark = data
.assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
@Override
public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<TimeOutPayBean>() {
long maxTimestamp = Long.MAX_VALUE;
long maxOutOfOrderness = 500L;
@Override
public void onEvent(TimeOutPayBean event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
}
};
}
}.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
KeyedStream<TimeOutPayBean, Long> keyedStream = watermark
.keyBy(new KeySelector<TimeOutPayBean, Long>() {
@Override
public Long getKey(TimeOutPayBean value) throws Exception {
return value.getUserId();
}
});
// 逻辑处理代码
OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<>("orderTimeout") {};
Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
.<TimeOutPayBean>begin("begin")
.where(new IterativeCondition<TimeOutPayBean>() {
@Override
public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
return timeOutPayBean.getOperation().equals("create");
}
})
.followedBy("pay")
.where(new IterativeCondition<TimeOutPayBean>() {
@Override
public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
return timeOutPayBean.getOperation().equals("pay");
}
})
.within(Time.seconds(600));
PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
SingleOutputStreamOperator<TimeOutPayBean> result = patternStream
.select(orderTimeoutOutput, new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
@Override
public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long l) throws Exception {
return map.get("begin").get(0);
}
}, new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
@Override
public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) throws Exception {
return map.get("pay").get(0);
}
});
// 输出结果
// result.print();
System.out.println("==============");
DataStream<TimeOutPayBean> sideOutput = result
.getSideOutput(orderTimeoutOutput);
sideOutput.print();
// 执行
env.execute("FlinkCepTimeOutPay");
}
}
/**
 * Order event used by the payment-timeout CEP example: the id the stream is
 * keyed by, the operation performed, and when it happened.
 */
class TimeOutPayBean {

    /** Id used as the stream key. */
    private Long userId;

    /** Operation name; the sample data uses "create" and "pay". */
    private String operation;

    /** Event timestamp; the sample data uses epoch milliseconds. */
    private Long timestamp;

    /**
     * Creates a fully populated event.
     *
     * @param userId    id used as the stream key
     * @param operation operation name
     * @param timestamp event timestamp
     */
    public TimeOutPayBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    /** @return the id used as the stream key */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the new id */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the operation name */
    public String getOperation() {
        return this.operation;
    }

    /** @param operation the new operation name */
    public void setOperation(String operation) {
        this.operation = operation;
    }

    /** @return the event timestamp */
    public Long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the new event timestamp */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Renders the event as
     * {@code TimeOutPayBean{userId=.., operation='..', timestamp=..}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TimeOutPayBean{");
        text.append("userId=").append(userId);
        text.append(", operation='").append(operation).append('\'');
        text.append(", timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}
控制台输出为:
==============
TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000}
TimeOutPayBean{userId=3, operation='create', timestamp=1597905239000}
TimeOutPayBean{userId=2, operation='pay', timestamp=1597905237000}
Process finished with exit code 0
对应截图如下: