• Flink-看完就会flink基础API



    执行环境、数据源(source)、转换操作(transformation)、输出(sink)四大部分

    在这里插入图片描述

    一、执行环境(Execution Environment)

    1、创建执行环境

    // 批处理环境
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    //  流处理环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    • 1
    • 2
    • 3
    • 4
    • 智能执行环境:getExecutionEnvironment( )

    最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    • 1

    这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。

    • 本地执行环境:createLocalEnvironment()
      这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。
    LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    
    • 1
    • 集群执行环境:createRemoteEnvironment( )
      这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
      要在集群中运行的 Jar 包。
    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "host",
            1234,
            "path/to/jarFile.jar"
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、执行模式(Execution Mode)

    处理环境获取

    • 流执行模式(STREAMING)

    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //	流执行模式(STREAMING)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    
    • 1
    • 2
    • 3
    • 批执行模式(BATCH)

    专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。对于不会持续计算的有界数据,我们用这种模式处理会更方便。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //	批执行模式(BATCH
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
    • 1
    • 2
    • 3
    • 自动模式(AUTOMATIC)

    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //	自动模式(AUTOMATIC)
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    
    • 1
    • 2
    • 3

    Batch模式其它配置

    1. 通过命令行配置
    bin/flink run -Dexecution.runtime-mode=BATCH ...
    
    • 1

    在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

    1. 通过代码配置
    //	获取流式处理环境
    StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    //	采用批执行模式
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
    • 1
    • 2
    • 3
    • 4

    3、触发程序执行

    需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。

    所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

    env.execute();
    
    • 1

    二、源算子(Source)

    在这里插入图片描述

    一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

    Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:

    DataStream<String> stream = env.addSource(...);
    
    • 1

    1、数据源类准备

    Event类字段设计(抽象化数据源)

    字段名数据类型说明
    userString用户名
    urlString用户访问的url
    timestampLong用户访问的url的时间戳

    代码:

    public class Event {
        public String user;
        public String url;
        public Long timstamp;
    
        public Event(String user, String url, Long timstamp) {
            this.user = user;
            this.url = url;
            this.timstamp = timstamp;
        }
    
        public Event() {
        }
    
        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timstamp=" + timstamp +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2、从集合中读取数据

    //  1、从集合中读取数据
    StreamExecutionEnvironment env01 = StreamExecutionEnvironment.getExecutionEnvironment();
    env01.setParallelism(1);
    //  创建集合
    ArrayList<Event> clicks = new ArrayList<>();
    clicks.add(new Event("Mary", "./home", 1000L));
    clicks.add(new Event("Bob", "./cart", 2000L));
    DataStreamSource<ArrayList<Event>> stream01 = env01.fromElements(clicks);
    
    stream01.print();
    env01.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    直接列举元素

    DataStreamSource<Event> steam01 = env01.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );
    
    • 1
    • 2
    • 3
    • 4

    3、从文件中读取数据

    从存储介质中读取数据

    StreamExecutionEnvironment env02 = StreamExecutionEnvironment.getExecutionEnvironment();
    //	2、从文件中读取数据
    DataStreamSource<String> stream02 = env02.readTextFile("input/word.txt");
    
    • 1
    • 2
    • 3

    4、从Socket中读取数据

    行程流式数据读取,读取 socket 文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

    StreamExecutionEnvironment env03 = StreamExecutionEnvironment.getExecutionEnvironment();
    //  3、从Socket中读取数据
    DataStreamSource<String> stream03 = env03.socketTextStream("localhost", 7777);
    
    • 1
    • 2
    • 3

    5、从Kafka中读取数据

    Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选。

    在这里插入图片描述

    略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。

    以Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction。

    pom依赖

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码

    //  4、从Kafka中读取数据
    StreamExecutionEnvironment env04 = StreamExecutionEnvironment.getExecutionEnvironment();
    env04.setParallelism(1);
    //	Kafka配置信息
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hadoop102:9092");
    properties.setProperty("group.id", "consumer-group");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");
    //	添加数据源(addSource)
    DataStreamSource<String> stream04 = env04.addSource(new FlinkKafkaConsumer<String>(
            "clicks",
            new SimpleStringSchema(),
            properties
    ));
    stream04.print("kafka");
    env04.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    6、自定义源算子(source)

    创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()cancel()

    • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
    • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

    数据源类

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.Calendar;
    import java.util.Random;
    
    public class ClickSource implements SourceFunction<Event> {
    
        //    声明一个布尔常量,作为控制数据生成的标识位
        private Boolean running = true;
    
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            Random random = new Random();
            String[] users = {"Mary", "Alice", "Bob"};
            String[] urls = {"/data/en", "/data/en1", "/data/en2"};
            while (running) {
                ctx.collect(new Event(
                        users[random.nextInt(users.length)],
                        users[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()
                ));
    //            间隔1s生成一个点击事件,方便观看
                Thread.sleep(1000);
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    执行代码:

    //  5、自定义源算子
    StreamExecutionEnvironment env05 = StreamExecutionEnvironment.getExecutionEnvironment();
    //  设置并行度
    env05.setParallelism(1);
    DataStreamSource<Event> stream05 = env05.addSource(new ClickSource());
    stream05.print("sourceCustom");
    env05.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    设置2个并行度

    //  设置2个并行度(大于1)
    StreamExecutionEnvironment env06 = StreamExecutionEnvironment.getExecutionEnvironment();
    env06.addSource(new CustomerSource()).setParallelism(2).print();
    env06.execute();
    
    • 1
    • 2
    • 3
    • 4

    三、转换算子(Transformation)

    在这里插入图片描述

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。

    我们可以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。

    1、基本转换算子

    1.1 映射(map)

    map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

    在这里插入图片描述

    我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。

    //	1、创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //  2、从文件中读取数据:按行读取(存储的元素就是每行的文本)
    env.setParallelism(1);
    DataStreamSource<Event> stream = env.fromElements();
    
    //  -Map-1 传入匿名类,实现MapFunction
    stream.map(new MapFunction<Event, String>() {
        @Override
        public String map(Event e) throws Exception {
            return e.user;
        }
    });
    //  -Map-2 传入MapFunction的实现类
    stream.map(new UserExtractor()).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1.2 过滤(filter)

    a

    filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。

    进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。

    //  -Filter-1 传入匿名类实现FilterFunction接口
    stream.filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    });
    //  -Filter-2 传入FilterFunction实现类
    stream.filter(new UserFilter()).print();
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1.3 扁平映射(flatMap)

    在这里插入图片描述

    flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

    同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

    flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了简单的转换操作。

    //        1、创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //  2、从文件中读取数据:按行读取(存储的元素就是每行的文本)
    env.setParallelism(1);
    
    //  -FlatMap-1 传入FlatMapFunction实现类
    stream.flatMap(new MyFlatMap()).print();
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    实现类(MyFlatMap)

    class MyFlatMap implements FlatMapFunction<Event, String> {
    
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2、聚合算子(Aggregation)

    要对每个词出现的频次进行叠加统计,这种操作不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

    2.1 按键分区(keyBy)

    在这里插入图片描述

    对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

    keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

    基于不同的 key,流中的数据将被分配到不同的分区中去,所有相同的key都会聚集到同一个分区中。

    在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。

    keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

    //	1、创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //  2、从文件中读取数据:按行读取(存储的元素就是每行的文本)
    env.setParallelism(1);
    DataStreamSource<Event> stream = env.fromElements();
    
    //  - 1=  按键分区-keyBy-1:Lamda表示
    KeyedStream<Event, String> keyenStream1 = stream.keyBy(e -> e.user);
    //  - 按键分区-keyBy-2:使用匿名类实现KeySelector
    KeyedStream<Event, String> keyedStream2 = stream.keyBy(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event e) throws Exception {
            return e.user;
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    需要注意的是:keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。

    2.2 简单聚合(keyBy)

    有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。

    • min():在输入流上,对指定的字段求最小值。

    • max():在输入流上,对指定的字段求最大值。

    • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。

    • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

    ​ 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。

    ​ 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。

    DataStreamSource<Tuple2<String, Integer>> stream2 = env.fromElements(
            Tuple2.of("a", 1),
            Tuple2.of("b", 2)
    );
    //  -2=简单聚合:集合
    stream2.keyBy(r -> r.f0).sum(1).print();
    stream2.keyBy(r -> r.f0).sum("f1").print();
    stream2.keyBy(r -> r.f0).max(1).print();
    stream2.keyBy(r -> r.f0).max("f1").print();
    stream2.keyBy(r -> r.f0).min(1).print();
    stream2.keyBy(r -> r.f0).min("f1").print();
    stream2.keyBy(r -> r.f0).maxBy(1).print();
    stream2.keyBy(r -> r.f0).maxBy("f1").print();
    stream2.keyBy(r -> r.f0).minBy(1).print();
    stream2.keyBy(r -> r.f0).minBy("f1").print();
    
    //  -2=简单聚合:POJO类
    DataStreamSource<Event> stream3 = env.fromElements(
            new Event("Mary", "./home", 100L),
            new Event("Mary", "./home", 100L)
    );
    stream3.keyBy(s -> s.user).max("timstamp");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    Event类:

    public class Event {
        public String user;
        public String url;
        public Long timstamp;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2.3 归约聚合(reduce)

    它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

    与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

    案例:
    我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

    //  -3规约聚合(reduce)
    SingleOutputStreamOperator<Tuple2<String, Long>> stream03 = env.addSource(new MyClickSource())
            .map(new MapFunction<Event, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Event value) throws Exception {
                    return Tuple2.of(value.user, 1L);
                }
            });
    
    //  使用用户名,进行分流
    stream03.keyBy(s -> s.f0)
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                   //   每到1条数据,用户PV的统计值+1
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            })
              .map(s->{ Tuple2<String, Long> s1 = s;})
           //   为每一条数据分配同一个key,将聚合结果发送到1条数据流中
            .keyBy(r -> true)
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                    //  将累加器更新为当前最大的PV统计值,然后向下游发送累加器的值
                    return value1.f1 > value2.f1 ? value1 : value2;
                }
            })
            .print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3、用户自定义函数(UDF)

    概念:可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)

    3.1 函数类(Function Classes)

    对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,最简单直接的方式,就是自定义一个函数类,实现对应的接口。

    例如: MapFunction、FilterFunction、ReduceFunction 等。

    实现了 FilterFunction 接口,用来筛选 url 中包含“home”的事件:

    //  0、创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //  -1 函数类型
    DataStreamSource<Event> stream01 = env.fromElements(
            new Event("Mary", "/hom", 100L),
            new Event("Mary", "/hom", 100L)
    );
    stream01.flatMap(new MyFlatMap2()).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    自定义筛选类:

    class MyFlatMap2 implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    匿名函数实现

    //  -1 函数类型 -匿名函数
    SingleOutputStreamOperator<Event> data = stream01.filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.url.contains("home");
        }
    });
    data.print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    3.2 Lamdba表达式(匿名函数)

    Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 表达式使用 Java 的泛型时,我们需要显式的声明类型信息。

    下例演示了如何使用 Lambda 表达式来实现一个简单的 map() 函数,我们使用 Lambda 表达式来计算输入的平方。在这里,我们不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出类型推断。

    //  -2 lambda表达式
    DataStreamSource<Event> stream2 = env.fromElements(
            new Event("Mary", "/hom", 100L),
            new Event("Mary", "/hom", 100L)
    );
    stream2.map(s -> s.url).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    指定返回类型为String

    //  指定返回值类型(string)
     DataStream<String> returns = stream2.flatMap((Event a, Collector<String> out) -> {
         out.collect(a.url);
     }).returns(Types.STRING);
    
    • 1
    • 2
    • 3
    • 4
    3.3 复函数类(Rich Function Classes)

    “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

    Rich Function 有生命周期的概念。典型的生命周期方法有:

    • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。

    • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

    // 复函数类(Rich Function Classes)
    StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
    env2.setParallelism(2);
    DataStreamSource<Event> stream3 = env2.fromElements(
            new Event("Mary", "/hom", 1000L),
            new Event("Mary", "/hom", 2000L),
            new Event("Mary", "/hom", 3000L),
            new Event("Mary", "/hom", 4000L)
    );
    
    //  将点击事件转换成长整型的时间戳输出
    stream3.map(new RichMapFunction<Event, Long> () {
        @Override
        public void open(Configuration parameters) throws Exception {
           //   初始化一些工作(建立MySQL连接)
            super.open(parameters);
            System.out.println("索引为:" + getRuntimeContext().getIndexOfThisSubtask()+"的任务开始");
        }
        @Override
        public Long map(Event value) throws Exception {
            //  对数据库进行读/写
            return value.timstamp;
        }
        @Override
        public void close() throws Exception {
            //  清理工作(关闭MySQL连接)
            super.close();
            System.out.println("索引为:" + getRuntimeContext().getIndexOfThisSubtask()+"执行任务结束");
        }
    }).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    输出结果:

    输出结果:
    索引为:0 的任务开始
    索引为:1 的任务开始
    1> 1000
    2> 2000
    2> 4000
    1> 3000
    索引为:0 执行任务结束
    索引为:1 执行任务结束
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    常用的操作步骤:

    public class MyFlatMap extends RichMapFunction<IN, OUT> {
    
        @Override
        public void open(Configuration configuration) {
            // 做一些初始化工作
            // 例如建立一个和 MySQL 的连接
        }
    
        @Override
        public void flatMap(IN in, Collector<OUT out) {
            // 对数据库进行读写
        }
    
        @Override
        public void close() {
        // 清理工作,关闭和 MySQL 数据库的连接。
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    4、物理分区(Physical Partitioning)

    keyBy():按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说,keyBy 是一种逻辑分区(logical partitioning)操作。

    物理分区:物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。

    常见的物理分区策略:

    • 随机分配(Random)
    • 轮询分配(Round-Robin)
    • 重缩放(Rescale)
    • 广播(Broadcast)
    4.1 随机分区(shuffle)

    在这里插入图片描述

    随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

    经过随机分区之后,得到的依然是一个 DataStream。

    案例:将数据读入之后直接打印到控制台,将输出的并行度设置为 4,中间经历一次 shuffle。执行多次,观察结果是否相同。

    //	1、创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //  - 1、随机分区(.shuffle())
    env.setParallelism(1);//并行度为1
    DataStreamSource<Event> stream01 = env.addSource(new MyClickSource());
    // 经洗牌后打印输出,并行度为4(分4个线程进行数据打印-处理)
    stream01.shuffle().print("shuffle").setParallelism(4);
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    执行结果:

    shuffle:1> Event{user='Bob', url='./cart', timestamp=...}
    shuffle:4> Event{user='Cary', url='./home', timestamp=...}
    shuffle:3> Event{user='Alice', url='./fav', timestamp=...}
    shuffle:4> Event{user='Cary', url='./cart', timestamp=...}
    shuffle:3> Event{user='Cary', url='./fav', timestamp=...}
    shuffle:1> Event{user='Cary', url='./home', timestamp=...}
    shuffle:2> Event{user='Mary', url='./home', timestamp=...} 
    shuffle:1> Event{user='Bob', url='./fav', timestamp=...}
    shuffle:2> Event{user='Mary', url='./home', timestamp=...}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    4.2 轮询分区(Round-Robin)

    在这里插入图片描述

    轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

    注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。

    //  - 2、轮询分区(.rebalance())
    env.setParallelism(1);
    //  读取数据源,并行度为1
    DataStreamSource<Event> stream02 = env.addSource(new MyClickSource());
    //  经轮询重分组后打印输出,并行度为4
    stream02.rebalance().print("rebalance").setParallelism(4);
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    结果:数据被平均分配到所有并行任务中去了。

    rebalance:2> Event{user='Cary', url='./fav', timestamp=...}
    rebalance:3> Event{user='Mary', url='./cart', timestamp=...}
    rebalance:4> Event{user='Mary', url='./fav', timestamp=...}
    rebalance:1> Event{user='Cary', url='./home', timestamp=...}
    rebalance:2> Event{user='Cary', url='./cart', timestamp=...}
    rebalance:3> Event{user='Alice', url='./prod?id=1', timestamp=...}
    rebalance:4> Event{user='Cary', url='./prod?id=2', timestamp=...}
    rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=...}
    rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=...}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    4.3 重缩放分区(rescale)

    在这里插入图片描述

    ​ 重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

    底层原理:

    从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

    //  - 3、重缩放分区(.rescale())
    env.setParallelism(1);
    //
    DataStreamSource<Integer> stream03 = env.addSource(new RichParallelSourceFunction<Integer>() {
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; i < 8; i++) {
                //  将奇数发送到索引为1的并行子任务上
                //  将偶数发送到索引为0的并行子任务上
                //  这里使用了并行数据源的富函数版本
                //  这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
                if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                    ctx.collect(i + 1);
                }
            }
        }
        @Override
        public void cancel() {
        }
    }).setParallelism(2);
    //  执行重缩放分区
    stream03.rescale().print().setParallelism(4);
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    执行结果:

    4> 3
    3> 1
    1> 2
    1> 6
    3> 5
    4> 7
    2> 4
    2> 8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    4.4 广播(broadcast)

    这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

    //  - 4、广播()
    env.setParallelism(1);
    //  读取数据源,并行度为1
    DataStreamSource<Event> stream04 = env.addSource(new MyClickSource());
    //  经广播后打印输出,并行度为4
    stream04.broadcast().print("broadcast").setParallelism(4);
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    输出结果:(数据被复制然后广播到了下游的所有并行任务中去了)

    broadcast:3> Event{user='Mary', url='./cart', timestamp=...}
    broadcast:1> Event{user='Mary', url='./cart', timestamp=...}
    broadcast:4> Event{user='Mary', url='./cart', timestamp=...}
    broadcast:2> Event{user='Mary', url='./cart', timestamp=...}
    broadcast:2> Event{user='Alice', url='./fav', timestamp=...}
    broadcast:1> Event{user='Alice', url='./fav', timestamp=...}
    broadcast:3> Event{user='Alice', url='./fav', timestamp=...}
    broadcast:4> Event{user='Alice', url='./fav', timestamp=...}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    4.5 全局分区(global)

    全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

    //  - 5、全局分区
    stream04.global().print();
    
    • 1
    • 2
    4.6 自定义分区(Custom)

    当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。

    在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector。

    案例:我们可以对一组自然数按照奇偶性进行重分区。

    //  将自然数按奇偶进行分区
    DataStreamSource<Integer> stream06 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
    stream06.partitionCustom(new Partitioner<Integer>() {
                //	自定义分区规则
                @Override
                public int partition(Integer key, int numPartitions) {
                    return key % 2;
                }
            }, new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .print().setParallelism(2);
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    四、输出算子(Sink)

    在这里插入图片描述

    Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,本节将主要讲解 Flink 中的 Sink 操作。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。

    1、连接到外部系统

    ​ 为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的。

    ​ Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。

    ​ 之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。

    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

    stream.addSink(new SinkFunction());
    
    • 1

    Flink 官方目前支持的第三方系统连接器:

    在这里插入图片描述

    2、输出到文件

    ​ Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

    ​ StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

    ​ StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

    • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
    • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。

    在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

    案例:测试数据直接写入文件:

    //  -1、输出到文件
    //  创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    //	数据源
    DataStreamSource<Event> stream01 = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Alice", "./home", 2000L),
            new Event("Bob", "./home", 3000L),
            new Event("Mary", "./prod", 4000L)
    );
    //	写出到文件相关配置
    StreamingFileSink<String> fileSink = StreamingFileSink
            .<String>forRowFormat(new Path("./output"),
                    new SimpleStringEncoder<>("UTF-8"))
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            //  至少包含15分钟数据
                            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                            //  最近5分钟没有收到新的数据
                            .withInactivityInterval(TimeUnit.MILLISECONDS.toMillis(5))
                            //  文件大小已达到1GB。
                            .withMaxPartSize(1024 * 1024 * 1024)
                            .build())
            .build();
    //  将event转为String写入文件
    stream01.map(Event::toString).addSink(fileSink);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策

    略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以

    我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面

    的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

    • 至少包含 15 分钟的数据。
    • 最近 5 分钟没有收到新的数据。
    • 文件大小已达到 1 GB。

    3、连接到Kafka

    ​ Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。

    ​ Flink 从 Kakfa 的一个 topic 读取消费数据,然后进行处理转换,最终将结果数据写入 Kafka 的另一个 topic——数据从 Kafka 流入、经Flink处理后又流回到 Kafka 去,这就是所谓的“数据管道”应用。

    ​ 真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。

    案例:将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主题(topic)命名为clicks

    //  -2、输出到Kafka
    env.setParallelism(1);
    //  加载数据文件
    DataStreamSource<String> stream02 = env.readTextFile("input/clkicks.csv");
    //  加载Kafka配置
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "hadoop102:9092");
    //	执行输出源
    stream02.addSink(new FlinkKafkaProducer<String>(
            "clicks",
            new SimpleStringSchema(),
            properties
    ));
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

    4、输出到Redis

    Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的Scala 版本最新到 2.11。

    导入的 Redis 连接器依赖

    
    <dependency>
        <groupId>org.apache.bahirgroupId>
        <artifactId>flink-connector-redis_2.11artifactId>
        <version>1.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:

    //  -3、输出到Redis
    env.setParallelism(1);
    //  获取数据
    DataStreamSource<Event> stream03 = env.addSource(new MyClickSource());
    //  配置Redis连接信息
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
    stream03.addSink(new RedisSink<>(conf, new MyRedisMapper()));
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    是定义一个 Redis 的映射类,实现 RedisMapper 接口。

    class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
          //	RedisCommand.HSET:存储类型为Hash类型
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }
    
        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }
    
        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里 RedisSink 的构造方法需要传入两个参数:

    • JFlinkJedisConfigBase:Jedis 的连接配置。
    • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型。

    5、输出到ES

    ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。

    Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器。

    pom依赖

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-elasticsearch6_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    输出到ES中

    //  -4、输出到ES
    DataStreamSource<Event> stream04 = env.fromElements(
            new Event("Bob", "./cart", 100L),
            new Event("Alice", "./home", 200L),
            new Event("Bob", "./cart", 300L),
            new Event("Alice", "./home", 400L)
    );
    ArrayList<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("hadopp102", 9200, "http"));
    
    //  创建一个ES的Function
    ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
        @Override
        public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            HashMap<String, String> data = new HashMap<>();
            data.put(event.user, event.url);
            IndexRequest request = Requests.indexRequest().index("clicks")
                    .type("type")
                    .source(data);
            requestIndexer.add(request);
        }
    };
    //	绑定输出数据源(ES)
    stream04.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。

    而 Builder 的构造方法中又有两个参数:

    • httpHosts:连接到的 Elasticsearch 集群主机列表
    • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数。

    具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。

    6、输出到 MySQL(JDBC)

    关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据存储的主要形式。MySQL 就是其中的典型代表。

    
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}artifactId>
        <version>${flink.version}version>
    dependency>
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.47version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    代码:

    //  -5、输出到MySql
    env.setParallelism(1);
    //	数据源
    DataStreamSource<Event> stream05 = env.fromElements(
            new Event("Mary", "./home", 100L),
            new Event("Alice", "./var", 200L)
    );
    //	sql/statementBuilder/executionOptions/connectionOptions
    stream05.addSink(
            JdbcSink.sink("INSERT INTO clicks (user, url) VALUES (?, ?)",
                    (statement, r) -> {
                        statement.setString(1, r.user);
                        statement.setString(2, r.url);
                    },
                    JdbcExecutionOptions.builder().
                            withBatchSize(1000)
                            .withBatchIntervalMs(200)
                            .withMaxRetries(5)
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withDriverName("com.mysql.cj.jdbc.Driver")
                            .withUsername("username")
                            .withPassword("passwod")
                            .build()
            )
    );
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    7、自定义 Sink 输出

    如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?

    ​ 与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction。

    ​ 我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。

    • open():开启链接
    • close():关闭连接
    • invoke():执行逻辑

    pom依赖

    <dependency>
     <groupId>org.apache.hbasegroupId>
     <artifactId>hbase-clientartifactId>
     <version>${hbase.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码:

    //  -6、自定义sink输出
    env.setParallelism(1);
    //	绑定数据源
    DataStreamSource<String> stream06 = env.fromElements("hello", "word");
    //	添加自定义输出
    stream06.addSink(
            new RichSinkFunction<String>() {
                //  管理Hbase的配置信息,这里因为是Configuration的重名问题,将类以完成路径导入
                public org.apache.hadoop.conf.Configuration configuration;
                //  管理Hbase连接
                public Connection connection;
                //  打开连接
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    configuration = HBaseConfiguration.create();
                    configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                    connection = ConnectionFactory.createConnection(configuration);
                }
                //  执行的步骤
                @Override
                public void invoke(String value, Context context) throws Exception {
                      表名为test
                    Table table = connection.getTable(TableName.valueOf("test"));
                    //  指定rowKey
                    Put put = new Put("row".getBytes(StandardCharsets.UTF_8));
                    put.addColumn("info".getBytes(StandardCharsets.UTF_8),
                              写入的数据
                            value.getBytes(StandardCharsets.UTF_8),
                              写入的数据
                            "1".getBytes(StandardCharsets.UTF_8));
                      执行put操作
                    table.put(put);
                      将表关闭
                    table.close();
                    invoke(value);
                }
                //  关闭连接操作
                @Override
                public void close() throws Exception {
                    super.close();
                    //  关闭连接
                    connection.close();
                }
            }
    );
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
  • 相关阅读:
    scrapy爬取数据
    语音芯片KT142C两种音频输出方式PWM和DAC的区别
    ubuntu安装 miniconda 同步时间
    vue的路由
    软考视频,系统架构师视频、系统分析师视频、软件设计师视频,试卷,真题
    如何根据不同仪器选择适合的电源模块?
    AI面试必备-《家居必备的AI精选资源列表》免费分享
    架构基本概念和架构本质
    详解欧拉计划第72题:分数计数
    9月8日上课内容 第一章 rsync远程同步
  • 原文地址:https://blog.csdn.net/weixin_44624117/article/details/127426347