• Apache Flink如何设置并行度


    如何设置并行度

    Apache Flink支持在不同的级别设置并行度。配置文件、env级别、算子级别。

    1. 配置文件默认
      在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。我们可以通过命令查看Flink配置文件的并行度。

    1. $ cat flink-conf.yaml |grep "parallelism.default"
    2. parallelism.default: 1

     

    例如当前获取到的并行度为1。也就是说当你不设置并行度的时候它就会使用配置文件默认的并行度 1
    2.  env级别
    env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。

    1. val env = Stream...
    2. env.setParallelism(5)

     

    1. 客户端级别
      如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。

    ./bin/flink run -p 5 ../wordCount-java*.jar
    

     

    -p即设置WordCount的Job并行度为5。4.  算子级别
    我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置

    1. val env = Stream...
    2. val text = ...
    3. text.keyBy(XXX)
    4.    .flatMap(XXX).setParallelism(5)  //计算时设置为5
    5.    .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1

     

    • 并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism

    从优先级上来看:  算子级别 > env级别 > Client级别 > 系统默认级别

    在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等可能会需要不同的并行度来保证数据的快速读取与写入负载等。

      

     

     

    并行度设置的数量

    Apache Flink的并行度设置并不是说越大越好、数据处理的效率就越高。而是需要设置合理的并行度。那么何谓合理呢?
    Apache Flink的 并行度取决于每个TaskManager上的slot数量而决定的。Flink的JobManager把任务分成子任务提交给slot进行执行。相同的slot共享相同的JVM资源,同时对Flink提供维护的心跳等信息。
    slot是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。这样来看,我们设置的并行度其实是与TaskManager所有Slot数量有关的

     

  • 相关阅读:
    sql学习笔记(三)
    2022杭电多校七 1007-Weighted Beautiful Tree(树形DP)
    局域网内vue2 配置本地IP地址访问项目
    C++ 重载运算符和重载函数
    万字长文,带你轻松学习 Spark
    运营商停止提供公网IP地址,如何远程访问网络服务?
    使用yum install和reposync下载rpm安装包以及wget和curl下载文件
    九度 1463 招聘会(任务调度, 贪心算法)
    安装数据库中间件——Mycat
    Linux 命令行——文本处理命令:cat、sort、uniq、cut、comm、diff、patch
  • 原文地址:https://blog.csdn.net/Angel_asp/article/details/126212060