• 207.Flink(二):架构及核心概念,flink从各种数据源读取数据,各种算子转化数据,将数据推送到各数据源


    一、Flink架构及核心概念

    1.系统架构

    • JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。
    • 一个job对应一个jobManager

     2.并行度

    (1)并行度(Parallelism)概念

    一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。

    流程序的并行度 = 其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

    (2)设置并行度

    对某个具体算子设置并行度:

    stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

    全局设置并行度:

    env.setParallelism(2);

    提交任务时指定:

    • 通过页面上传jar的时候可以指定
    • 可以在命令行启动的时候通过 -p 3指定

    flink-conf.yaml中配置:

    parallelism.default: 2

    优先级:

    代码中具体算子 > 代码中全局 > 提交任务指定 > 配置文件中指定

    3.算子链

    (1)算子间的数据传输

    *1)一对一(One-to-one,forwarding)

    这种模式下,数据流维护着分区以及元素的顺序。它们之间不需要重新分区,也不需要调整数据的顺序。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。

    *2)重分区(Redistributing)

    在这种模式下,数据流的分区会发生改变。每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。

    (2)合并算子链

    在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分 

    1. // 禁用算子链,该算子不会和前面和后面串在一起
    2. .map(word -> Tuple2.of(word, 1L)).disableChaining();
    3. // 全局禁用算子链
    4. env.disableChaining();
    5. // 从当前算子开始新链
    6. .map(word -> Tuple2.of(word, 1L)).startNewChain()

    • 当一对一的时候,每个运算量都很大,这个时候不适合串在一起。
    • 当需要定位具体问题的时候,不串在一起更容易排查问题

    4.任务槽

    (1)任务槽(Task Slots)概念

    Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。

    TaskManager的计算资源是有限的,为了控制并发量,TaskManager对每个任务运行所占用的内存资源做出明确的划分,这就是所谓的任务槽(task slots)。

    每个任务槽的大小是均等的,且任务槽之间的资源不可以互相借用。

    如图,每个TaskManager有三个任务槽,每个槽运行自己的任务。槽的大小均等。

    (2)任务槽数量的设置

    在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中,可以设置TaskManager的slot数量,默认是1个slot。

    taskmanager.numberOfTaskSlots: 8

    slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,建议将slot数量配置为机器的CPU核心数。

    (3)任务对任务槽的共享

    在同一个作业中,不同任务节点的并行子任务可以放在同一个slot上执行

     可以共享:

    • 同一个job中,不同算子的子任务才可以共享同一个slot。这些子任务是同时运行
    • 前提是:属于同一个slot共享组,默认都是“default”

    手动指定共享组:

    .map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");

    共享的好处:允许我们保存完整的作业管道。这样一来,即使某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行

    (4)任务槽和并行度的关系

    • 任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置
    • 并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置

    如果是yarn模式,申请的TaskManager的数量 = job并行度 / 每个TM的slot数量,向上取整

    即:假设10个并行度的job,每个TM的slot是3个,那么需要10/3,向上取整,即需要最少4个TaskManager

    二、作业提交流程

    1.Standalone会话模式作业提交流程

    逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

    • 逻辑流图:列出并行度,算子,各算子之间关系(一对一还是需要重分区)
    • 作业图:将一对一的算子做算子链的优化,作业中间会有中间结果集
    • 执行图:将并行度展开,并标注每个并行处理的算子
    • 物理图:基本同执行图,是执行图的落地

    2.Yarn应用模式作业提交流程

    三、 DataStream API

    DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。

    1.执行环境(Execution Environment)

    (1)创建执行环境

    *1)StreamExecutionEnvironment.getExecutionEnvironment();

    它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境

    *2)StreamExecutionEnvironment.createLocalEnvironment();

    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数

    *3)StreamExecutionEnvironment
              .createRemoteEnvironment(
                "host",                   // JobManager主机名
                1234,                     // JobManager进程端口号
                   "path/to/jarFile.jar"  // 提交给JobManager的JAR包
            );

    这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

     (2)执行模式(Execution Mode)

    流批一体:代码api是同一套,可以指定为 批,也可以指定为 流。

    通话代码配置:

    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    通过命令行配置:

    bin/flink run -Dexecution.runtime-mode=BATCH

    (3)触发程序执行

    当main()方法被调用时,并没有真正处理数据。只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。

    所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

    如果在一段代码里面执行多个任务,可以使用env.executeAsync();

    1. package com.atguigu.env;
    2. import org.apache.flink.api.common.RuntimeExecutionMode;
    3. import org.apache.flink.api.common.typeinfo.Types;
    4. import org.apache.flink.api.java.tuple.Tuple2;
    5. import org.apache.flink.configuration.Configuration;
    6. import org.apache.flink.configuration.RestOptions;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.util.Collector;
    9. /**
    10. * TODO
    11. *
    12. * @author cjp
    13. * @version 1.0
    14. */
    15. public class EnvDemo {
    16. public static void main(String[] args) throws Exception {
    17. Configuration conf = new Configuration();
    18. conf.set(RestOptions.BIND_PORT, "8082");
    19. StreamExecutionEnvironment env = StreamExecutionEnvironment
    20. // .getExecutionEnvironment(); // 自动识别是 远程集群 ,还是idea本地环境
    21. .getExecutionEnvironment(conf); // conf对象可以去修改一些参数
    22. // .createLocalEnvironment()
    23. // .createRemoteEnvironment("hadoop102", 8081,"/xxx")
    24. // 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流
    25. // 默认 STREAMING
    26. // 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCH
    27. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    28. env
    29. // .socketTextStream("hadoop102", 7777)
    30. .readTextFile("input/word.txt")
    31. .flatMap(
    32. (String value, Collector<Tuple2<String, Integer>> out) -> {
    33. String[] words = value.split(" ");
    34. for (String word : words) {
    35. out.collect(Tuple2.of(word, 1));
    36. }
    37. }
    38. )
    39. .returns(Types.TUPLE(Types.STRING, Types.INT))
    40. .keyBy(value -> value.f0)
    41. .sum(1)
    42. .print();
    43. env.execute();
    44. /** TODO 关于execute总结(了解)
    45. * 1、默认 env.execute()触发一个flink job:
    46. * 一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住
    47. * 2、env.executeAsync(),异步触发,不阻塞
    48. * => 一个main方法里 executeAsync()个数 = 生成的flink job数
    49. * 3、思考:
    50. * yarn-application 集群,提交一次,集群里会有几个flink job?
    51. * =》 取决于 调用了n个 executeAsync()
    52. * =》 对应 application集群里,会有n个job
    53. * =》 对应 Jobmanager当中,会有 n个 JobMaster
    54. */
    55. // env.executeAsync();
    56. // ……
    57. // env.executeAsync();
    58. }
    59. }

    2.源算子(Source)

    从Flink1.12开始,主要使用流批统一的新Source架构:

    DataStreamSource<String> stream = env.fromSource(…)

    (1)创建pojo对象

    需要空参构造器,所有属性的类型都是可以序列化的

    1. package com.atguigu.bean;
    2. import java.util.Objects;
    3. /**
    4. * TODO
    5. *
    6. * @author cjp
    7. * @version 1.0
    8. */
    9. public class WaterSensor {
    10. public String id;//水位传感器类型
    11. public Long ts;//传感器记录时间戳
    12. public Integer vc;//水位记录
    13. // 一定要提供一个 空参 的构造器
    14. public WaterSensor() {
    15. }
    16. public WaterSensor(String id, Long ts, Integer vc) {
    17. this.id = id;
    18. this.ts = ts;
    19. this.vc = vc;
    20. }
    21. public String getId() {
    22. return id;
    23. }
    24. public void setId(String id) {
    25. this.id = id;
    26. }
    27. public Long getTs() {
    28. return ts;
    29. }
    30. public void setTs(Long ts) {
    31. this.ts = ts;
    32. }
    33. public Integer getVc() {
    34. return vc;
    35. }
    36. public void setVc(Integer vc) {
    37. this.vc = vc;
    38. }
    39. @Override
    40. public String toString() {
    41. return "WaterSensor{" +
    42. "id='" + id + '\'' +
    43. ", ts=" + ts +
    44. ", vc=" + vc +
    45. '}';
    46. }
    47. @Override
    48. public boolean equals(Object o) {
    49. if (this == o) {
    50. return true;
    51. }
    52. if (o == null || getClass() != o.getClass()) {
    53. return false;
    54. }
    55. WaterSensor that = (WaterSensor) o;
    56. return Objects.equals(id, that.id) &&
    57. Objects.equals(ts, that.ts) &&
    58. Objects.equals(vc, that.vc);
    59. }
    60. @Override
    61. public int hashCode() {
    62. return Objects.hash(id, ts, vc);
    63. }
    64. }

    (2)从集合中读取数据

    1. package com.atguigu.source;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. /**
    5. * TODO
    6. *
    7. * @author cjp
    8. * @version 1.0
    9. */
    10. public class CollectionDemo {
    11. public static void main(String[] args) throws Exception {
    12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    13. // TODO 从集合读取数据
    14. DataStreamSource<Integer> source = env
    15. .fromElements(1,2,33); // 从元素读
    16. // .fromCollection(Arrays.asList(1, 22, 3)); // 从集合读
    17. source.print();
    18. env.execute();
    19. }
    20. }

    (3)从文件读取数据

    先添加配置:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-files</artifactId>
    4. <version>1.17.0</version>
    5. </dependency>
    1. package com.atguigu.source;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.connector.file.src.FileSource;
    4. import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    5. import org.apache.flink.core.fs.Path;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. /**
    8. * TODO
    9. *
    10. * @author cjp
    11. * @version 1.0
    12. */
    13. public class FileSourceDemo {
    14. public static void main(String[] args) throws Exception {
    15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    16. env.setParallelism(1);
    17. // TODO 从文件读: 新Source架构
    18. FileSource<String> fileSource = FileSource
    19. .forRecordStreamFormat(
    20. new TextLineInputFormat(),
    21. new Path("input/word.txt")
    22. )
    23. .build();
    24. env
    25. .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
    26. .print();
    27. env.execute();
    28. }
    29. }
    30. /**
    31. *
    32. * 新的Source写法:
    33. * env.fromSource(Source的实现类,Watermark,名字)
    34. *
    35. */

    (4)从Socket读取数据

    DataStream<String> stream = env.socketTextStream("localhost", 7777);

    (5)从Kafka读取数据

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-kafka</artifactId>
    4. <version>1.17.0</version>
    5. </dependency>
    1. package com.atguigu.source;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    4. import org.apache.flink.connector.kafka.source.KafkaSource;
    5. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import java.time.Duration;
    8. /**
    9. * TODO
    10. *
    11. * @author cjp
    12. * @version 1.0
    13. */
    14. public class
  • 相关阅读:
    定时器+按键控制LED流水灯模式+定时器时钟——“51单片机”
    【Vue】描述项目中两个功能模块的业务(一点见解)
    空域变换-直方图匹配(直方图规定化)
    你可能不太了解的前端知识
    产品研发团队协作神器!10款提效工具大盘点!
    uniapp-地区的四级联动
    Netty学习笔记
    vue大型电商项目尚品汇(后台篇)day03
    183.Hive(五):面试题解析:连续问题、分组问题、间隔分组问题、打折日期交叉问题,同时在线问题
    两台Linux机器scp不输密码
  • 原文地址:https://blog.csdn.net/qq_40594696/article/details/132483926