• Flink快速入门教程


    Apache Flink 是以高效、可扩展方式处理海量数据的大数据处理框架。本文介绍它的一些核心概念,以及标准数据转换Java版本api,这些API以流畅的方式可以很容易使用Flink的核心数据结构——分布式集合。
    首先介绍Flink DataSet API实现统计单词频次程序,然后简要看下用于实时流式数据处理的DataStream API。

    maven依赖

    
        org.apache.flink
        flink-java
        1.2.0
    
    
        org.apache.flink
        flink-test-utils_2.10
        1.2.0
        test
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    核心API概念

    使用Flink时,选哟知道一些API相关的概念:

    • 每个在分布式集合数据执行转换程序,需要使用多个转换数据函数,包括:filtering, mapping, joining, grouping, and aggregating。
    • Flink中sink操作触发流执行产生程序期望的结果,例如,将结果保存到文件系统或打印到标准输出。
    • Flink转换是懒执行,意味着知道sink操作执行才会真正执行。
    • Flink API支持两种模式——批处理和实时处理。对于有限数据源使用批模式,使用DataSet API;处理无界实时流数据,应该DataStream API。

    DataSet API转换数据

    Flink程序的入口点是ExecutionEnvironment 类的实例, 它定义了程序执行的上下文。下面创建ExecutionEnvironment对下并开始处理数据:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    
    • 1
    • 2

    注:当你在本地机器上启动程序,则仅在本地JVM上执行处理。如果需要在集群环境中启动处理,则应该在集群中每个服务器上按照Apache Flink并配置相应ExecutionEnvironment。

    创建数据集(DataSet)

    要执行数据转换,需要提供数据。下面使用ExecutionEnvironement创建DataSet class :

    DataSet amounts = env.fromElements(1, 29, 40, 50);
    
    • 1

    也可以从其他数据源创建数据集,如Apache Kafka、CSV文件或其他数据源。

    过滤和归约

    准备好数据集,就可以进行过滤和转换。假设我们需要根据某阈值进行过滤,然后对过滤后的数据进行累加。则可以使用 filter() 和 reduce() 函数实现:

    int threshold = 30;
    List collect = amounts
      .filter(a -> a > threshold)
      .reduce((integer, t1) -> integer + t1)
      .collect();
    
    assertThat(collect.get(0)).isEqualTo(90);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注:collect()方法是sink操作,它实际触发数据转换。

    map映射

    假设我们有Person对象数据集:

    private static class Person {
        private int age;
        private String name;
    
        // standard constructors/getters/setters
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    接着创建该对象的数据集:

    DataSet personDataSource = env.fromCollection(
      Arrays.asList(
        new Person(23, "Tom"),
        new Person(75, "Michael")));
    
    • 1
    • 2
    • 3
    • 4

    如果我们仅需要每个对象的age属性,可以使用map转换方法实现:

    List ages = personDataSource
      .map(p -> p.age)
      .collect();
    
    assertThat(ages).hasSize(2);
    assertThat(ages).contains(23, 75);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    join方法

    可以对两个数据集基于ID字段进行关联操作,实现连接转换。下面创建用户的事务和地址数据集:

    Tuple3 address
      = new Tuple3<>(1, "5th Avenue", "London");
    DataSet> addresses
      = env.fromElements(address);
    
    Tuple2 firstTransaction 
      = new Tuple2<>(1, "Transaction_1");
    DataSet> transactions 
      = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    两个元组的第一个字段都是整型,这是连接两个数据集的ID字段。为了执行实际连接逻辑,需要实现地址和事务数据集的KeySelector接口:

    private static class IdKeySelectorTransaction 
      implements KeySelector, Integer> {
        @Override
        public Integer getKey(Tuple2 value) {
            return value.f0;
        }
    }
    
    private static class IdKeySelectorAddress 
      implements KeySelector, Integer> {
        @Override
        public Integer getKey(Tuple3 value) {
            return value.f0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    每个选择器只返回应该执行联接的字段。不幸的是不能使用lambda表达式简化实现,应该Flink需要泛型类型信息。

    接着使用选择器实现合并逻辑:

    List, Tuple3>>
      joined = transactions.join(addresses)
      .where(new IdKeySelectorTransaction())
      .equalTo(new IdKeySelectorAddress())
      .collect();
    
    assertThat(joined).hasSize(1);
    assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    排序

    首先准备一些实例数据,Tuple2类型集合:

    Tuple2 secondPerson = new Tuple2<>(4, "Tom");
    Tuple2 thirdPerson = new Tuple2<>(5, "Scott");
    Tuple2 fourthPerson = new Tuple2<>(200, "Michael");
    Tuple2 firstPerson = new Tuple2<>(1, "Jack");
    DataSet> transactions = env.fromElements(
      fourthPerson, secondPerson, thirdPerson, firstPerson);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如何需要按Tuple2中第一个字段进行排序,需要使用sortPartition方法执行转换:

    List> sorted = transactions
      .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
      .collect();
    
    assertThat(sorted)
      .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    经典示例

    单词计数是现实大数据处理框架的经典示例,主要对数据文本的内容处理计算单词频数。本节提供Flink实现版本。首先创建LineSplitter 类分割输入为单词,收集每个单词的Tuple2类型(key-value), key即输入中发现的每个单词,value为常数1。

    该类实现FlatMapFunction接口,它接收字符串作为输入,产生 Tuple2作为输出:

    public class LineSplitter implements FlatMapFunction> {
    
        @Override
        public void flatMap(String value, Collector> out) {
            Stream.of(value.toLowerCase().split("\\W+"))
              .filter(t -> t.length() > 0)
              .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    然后调用Collector类的collect方法,推送数据至处理流水线。接着按第一个元素(单词)对元组进行分组并执行sum聚集方法对元组的第二个元素进行求和计算单词的频数。

    public static DataSet> startWordCount(
      ExecutionEnvironment env, List lines) throws Exception {
        DataSet text = env.fromCollection(lines);
    
        return text.flatMap(new LineSplitter())
          .groupBy(0)
          .aggregate(Aggregations.SUM, 1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    我们使用了三种Flink转换类型:flatMap(), groupBy() 和 aggregate()。下面写完整测试是否与期望一致:

    List lines = Arrays.asList(
      "This is a first sentence",
      "This is a second sentence with a one word");
    
    DataSet> result = WordCount.startWordCount(env, lines);
    
    List> collect = result.collect();
     
    assertThat(collect).containsExactlyInAnyOrder(
      new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
      new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
      new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    DataStream API 转换数据

    创建DataStream

    Apache Flink 通过DataStream API支持事件流处理。首先需要使用StreamExecutionEnvironment 类消费事件:

    StreamExecutionEnvironment executionEnvironment
     = StreamExecutionEnvironment.getExecutionEnvironment();
    
    • 1
    • 2

    接着使用executionEnvironment从不同来源创建事件流,它可以是消息总线,如Apache Kafka,但我们简单创建一组字符串元素:

    DataStream dataStream = executionEnvironment.fromElements(
      "This is a first sentence", 
      "This is a second sentence with a one word");
    
    • 1
    • 2
    • 3

    和DataSet类一样,可以对DataStream中的元素应用转换:

    SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);
    
    • 1

    为了触发执行,需要执行sink操作,如print()方法,把转换结果打印至控制台,接着执行StreamExecutionEnvironment 类的execute方法:

    upperCase.print();
    env.execute();
    
    • 1
    • 2

    程序会产生下面输出结果:

    1> THIS IS A FIRST SENTENCE
    2> THIS IS A SECOND SENTENCE WITH A ONE WORD
    
    • 1
    • 2

    窗口事件

    当实时处理事件流时,可能需要把一些事件分为组,基于这些事件窗口进行计算。
    假设事件流中每个事件发送至我们系统中,其中包括事件量和时间戳。我们可以容许事件无序到达,但前提是它们的延迟不超过20秒。对于这种场景首先创建一个流来模拟两个相隔几分钟的事件,并定义一个时间戳提取器来指定延迟阈值:

    SingleOutputStreamOperator> windowed
      = env.fromElements(
      new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
      new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor
          >(Time.seconds(20)) {
     
            @Override
            public long extractTimestamp(Tuple2 element) {
              return element.f1 * 1000;
            }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    接下来定义一个窗口操作,将事件分组到5秒的窗口中,并对这些事件应用转换:

    SingleOutputStreamOperator> reduced = windowed
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      .maxBy(0, true);
    reduced.print();
    
    • 1
    • 2
    • 3
    • 4

    它将获得每5秒窗口的最后一个元素,因此它输出:

    1> (15,1491221519)
    
    • 1

    请注意,我们没有看到第二个事件,因为它的到达时间晚于指定的延迟阈值。

    总结

    本文简要介绍了Apache Flink框架,并通过示例展示如何使用一些转换API,包括利用DataSet API实现单词频次计算,利用DataStream API 实现简单实时事件流转换。

  • 相关阅读:
    形式逻辑简介
    <SQL编程工具MySQL、SQLyog安装及环境配置教程>——《SQL》
    【论文阅读】Graph Fusion Network for Text Classification
    华为云如何实现实时音视频全球低时延网络架构
    xss靶场练习之xss.haozi.me解析及答案
    对象由生到死的一些过程
    SpringMVC项目请求(请求映射路径)
    使用超核CH系列连接stm32,连接阿里云
    vue3 prop验证类型
    Vue组件间传值
  • 原文地址:https://blog.csdn.net/neweastsun/article/details/126383371