• 大数据-玩转数据-Flink CEP编程


    一、Flink CEP

    FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库。它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
    是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

    1. 目标:从有序的简单事件流中发现一些高阶特征
    2. 输入:一个或多个由简单事件构成的事件流
    3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
    4. 输出:满足规则的复杂事件

    二、Flink CEP应用场景

    风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
    策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
    运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

    三、CEP开发基本步骤

    导入CEP相关依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码案例

    package com.lyh.flink11;
    
    import com.lyh.bean.WaterSensor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    import java.util.List;
    import java.util.Map;
    
    public class Flink_CEP_S {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            SingleOutputStreamOperator<WaterSensor> stream = env.readTextFile("input/sensor.txt")
                    .map(line -> {
                        String[] datas = line.split(",");
                        return new WaterSensor(
                                datas[0],
                                Long.valueOf(datas[1]),
                                Integer.valueOf(datas[2])
                        );
                    }).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withTimestampAssigner((element, timeStamp) -> element.getTs()));
    
            Pattern<WaterSensor, WaterSensor> sensor_1 = Pattern.<WaterSensor>begin("sensor_1")
                    .where(new SimpleCondition<WaterSensor>() {
                        @Override
                        public boolean filter(WaterSensor value) throws Exception {
                            return "sensor_1".equals(value.getId());
                        }
                    });
            PatternStream<WaterSensor> pattern = CEP.pattern(stream, sensor_1);
            pattern.select(new PatternSelectFunction<WaterSensor, String>() {
                @Override
                public String select(Map<String, List<WaterSensor>> map) throws Exception {
                    return map.toString();
                }
            }).print();
    env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    四、运行结果

    在这里插入图片描述

  • 相关阅读:
    数字IC手撕代码-XX公司笔试真题(数据流pipeline加和)
    代码随想录算法训练营19期第53天
    【C/C++】使用 g++ 编译器编译 C++ 程序的完全指南
    Nginx请求强制缓存设置
    jsonp 原理详解及 jsonp-pro 源码解析
    产品解读 | 分布式多模数据库:KaiwuDB
    【第十二篇】Camunda系列-事件篇-信号事件
    6-Mysql子查询,多表连接(内连接,外连接,交叉连接)
    [学习记录] SpringBoot 1. 基础入门
    【算法】查找类——二分查找算法
  • 原文地址:https://blog.csdn.net/s_unbo/article/details/132980330