package com.techwolf.hubble;
import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
public class App {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
WatermarkStrategy<TestEvent> watermarkStrategy=WatermarkStrategy.<TestEvent>
forMonotonousTimestamps().withTimestampAssigner(new EventTimeAssignerSupplier());
DataStream<TestEvent> inputDataSteam=env.fromElements(
new TestEvent("1","A",System.currentTimeMillis()-100*1000,"1"),
new TestEvent("1","A",System.currentTimeMillis()-85*1000,"2"),
new TestEvent("1","A",System.currentTimeMillis()-80*1000,"3"),
new TestEvent("1","A",System.currentTimeMillis()-75*1000,"4"),
new TestEvent("1","A",System.currentTimeMillis()-60*1000,"5"),
new TestEvent("1","A",System.currentTimeMillis()-55*1000,"6"),
new TestEvent("1","A",System.currentTimeMillis()-40*1000,"7"),
new TestEvent("1","A",System.currentTimeMillis()-35*1000,"8"),
new TestEvent("1","A",System.currentTimeMillis()-20*1000,"9"),
new TestEvent("1","A",System.currentTimeMillis()-10*1000,"10"),
new TestEvent("1","B",System.currentTimeMillis()-5*1000,"11")
).assignTimestampsAndWatermarks(watermarkStrategy);
Pattern<TestEvent,TestEvent> pattern=Pattern.<TestEvent>begin("begin")
.where(new SimpleCondition<TestEvent>() {
@Override
public boolean filter(TestEvent testEvent) throws Exception {
return testEvent.getAction().equals("A");
}
}).
followedBy("end")
.where(new SimpleCondition<TestEvent>() {
@Override
public boolean filter(TestEvent testEvent) throws Exception {
return testEvent.getAction().equals("B");
}
}).within(Time.seconds(10));
PatternStream<TestEvent> patternStream=CEP.pattern(inputDataSteam.keyBy(TestEvent::getId),pattern);
OutputTag<TestEvent> timeOutTag=new OutputTag<TestEvent>("timeOutTag"){};
SingleOutputStreamOperator<TestEvent> twentySingleOutputStream=patternStream
.flatSelect(timeOutTag,new EventTimeOut(),new FlatSelect())
.uid("match_twenty_minutes_pattern");
DataStream<String> result=twentySingleOutputStream.getSideOutput(timeOutTag).map(new MapFunction<TestEvent, String>() {
@Override
public String map(TestEvent testEvent) throws Exception {
return JSONObject.toJSONString(testEvent);
}
});
result.print();
env.execute(Config.JOB_NAME);
}
public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent,TestEvent> {
private static final long serialVersionUID = -2471077777598713906L;
@Override
public void timeout(Map<String, List<TestEvent>> map, long l, Collector<TestEvent> collector) throws Exception {
if (null != map.get("begin")) {
for (TestEvent event : map.get("begin")) {
collector.collect(event);
}
}
}
}
public static class FlatSelect implements PatternFlatSelectFunction<TestEvent,TestEvent> {
private static final long serialVersionUID = 1753544074226581611L;
@Override
public void flatSelect(Map<String, List<TestEvent>> map, Collector<TestEvent> collector) throws Exception {
if (null != map.get("begin")) {
for (TestEvent event : map.get("begin")) {
collector.collect(event);
}
}
}
}
public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {
private static final long serialVersionUID = -9040340771307752904L;
@Override
public TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {
return new EventTimeAssigner();
}
}
public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {
@Override
public long extractTimestamp(TestEvent event, long l) {
return event.getEventTime();
}
}
}

- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131