• Apache Flink如何设置并行度


    如何设置并行度

    Apache Flink支持在不同的级别设置并行度。配置文件、env级别、算子级别。

    1. 配置文件默认
      在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。我们可以通过命令查看Flink配置文件的并行度。

    1. $ cat flink-conf.yaml |grep "parallelism.default"
    2. parallelism.default: 1

     

    例如当前获取到的并行度为1。也就是说当你不设置并行度的时候它就会使用配置文件默认的并行度 1
    2.  env级别
    env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。

    1. val env = Stream...
    2. env.setParallelism(5)

     

    1. 客户端级别
      如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。

    ./bin/flink run -p 5 ../wordCount-java*.jar
    

     

    -p即设置WordCount的Job并行度为5。4.  算子级别
    我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置

    1. val env = Stream...
    2. val text = ...
    3. text.keyBy(XXX)
    4.    .flatMap(XXX).setParallelism(5)  //计算时设置为5
    5.    .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1

     

    • 并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism

    从优先级上来看:  算子级别 > env级别 > Client级别 > 系统默认级别

    在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等可能会需要不同的并行度来保证数据的快速读取与写入负载等。

      

     

     

    并行度设置的数量

    Apache Flink的并行度设置并不是说越大越好、数据处理的效率就越高。而是需要设置合理的并行度。那么何谓合理呢?
    Apache Flink的 并行度取决于每个TaskManager上的slot数量而决定的。Flink的JobManager把任务分成子任务提交给slot进行执行。相同的slot共享相同的JVM资源,同时对Flink提供维护的心跳等信息。
    slot是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。这样来看,我们设置的并行度其实是与TaskManager所有Slot数量有关的

     

  • 相关阅读:
    Maven3种打包方式之一maven-shade-plugin的使用
    Flink 基础概念
    GeoServer改造Springboot源码一(公共部分)
    Peter算法小课堂—球盒问题
    工业互联网的概念、体系架构及关键技术
    Nginx 的安装与使用(入门教程)
    docker network
    RTU通讯控制器S274如何操作
    Java编程学习-MySQL(函数)
    Unity --- 给游戏物体做标记 以及 快捷键补充
  • 原文地址:https://blog.csdn.net/Angel_asp/article/details/126212060