• Flink CEP Source Code Analysis


    Complex Event Processing (CEP) is an analysis technique based on event streams in a dynamic environment, where an event is typically a meaningful state change. By analyzing the relationships between events and applying techniques such as filtering, correlation, and aggregation, detection rules are defined over the temporal and aggregation relationships among events; matching event sequences are then continuously extracted from the event stream, ultimately yielding more complex composite events.

    We begin the source-code walkthrough with an example.

    1. Example Code

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.time.Duration;
    import java.util.Map;

    public class FlinkCepTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            // data source
            KeyedStream<Tuple3<String, Long, String>, String> source = env.fromElements(
                    new Tuple3<String, Long, String>("1001", 1656914303000L, "success")
                    , new Tuple3<String, Long, String>("1001", 1656914304000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914305000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914306000L, "success")
                    , new Tuple3<String, Long, String>("1001", 1656914307000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914308000L, "success")
                    , new Tuple3<String, Long, String>("1001", 1656914309000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914310000L, "success")
                    , new Tuple3<String, Long, String>("1001", 1656914311000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914312000L, "fail")
                    , new Tuple3<String, Long, String>("1001", 1656914313000L, "success")
                    , new Tuple3<String, Long, String>("1001", 1656914314000L, "end")
            ).assignTimestampsAndWatermarks(WatermarkStrategy
                            .<Tuple3<String, Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                            .withTimestampAssigner((event, timestamp) -> event.f1))
                    .keyBy(e -> e.f0);
            Pattern<Tuple3<String, Long, String>, ?> pattern = Pattern
                    .<Tuple3<String, Long, String>>begin("begin")
                    .where(new Begincondition())
                    .followedByAny("middle")
                    .where(new Middlecondition())
                    .followedBy("end")
                    .where(new Endcondition());
            //TODO internally builds a PatternStreamBuilder and returns a PatternStream
            PatternStream patternStream = CEP.pattern(source, pattern);
            patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
                @Override
                public Map select(Map map) throws Exception {
                    return map;
                }
            }).print();
            env.execute("cep");
        }
    }
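
    The Begincondition, Middlecondition, and Endcondition classes used by the pattern are not shown in the original post. A minimal sketch, assuming each condition simply filters on the status field f2 (the exact predicates are our guess, chosen to fit the sample data):

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Hypothetical condition classes assumed by the example: each one
// filters on the third tuple field (the status string).
class Begincondition extends SimpleCondition<Tuple3<String, Long, String>> {
    @Override
    public boolean filter(Tuple3<String, Long, String> value) {
        return "success".equals(value.f2); // a match starts on "success"
    }
}

class Middlecondition extends SimpleCondition<Tuple3<String, Long, String>> {
    @Override
    public boolean filter(Tuple3<String, Long, String> value) {
        return "fail".equals(value.f2); // the middle step matches "fail"
    }
}

class Endcondition extends SimpleCondition<Tuple3<String, Long, String>> {
    @Override
    public boolean filter(Tuple3<String, Long, String> value) {
        return "end".equals(value.f2); // a match completes on "end"
    }
}
```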

    2. Source Code Analysis

    Based on the example above, we will walk through the source code in several parts: Pattern construction (which internally covers building the NFAFactory, the CepOperator, and so on), data processing, timeout handling, and result retrieval.

    //TODO internally builds a PatternStreamBuilder and returns a PatternStream
    PatternStream patternStream = CEP.pattern(source, pattern);

    // CEP.pattern() delegates to:
    PatternStreamBuilder.forStreamAndPattern(inputStream, pattern);

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);
    }

    PatternStream.select() internally calls PatternStreamBuilder.build():

    public <R> SingleOutputStreamOperator<R> process(
            final PatternProcessFunction<T, R> patternProcessFunction,
            final TypeInformation<R> outTypeInfo) {
        return builder.build(
                outTypeInfo,
                builder.clean(patternProcessFunction));
    }

    org.apache.flink.cep.PatternStreamBuilder#build

    <OUT, K> SingleOutputStreamOperator<OUT> build(
            final TypeInformation<OUT> outTypeInfo,
            final PatternProcessFunction<IN, OUT> processFunction) {
        checkNotNull(outTypeInfo);
        checkNotNull(processFunction);
        //TODO build the input serializer
        final TypeSerializer<IN> inputSerializer =
                inputStream.getType().createSerializer(inputStream.getExecutionConfig());
        final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;
        //TODO check whether this is a timeout-aware select/flatSelect
        final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
        //TODO build the NFAFactory factory class;
        // the factory object also holds the full set of user-defined states
        final NFACompiler.NFAFactory<IN> nfaFactory =
                NFACompiler.compileFactory(pattern, timeoutHandling);
        //TODO create the CepOperator
        final CepOperator<IN, K, OUT> operator =
                new CepOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        nfaFactory,
                        comparator,
                        pattern.getAfterMatchSkipStrategy(),
                        processFunction,
                        lateDataOutputTag);
        final SingleOutputStreamOperator<OUT> patternStream;
        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
            patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
            patternStream =
                    inputStream
                            .keyBy(keySelector)
                            .transform("GlobalCepOperator", outTypeInfo, operator)
                            .forceNonParallel();
        }
        return patternStream;
    }

    org.apache.flink.cep.nfa.compiler.NFACompiler#compileFactory builds the NFACompiler.NFAFactory.

    org.apache.flink.cep.operator.CepOperator#open() contains the initialization logic:

    @Override
    public void open() throws Exception {
        super.open();
        //TODO initialize the timer service
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
        //TODO creating the NFA initializes all state vertices and transition edges,
        // so the state set is fully initialized at this point
        nfa = nfaFactory.createNFA();
        //TODO give the match conditions access to the CEP runtime context
        nfa.open(cepRuntimeContext, new Configuration());
        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();
        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

    org.apache.flink.cep.operator.CepOperator#processElement() receives and processes incoming records:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        //TODO check whether the time semantics are ProcessingTime or EventTime
        if (isProcessingTime) {
            if (comparator == null) {
                // there can be no out of order elements in processing time
                NFAState nfaState = getNFAState();
                long timestamp = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(nfaState, timestamp);
                processEvent(nfaState, element.getValue(), timestamp);
                updateNFA(nfaState);
            } else {
                long currentTime = timerService.currentProcessingTime();
                bufferEvent(element.getValue(), currentTime);
                // register a timer for the next millisecond to sort and emit buffered data
                timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
            }
        } else {
            //TODO get the event's timestamp
            long timestamp = element.getTimestamp();
            //TODO get the payload
            IN value = element.getValue();
            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than or equal with the last seen watermark are
            // considered late.
            // Late events are put in a dedicated side output, if the user has specified one.
            if (timestamp > timerService.currentWatermark()) {
                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.
                //TODO register a watermark timer
                saveRegisterWatermarkTimer();
                //TODO buffer the record: key is the event timestamp, value is the list of records
                bufferEvent(value, timestamp);
            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }
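
    The event-time branch above amounts to: buffer every in-time record under its timestamp, and treat anything at or below the current watermark as late. A self-contained sketch of just that buffering decision, using only the JDK (the class and field names here are ours, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// JDK-only stand-in for the event-time branch of CepOperator#processElement:
// in-time records are buffered per timestamp; records at or below the
// watermark are late and (absent a side output) only counted.
class EventBufferSketch {
    final TreeMap<Long, List<String>> buffer = new TreeMap<>(); // event timestamp -> records
    long currentWatermark = Long.MIN_VALUE;
    int lateRecordsDropped = 0;

    void processElement(String value, long timestamp) {
        if (timestamp > currentWatermark) {
            // valid timestamp: buffer until the proper watermark arrives
            buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        } else {
            // late record: real Flink would emit to lateDataOutputTag if configured
            lateRecordsDropped++;
        }
    }
}
```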

    org.apache.flink.cep.operator.CepOperator#onEventTime() triggers the computation:

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        // 1) get the queue of pending elements for the key and the corresponding NFA,
        // 2) process the pending elements in event time order and custom comparator if exists
        //    by feeding them in the NFA
        // 3) advance the time to the current watermark, so that expired patterns are discarded.
        // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
        //    have state to be used later.
        // 5) update the last seen watermark.

        // STEP 1
        //TODO fetch the buffered timestamps from the priority queue
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        //TODO holds both partially matched and fully matched states; when get returns
        // nothing it is initialized here by scanning all states and taking the start
        // state as the next matchable state
        NFAState nfaState = getNFAState();

        // STEP 2
        while (!sortedTimestamps.isEmpty()
                && sortedTimestamps.peek() <= timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            //TODO handle data that timed out without matching
            advanceTime(nfaState, timestamp);
            try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
                elements.forEachOrdered(
                        event -> {
                            try {
                                //TODO once sorted by event time, hand the record to the real NFA processing logic
                                processEvent(nfaState, event, timestamp);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
            }
            elementQueueState.remove(timestamp);
        }

        // STEP 3
        advanceTime(nfaState, timerService.currentWatermark());

        // STEP 4
        updateNFA(nfaState);
        if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
            saveRegisterWatermarkTimer();
        }
    }
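
    Steps 1–3 above can be sketched with plain JDK collections: buffered timestamps sit in a priority queue and are drained in ascending order, but only up to the current watermark (all names below are ours, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// JDK-only stand-in for the drain loop in CepOperator#onEventTime:
// process buffered timestamps in ascending order up to the watermark.
class WatermarkDrainSketch {
    static List<Long> drain(Map<Long, List<String>> buffer, long watermark,
                            List<String> processed) {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>(buffer.keySet());
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
            long timestamp = sortedTimestamps.poll();
            // in the real operator each record is fed into the NFA here
            processed.addAll(buffer.remove(timestamp));
        }
        // timestamps still waiting for a later watermark
        return new ArrayList<>(sortedTimestamps);
    }
}
```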

    org.apache.flink.cep.operator.CepOperator#processEvent() processes a single record:

    /**
     * Process the given event by giving it to the NFA and outputting the produced set of matched
     * event sequences.
     *
     * @param nfaState Our NFAState object
     * @param event The current event to be processed
     * @param timestamp The timestamp of the event
     */
    private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
            //TODO get the maps of matched patterns; each map holds all the data of one match
            Collection<Map<String, List<IN>>> patterns =
                    nfa.process( //TODO the actual processing logic
                            sharedBufferAccessor,
                            nfaState,
                            event,
                            timestamp,
                            afterMatchSkipStrategy,
                            cepTimerService);
            //TODO each map is one completed match; processMatchedSequences invokes the
            // user's select or flatSelect function and emits downstream
            processMatchedSequences(patterns, timestamp);
        }
    }

    org.apache.flink.cep.nfa.NFA#doProcess starts processing the data.

    ===> The first record is (1001,1656914303000,success)

    At this point only the begin state is being matched.

    After org.apache.flink.cep.nfa.NFA#computeNextStates runs,

    there are now two in-progress states and the version numbers grow: staying in the same state bumps the version, while advancing adds the next state one level deeper.

    org.apache.flink.cep.operator.CepOperator#processMatchedSequences then checks whether a match is complete and emits it.

    ===> The second record is (1001,1656914304000,fail)

    After org.apache.flink.cep.nfa.NFA#computeNextStates runs,

    ===> The third record is (1001,1656914305000,fail)

    After org.apache.flink.cep.nfa.NFA#computeNextStates runs,

    ===> The fourth record is (1001,1656914306000,success)

    After org.apache.flink.cep.nfa.NFA#computeNextStates runs,

    this record matches begin: the startState version is incremented and a new middle state is added.

    ===> The fifth record is (1001,1656914307000,end)

    After org.apache.flink.cep.nfa.NFA#computeNextStates runs,

    the in-progress states become three, and the two end states become completed.

    Each reached state is checked for being a Final state; if so, it is added to potentialMatches.

    Afterwards, the matched data is retrieved by iterating over the sharedBufferAccessor.
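
    The version bookkeeping described above follows a Dewey-number scheme (Flink implements it in org.apache.flink.cep.nfa.DeweyNumber): matching the same state again bumps the last digit, while advancing to the next state appends a new stage. A minimal JDK-only sketch of the idea (the class below is our own simplification, not Flink's):

```java
// Simplified Dewey-style version number, used to tag shared-buffer pointers:
// "1.0" becomes "1.1" when the same state matches again, and gains a new
// stage ("1.0.0") when the match advances to the next state.
class DeweySketch {
    final int[] digits;

    DeweySketch(int... digits) { this.digits = digits; }

    // same state matched again: bump the last digit
    DeweySketch increase() {
        int[] d = digits.clone();
        d[d.length - 1]++;
        return new DeweySketch(d);
    }

    // advanced to the next state: append a new zero stage
    DeweySketch addStage() {
        int[] d = new int[digits.length + 1];
        System.arraycopy(digits, 0, d, 0, digits.length);
        return new DeweySketch(d);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < digits.length; i++) {
            if (i > 0) sb.append('.');
            sb.append(digits[i]);
        }
        return sb.toString();
    }
}
```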

    A brief introduction to SharedBufferAccessor

    Flink CEP uses a versioned shared buffer. Each match attempt is assigned a version number, and that version tags all of the pointers belonging to that match.

    Implementing three independent buffers would work, but notice that the stack for state stat1 in buffer 3 is identical to the stack for stat1 in buffer 1, so there is no reason for them to occupy memory separately. In real pattern-matching scenarios, the stacks maintained independently by each buffer can overlap heavily, and as stream events keep arriving, the memory spent keeping a separate buffer per match result keeps growing. Flink CEP therefore introduced the shared buffer (SharedBuffer): a single shared buffer that represents the three buffers above.
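
    The idea can be sketched with plain JDK collections: each event is stored once as a node, each partial match is a chain of backward pointers tagged with that match's version, and extracting a match walks the pointers for that version. This is a drastic simplification of Flink's SharedBuffer; all names below are ours:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Much-simplified shared buffer: events are stored once; each partial match
// is a chain of versioned backward edges, so overlapping matches share
// nodes instead of each keeping its own copy of the stack.
class SharedBufferSketch {
    static final class Node {
        final String event;
        // predecessor node id per version tag; -1 marks the start of a match
        final Map<String, Integer> prevByVersion = new HashMap<>();
        Node(String event) { this.event = event; }
    }

    final List<Node> nodes = new ArrayList<>();

    // store an event once, with a backward edge for one match version
    int put(String event, int prevId, String version) {
        Node n = new Node(event);
        n.prevByVersion.put(version, prevId);
        nodes.add(n);
        return nodes.size() - 1;
    }

    // add an extra versioned edge to an existing (shared) node
    void link(int nodeId, int prevId, String version) {
        nodes.get(nodeId).prevByVersion.put(version, prevId);
    }

    // walk the backward pointers of one version, returning events in order
    List<String> extract(int lastId, String version) {
        List<String> path = new ArrayList<>();
        for (int id = lastId; id != -1; id = nodes.get(id).prevByVersion.get(version)) {
            path.add(0, nodes.get(id).event);
        }
        return path;
    }
}
```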

    org.apache.flink.cep.operator.CepOperator#processMatchedSequences handles the completed matches.

    Internally, processMatchedSequences calls the user-defined select function to emit the data that satisfied the pattern.


  • Original post: https://blog.csdn.net/qq_20672231/article/details/125615807