(1)主程序代码
import Beans.EventPOJO;
import Beans.WaringMsgPOJO;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import scala.util.parsing.json.JSONObject;
import java.net.URL;
import java.util.List;
import java.util.Map;
public class cepTestPro {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 从文件中读取数据,转换为POJO数据对象,配置水位线
URL resource = cepTestPro.class.getResource("/loginlog.csv");
DataStream<EventPOJO> eventPOJOStream = env.readTextFile(resource.getPath())
.map(line -> {
String[] fields = line.split(",");
return new EventPOJO(fields[0].trim(), fields[1].trim(), fields[2].trim(), fields[3].trim(), Long.valueOf(fields[4].trim()));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventPOJO>(Time.seconds(2)) {
@Override
public long extractTimestamp(EventPOJO eventPOJO) {
return eventPOJO.getTimestamp() * 1000L;
}
});
// 3. 按照用户名进行分组
KeyedStream<EventPOJO, String> eventKeyedStream = eventPOJOStream.keyBy(event -> event.getName());
// eventPOJOStream.print();
// 4.1 定义一个cep匹配规则,5秒之内连续3次登陆失败
Pattern<EventPOJO, EventPOJO> loginFailPattern = Pattern
.<EventPOJO>begin("loginFailEvents").where(new SimpleCondition<EventPOJO>() {
@Override
public boolean filter(EventPOJO value) throws Exception {
return "fail".equals(value.getLoginStatus());
}
}).times(3).consecutive()
.within(Time.seconds(5));
// 4.2 定义一个cep匹配规则:5秒之内在不同ip地址登陆成功
Pattern<EventPOJO, EventPOJO> LoginMorePlacePattern = Pattern
.<EventPOJO>begin("firstLogin").where(new SimpleCondition<EventPOJO>() {
@Override
public boolean filter(EventPOJO eventPOJO) throws Exception {
return "success".equals(eventPOJO.getLoginStatus());
}
}).next("secondLogin").where(new IterativeCondition<EventPOJO>() {
@Override
public boolean filter(EventPOJO eventPOJO, Context<EventPOJO> context) throws Exception {
EventPOJO firstLogin = context.getEventsForPattern("firstLogin").iterator().next();
return !eventPOJO.getIp().equals(firstLogin.getIp());
}
}).within(Time.seconds(5));
// 4.3 将匹配模式应用到数据流上,得到patternStream
PatternStream<EventPOJO> loginFailPatternStream = CEP.pattern(eventKeyedStream, loginFailPattern);
PatternStream<EventPOJO> LoginMorePlacePatternStream = CEP.pattern(eventKeyedStream, LoginMorePlacePattern);
// 4.4 检测出符合匹配规则的复杂事件,进行转换处理,得到告警信息
SingleOutputStreamOperator<WaringMsgPOJO> loginMorePlaceWarning = LoginMorePlacePatternStream.select(new LoginMorePlaceWarning());
SingleOutputStreamOperator<WaringMsgPOJO> loginFailWarning = loginFailPatternStream.select(new LoginFailWarning());
// 5. 打印告警输出
loginMorePlaceWarning.print();
loginFailWarning.print();
// 执行flink任务
env.execute();
}
// 对多地登陆事件的告警输出
public static class LoginMorePlaceWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {
@Override
public WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {
EventPOJO firstLoginEvent = pattern.get("firstLogin").iterator().next();;
EventPOJO secondLoginEvent = pattern.get("secondLogin").get(0);
return new WaringMsgPOJO(firstLoginEvent.getName(), firstLoginEvent.getIp()+", "+secondLoginEvent.getIp(), "More place login", firstLoginEvent.getTimestamp(), secondLoginEvent.getTimestamp());
}
}
// 对连续登陆失败事件的告警输出
public static class LoginFailWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {
@Override
public WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {
EventPOJO firstFailEvent = pattern.get("loginFailEvents").get(0);
EventPOJO lastFailEvent = pattern.get("loginFailEvents").get(pattern.get("loginFailEvents").size() - 1);
return new WaringMsgPOJO(firstFailEvent.getName(), firstFailEvent.getIp(), "Login fail " + pattern.get("loginFailEvents").size() + " times", firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp());
}
}
}
(2)用到的Bean对象
package Beans;
public class EventPOJO {
public String name;
public String url;
public String ip;
public String loginStatus;
public Long timestamp;
public EventPOJO() {
}
public EventPOJO(String name, String url, String ip, String loginStatus, Long timestamp) {
this.name = name;
this.url = url;
this.ip = ip;
this.loginStatus = loginStatus;
this.timestamp = timestamp;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getLoginStatus() {
return loginStatus;
}
public void setLoginStatus(String loginStatus) {
this.loginStatus = loginStatus;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "EventPOJO{" +
"name='" + name + '\'' +
", url='" + url + '\'' +
", ip='" + ip + '\'' +
", loginStatus='" + loginStatus + '\'' +
", timestamp=" + timestamp +
'}';
}
}
package Beans;
public class WaringMsgPOJO {
public String username;
public String ip;
public String warningMsg;
public Long firstTime;
public Long lastTime;
public WaringMsgPOJO() {
}
public WaringMsgPOJO(String username, String ip, String warningMsg, Long firstTime, Long lastTime) {
this.username = username;
this.ip = ip;
this.warningMsg = warningMsg;
this.firstTime = firstTime;
this.lastTime = lastTime;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getWarningMsg() {
return warningMsg;
}
public void setWarningMsg(String warningMsg) {
this.warningMsg = warningMsg;
}
public Long getFirstTime() {
return firstTime;
}
public void setFirstTime(Long firstTime) {
this.firstTime = firstTime;
}
public Long getLastTime() {
return lastTime;
}
public void setLastTime(Long lastTime) {
this.lastTime = lastTime;
}
@Override
public String toString() {
return "WaringMsgPOJO{" +
"username='" + username + '\'' +
", ip='" + ip + '\'' +
", warningMsg='" + warningMsg + '\'' +
", firstTime='" + firstTime + '\'' +
", lastTime='" + lastTime + '\'' +
'}';
}
}
(3)测试数据
Bob,./index.html,10.255.82.110,fail,1597184223
Bob,./index.html,10.255.82.110,fail,1597184224
Bob,./index.html,10.255.82.110,fail,1597184226
Bob,./index.html,10.255.82.110,success,1597184225
Bob,./index.html,10.255.82.110,fail,1597184226
Frank,./index.html,10.255.82.110,fail,1597184227
Bob,./index.html,10.255.82.110,fail,1597184227
Frank,./index.html,10.255.82.110,fail,1597184228
Bob,./index.html,10.255.82.110,success,1597184235
Alice,./index.html,10.255.82.110,fail,1597184236
Alice,./index.html,10.255.82.110,fail,1597184237
Frank,./index.html,10.255.82.110,success,15971842238
Frank,./index.html,10.255.82.111,success,15971842239
Alice,./index.html,10.255.82.110,success,1597184246