• Flink-CEP快速入门


    0. 简介 & 使用步骤

    简介

    • 所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)
    • 把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出

    使用步骤

    • 复杂事件处理(CEP)的流程可以分成三个步骤:
      1. 定义一个匹配规则
      2. 将匹配规则应用到事件流上,检测满足规则的复杂事件
      3. 对检测到的复杂事件进行处理,得到结果进行输出
    // 实体类
    public class LoginEvent {
        public String userId;
        public String ipAddress;
        public String eventType;
        public Long timestamp;
    
        public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
            this.userId = userId;
            this.ipAddress = ipAddress;
            this.eventType = eventType;
            this.timestamp = timestamp;
        }
    }
    
    
    // CEP Demo
    public class Demo003 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 获取登录事件流,并提取时间戳、生成水位线
            SingleOutputStreamOperator<LoginEvent> sourceData = env
                    .fromElements(
                            new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                            new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                            new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                            new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                            new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
                            new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                            new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    
                    )
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
                                    .withTimestampAssigner(
                                            new SerializableTimestampAssigner<LoginEvent>() {
                                                @Override
                                                public long extractTimestamp(LoginEvent loginEvent, long l) {
                                                    return loginEvent.timestamp;
                                                }
                                            }));
    
            // 1. 定义一个匹配规则:定义 Pattern,连续的三个登录失败事件
            Pattern<LoginEvent, LoginEvent> pattern = Pattern
                    .<LoginEvent>begin("first")  // 以第一个登录失败事件开始
                    .where(new IterativeCondition<LoginEvent>() {
                        @Override
                        public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                            return "fail".equals(loginEvent.eventType);
                        }
                    })
                    .next("second")  // 接着是第二个登录失败事件
                    .where(new IterativeCondition<LoginEvent>() {
                        @Override
                        public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                            return "fail".equals(loginEvent.eventType);
                        }
                    })
                    .next("third")  // 接着是第三个登录失败事件
                    .where(new IterativeCondition<LoginEvent>() {
                        @Override
                        public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                            return "fail".equals(loginEvent.eventType);
                        }
                    });
    
            // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
            PatternStream<LoginEvent> cepPattern = CEP.pattern(sourceData.keyBy(loginEvent -> loginEvent.userId), pattern);
    
            // 3. 对检测到的复杂事件进行处理:将匹配到的复杂事件选择出来,然后包装成字符串
            SingleOutputStreamOperator<String> select = cepPattern.select(new PatternSelectFunction<LoginEvent, String>() {
                @Override
                public String select(Map<String, List<LoginEvent>> map) throws Exception {
                    LoginEvent first = map.get("first").get(0);
                    LoginEvent second = map.get("second").get(0);
                    LoginEvent third = map.get("third").get(0);
                    return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
                }
            });
    
            select.print();
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    1. 模式API(Pattern API:匹配规则)

    单个模式

    • 一个模式可以是一个单例或者循环模式。单例模式只接受一个事件,循环模式可以接受多个事件。 在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), ac?,和 d都是单例模式,b+是一个循环模式

    量词

    • 单个模式后面可以跟一个“量词”,用来指定循环的次数,单个模式可以包括“单例(singleton)模式”和“循环(looping)模式”,默认是“单例(singleton)模式”,当定义了量词之后,就变成了“循环模式”,可以匹配接收多个事件

    循环模式的方法:

    • .oneOrMore()
      • 匹配事件出现一次或多次,假设 a 是一个个体模式,a.oneOrMore()表示可以匹配 1 个或多个 a 的事件组合。我们有时会用 a+来简单表示
    • .times(times)
      • 匹配事件发生特定次数(times),例如 a.times(3)表示 aaa
    • .times(fromTimes,toTimes)
      • 指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa
    • .greedy()
      • 只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如 a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的
    • .optional()
      • 使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足
    // 期望出现4次
    start.times(4);
    
    // 期望出现0或者4次
    start.times(4).optional();
    
    // 期望出现2、3或者4次
    start.times(2, 4);
    
    // 期望出现2、3或者4次,并且尽可能的重复次数多
    start.times(2, 4).greedy();
    
    // 期望出现0、2、3或者4次
    start.times(2, 4).optional();
    
    // 期望出现0、2、3或者4次,并且尽可能的重复次数多
    start.times(2, 4).optional().greedy();
    
    // 期望出现1到多次
    start.oneOrMore();
    
    // 期望出现1到多次,并且尽可能的重复次数多
    start.oneOrMore().greedy();
    
    // 期望出现0到多次
    start.oneOrMore().optional();
    
    // 期望出现0到多次,并且尽可能的重复次数多
    start.oneOrMore().optional().greedy();
    
    // 期望出现2到多次
    start.timesOrMore(2);
    
    // 期望出现2到多次,并且尽可能的重复次数多
    start.timesOrMore(2).greedy();
    
    // 期望出现0、2或多次
    start.timesOrMore(2).optional();
    
    // 期望出现0、2或多次,并且尽可能的重复次数多
    start.timesOrMore(2).optional().greedy();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    条件

    限定子类型
    • 调用.subtype()方法可以为当前模式增加子类型限制条件

      // 这里 SubEvent 是流中数据类型 Event 的子类型。只有事件是 SubEvent 类型时,才可以满足当前模式 pattern 的匹配条件
      pattern.subtype(SubEvent.class);
      
      • 1
      • 2
    简单条件(SimpleCondition)
    • 简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个 filter 操作

      start.where(new SimpleCondition<MyEvent>() {
          @Override
          public boolean filter(MyEvent myEvent) throws Exception {
              return ... // 一些判断条件
          }
      })
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    迭代条件(IterativeCondition)
    • 在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)

      Pattern.<MyEvent>begin("first")
          .where(new IterativeCondition<MyEvent>() {
              @Override
              public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                  if (!"event1001".equals(myEvent.getEvent())) {
                      return false;
                  }
      
                  // 根据上下文获取 之前的事件,获取之前满足条件的
                  Iterable<MyEvent> myEventIterable = context.getEventsForPattern("first");
                  // TODO 处理 之前的事件
      
                  return ... // 一些判断条件
              }
          });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
    组合条件
    • 可以多个条件一起使用,当有多个判断逻辑的时候我们可能会用if-else的方式,但组合条件可以在 where()方法后继续接or()方法来组合使用

      Pattern<MyEvent, MyEvent> pattern = Pattern.<MyEvent>begin("first")
                      .where(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return ... // 一些判断条件
                          }
                      }).or(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return ... // 一些判断条件
                          }
                      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    终止条件
    • 终止条件的定义是通过调用模式对象的.until()方法来实现的

    • ⚠️终止条件只与oneOrMore()或者oneOrMore().optional()结合使用

      Pattern<MyEvent, MyEvent> pattern = Pattern.<MyEvent>begin("first")
                      .where(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return ... // 一些判断条件
                          }
                      }).oneOrMore()
                      .until(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return ... // 一些判断条件
                          }
                      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

    模式操作列举

    模式操作描述
    where(condition)为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 一些判断条件 } });
    or(condition)增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 一些判断条件 } }).or(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 替代条件 } });
    until(condition)为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。只适用于和oneOrMore()同时使用。NOTE: 在基于事件的条件中,它可用于清理对应模式的状态。java pattern.oneOrMore().until(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 替代条件 } });
    subtype(subClass)为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式:java pattern.subtype(SubEvent.class);
    oneOrMore()指定模式期望匹配到的事件至少出现一次。.默认(在子事件间)使用松散的内部连续性。 NOTE: 推荐使用until()或者within()来清理状态。java pattern.oneOrMore();
    timesOrMore(#times)指定模式期望匹配到的事件至少出现**#times**次。.默认(在子事件间)使用松散的内部连续性。 java pattern.timesOrMore(2);
    times(#ofTimes)指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。 java pattern.times(2);
    times(#fromTimes, #toTimes)指定模式期望匹配到的事件出现次数在**#fromTimes#toTimes**之间。默认(在子事件间)使用松散的内部连续性。 java pattern.times(2, 4);
    optional()指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。java pattern.oneOrMore().optional();
    greedy()指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。java pattern.oneOrMore().greedy();

    组合模式

    连续性

    • 将多个个体模式组合起来的完整模式,就叫作“组合模式”

    • FlinkCEP支持事件之间如下形式的连续策略:

      1. 严格连续: next()期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
      2. 松散连续: followedBy()忽略匹配的事件之间的不匹配的事件。
      3. 不确定的松散连续: followedByAny()更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
      4. notNext():如果不想后面直接连着一个特定事件
      5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方
      // 严格连续
      Pattern<Event, ?> strict = start.next("middle").where(...);
      
      // 松散连续
      Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
      
      // 不确定的松散连续
      Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
      
      // 严格连续的NOT模式
      Pattern<Event, ?> strictNot = start.notNext("not").where(...);
      
      // 松散连续的NOT模式
      Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    • within()方法:指定一个模式应该在一定时间内发生

      // 在十秒钟内,从 event1001 开始到 event1004 结束才算
      Pattern<MyEvent, MyEvent> pattern = Pattern.<MyEvent>begin("first")
                      .where(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return "event1001".equals(myEvent.getEvent());
                          }
                      })
                      .followedBy("second")
                      .where(new IterativeCondition<MyEvent>() {
                          @Override
                          public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                              return "event1004".equals(myEvent.getEvent());
                          }
                      })
                      .within(Time.seconds(10L));
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

    循环模式中的近邻条件

    • oneOrMore()、times()等循环模式的默认是松散连续,也就是followedBy()模式

      • .consecutive():在oneOrMore()、times()等循环模式后面跟上consecutive()表示严格连续(next()

        // 1. 定义 Pattern,登录失败事件,循环检测 3 次
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
            .<LoginEvent>begin("fails")
            .where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return loginEvent.eventType.equals("fail");
                }
            }).times(3).consecutive();
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
      • .allowCombinations():在oneOrMore()、times()等循环模式后面跟上allowCombinations()表示不确定的松散连续(followedByAny()

    模式组

    • 也可以定义一个模式序列作为beginfollowedByfollowedByAnynext的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore()times(#ofTimes)times(#fromTimes, #toTimes)optional()consecutive()allowCombinations()

      Pattern<Event, ?> start = Pattern.begin(
          Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
      );
      
      // 严格连续
      Pattern<Event, ?> strict = start.next(
          Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
      ).times(3);
      
      // 松散连续
      Pattern<Event, ?> relaxed = start.followedBy(
          Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
      ).oneOrMore();
      
      // 不确定松散连续
      Pattern<Event, ?> nonDetermin = start.followedByAny(
          Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
      ).optional();
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    模式操作描述
    begin(#name)定义一个开始的模式:java Pattern start = Pattern.begin("start");
    begin(#pattern_sequence)定义一个开始的模式:java Pattern start = Pattern.begin( Pattern.begin("start").where(...).followedBy("middle").where(...) );
    next(#name)增加一个新的模式。匹配的事件必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next("middle");
    next(#pattern_sequence)增加一个新的模式。匹配的事件序列必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next( Pattern.begin("start").where(...).followedBy("middle").where(...) );
    followedBy(#name)增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy("middle");
    followedBy(#pattern_sequence)增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy( Pattern.begin("start").where(...).followedBy("middle").where(...) );
    followedByAny(#name)增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny("middle");
    followedByAny(#pattern_sequence)增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间, 每个可选的匹配事件序列都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny( Pattern.begin("start").where(...).followedBy("middle").where(...) );
    notNext()增加一个新的否定模式。匹配的(否定)事件必须直接跟在前面匹配到的事件之后(严格连续)来丢弃这些部分匹配:java Pattern notNext = start.notNext("not");
    notFollowedBy()增加一个新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之间发生, 部分匹配的事件序列也会被丢弃(松散连续):java Pattern notFollowedBy = start.notFollowedBy("not");
    within(time)定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:java pattern.within(Time.seconds(10));

    匹配后跳过策略

    对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:

    • NO_SKIP: 不跳过
    • SKIP_TO_NEXT: 跳至下一个
    • SKIP_PAST_LAST_EVENT: 跳过所有子匹配
    • SKIP_TO_FIRST: 跳至第一个
    • SKIP_TO_LAST: 跳至最后一个

    例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:

    跳过策略结果描述
    NO_SKIPb1 b2 b3 c
    b2 b3 c
    b3 c
    找到匹配b1 b2 b3 c之后,不会丢弃任何结果。
    SKIP_TO_NEXTb1 b2 b3 c
    b2 b3 c
    b3 c
    找到匹配b1 b2 b3 c之后,不会丢弃任何结果,因为没有以b1开始的其他匹配。
    SKIP_PAST_LAST_EVENTb1 b2 b3 c找到匹配b1 b2 b3 c之后,会丢弃其他所有的部分匹配。
    SKIP_TO_FIRST[b]b1 b2 b3 c
    b2 b3 c
    b3 c
    找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b1之前开始的部分匹配,但没有这样的匹配,所以没有任何匹配被丢弃。
    SKIP_TO_LAST[b]b1 b2 b3 c
    b3 c
    找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b3之前开始的部分匹配,有一个这样的b2 b3 c被丢弃。
    方法描述
    AfterMatchSkipStrategy.noSkip()创建NO_SKIP策略
    AfterMatchSkipStrategy.skipToNext()创建SKIP_TO_NEXT策略
    AfterMatchSkipStrategy.skipPastLastEvent()创建SKIP_PAST_LAST_EVENT策略
    AfterMatchSkipStrategy.skipToFirst(patternName)创建引用模式名称为patternNameSKIP_TO_FIRST策略
    AfterMatchSkipStrategy.skipToLast(patternName)创建引用模式名称为patternNameSKIP_TO_LAST策略

    skipToNext

    // 配置跳过策略:skipToNext模式
    AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToNext();
    // 将跳过策略加入到模式中
    Pattern<MyEvent, MyEvent> pattern = Pattern.<MyEvent>begin("first", skipStrategy)
        .where(new IterativeCondition<MyEvent>() {
            @Override
            public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                return ... // 一些判断条件
            }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    skipToFirst(patternName)

    // 配置跳过策略:skipToFirst模式,参数传模式名称
    AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("first");
    Pattern<MyEvent, MyEvent> pattern = Pattern.<MyEvent>begin("first", skipStrategy)
        .where(new IterativeCondition<MyEvent>() {
            @Override
            public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                return "event1001".equals(myEvent.getEvent());
            }
        }).oneOrMore()
        .followedBy("second")
        .where(new IterativeCondition<MyEvent>() {
            @Override
            public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                return "event1003".equals(myEvent.getEvent());
            }
        })
        .followedBy("thrid")
        .where(new IterativeCondition<MyEvent>() {
            @Override
            public boolean filter(MyEvent myEvent, Context<MyEvent> context) throws Exception {
                return "event1004".equals(myEvent.getEvent());
            }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2. 检测模式(检测满足规则的复杂事件)

    将模式应用到流上

    • 调用 CEP 类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入
    • DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了
    DataStream<Event> inputStream = ...
    Pattern<Event, ?> pattern = ...
    PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
    
    • 1
    • 2
    • 3

    处理匹配事件

    匹配事件的选择提取(select)

    PatternSelectFunction
    • 处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)

      Pattern.<MyEvent>begin("first").where(...);  
      
      // 处理匹配事件
      cepPattern.select(new PatternSelectFunction<MyEvent, String>() {
          @Override
          public String select(Map<String, List<MyEvent>> map) throws Exception {
              // first 是 Pattern 的 name 字符串
              List<MyEvent> first = map.get("first");
              return ...  // 处理匹配事件逻辑
          }
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    PatternFlatSelectFunction
    • .flatSelect(),传入的参数是一个PatternFlatSelectFunction。这是 PatternSelectFunction 的“扁平化”版本;内部需要实现一个 flatSelect()方法,

    • 它与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数 collector,通过调用 collector.collet()方法就可以实现多次发送输出数据了

      cepPattern.flatSelect(new PatternFlatSelectFunction<MyEvent, String>() {
          @Override
          public void flatSelect(Map<String, List<MyEvent>> map, Collector<String> collector) throws Exception {
              // 处理匹配事件逻辑
          }
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    匹配事件的通用处理(process)

    • 自 1.8 版本之后,Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream 的.process()方法,传入一个 PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。

    • PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 内部执行时也会被转换成 PatternProcessFunction

    • Context context:上下文

    • collector.collect():调用此方法实现发送输出数据

      cepPattern.process(new PatternProcessFunction<MyEvent, String>() {
          @Override
          public void processMatch(Map<String, List<MyEvent>> map, Context context, Collector<String> collector) throws Exception {
              // 处理匹配事件逻辑
          }
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    处理超时事件

    • 在 Flink CEP 中 , 提 供 了 一 个 专 门 捕 捉 超 时 的 部 分 匹 配 事 件 的 接 口 , 叫 作TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行

      PatternStream<MyEvent> cepPattern = CEP.pattern(myEventData.keyBy(myEvent -> myEvent.getUserId()), pattern);
      // 测流
      OutputTag<String> outputTag = new OutputTag<String>("time_out"){};
      // 超时数据处理
      SingleOutputStreamOperator<String> processData = cepPattern.process(new MyPatternProcessFunction());
      
      
      
      // 数据处理,处理匹配成功数据,处理超时数据
      public static class MyPatternProcessFunction extends PatternProcessFunction<MyEvent, String>
          implements TimedOutPartialMatchHandler<MyEvent> {
      
          @Override
          public void processMatch(Map<String, List<MyEvent>> map, Context context, Collector<String> collector) throws Exception {
              // 匹配成功逻辑处理
          }
      
          @Override
          public void processTimedOutMatch(Map<String, List<MyEvent>> map, Context context) throws Exception {
              // 超时逻辑处理,将数据写入到测输出流中
              OutputTag<String> outputTag = new OutputTag<String>("time_out"){};
              String str = ...  // 逻辑处理
              context.output(outputTag, str);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25

    Maven

    <properties>
        <flink.version>1.13.0flink.version>
        <java.version>1.8java.version>
        <scala.binary.version>2.12scala.binary.version>
        <slf4j.version>1.7.30slf4j.version>
    properties>
    
    
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-cep_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    
    
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-javaartifactId>
        <version>${flink.version}version>
    dependency>
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-streaming-java_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-clients_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    数据仓库面试题——介绍下数据仓库
    LeetCode简单题之使数组中所有元素都等于零
    基于Java的新能源汽车在线租赁平台设计与实现(源码+lw+ppt+部署文档+视频讲解等)
    十月四日作业
    使用Windows自带命令,将文件编码改为ANSI的方法
    攻防世界—file_include
    剑指Offer || :栈与队列(简单)
    自已定义一个Java异常——子定义异常,和异常遇到的面试题。
    源码学习之Spring容器创建原理
    [python-大语言模型]从浅到深一系列学习笔记记录
  • 原文地址:https://blog.csdn.net/qq_44002865/article/details/126786450