• Flink系列文档-(YY08)-Flink核心概念


     1 核心概念

    1.1 基础概念

    1. 用户通过算子api所开发的代码,会被flink任务提交客户端解析成jobGraph
    2. 然后,jobGraph提交到集群JobManager,转化成ExecutionGraph(并行化后的执行图)
    3. 然后,ExecutionGraph中的各个task会以多并行实例(subTask)部署到taskmanager上执行
    4. subTask运行的位置是taskmanager所提供的槽位(task slot),槽位简单理解就是线程;

      重要提示

    1. 一个算子的逻辑,可以封装在一个独立的task中(可以有多个运行时实例:subTask);
    2. 也可把多个算子的逻辑chain在一起后封装在一个独立的task中(可以有多个运行时实例:subTask);

    同一个task的不同运行实例,必须放在不同的task slot上运行;

    同一个task slot,可以运行多个不同task的各一个并行实例;

    1.2 task与算子链(operator chain)

      上下游算子,能否chain在一起,放在一个Task中,取决于如下3个条件:

    1. 上下游算子实例间是oneToOne数据传输(forward);
    2. 上下游算子并行度相同;
    3. 上下游算子属于相同的slotSharingGroup(槽位共享组);

    3个条件都满足,才能合并为一个task;否则不能合并成一个task;

    当然,即使满足上述3个条件,也不一定就非要把上下游算子绑定成算子链;

    flink提供了相关的api,来让用户可以根据自己的需求,进行灵活的算子链合并或拆分;

    1. setParallelism   设置算子的并行度
    2. slotSharingGroup   设置算子的槽位共享组
    3. disableChaining    对算子禁用前后链合并
    4. startNewChain    对算子开启新链(即禁用算子前链合并)

    单个并行演示

    1. package com.blok2;
    2. import org.apache.flink.configuration.Configuration;
    3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    6. /**
    7. * @Date: 22.11.11
    8. * @Author: Hang.Nian.YY
    9. * @qq: 598196583
    10. * @Tips: 学大数据 ,到多易教育
    11. * @Description:
    12. */
    13. public class _01_Task_Chain {
    14. public static void main(String[] args) throws Exception {
    15. Configuration conf = new Configuration();
    16. conf.setInteger("rest.port", 8888);
    17. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    18. see.setParallelism(1);
    19. //1 加载数据流
    20. DataStreamSource ds = see.socketTextStream("linux01", 50820);
    21. System.out.println(ds.getParallelism()); //打印数据流的并行度
    22. //2 处理每条数据流
    23. SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
    24. System.out.println(ds2.getParallelism()); //打印数据流的并行度
    25. //3 处理每条数据流
    26. SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
    27. ds3.print("结果数据流: ") ;
    28. see.execute("路虽远行则将至");
    29. }
    30. }

    禁用自动算子链

    //1  加载数据流
    DataStreamSource ds = see.socketTextStream("linux01", 50820);
    ds.disableChaining() ;
    //2 处理每条数据流
    SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
    ds2.disableChaining() ;
     //3  处理每条数据流
    SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
    ds3.print("结果数据流: ")  ;
    

     前后subTask并行度不同

    see.setParallelism(1);
    //1  加载数据流
    DataStreamSource ds = see.socketTextStream("linux01", 50820);
    
    //2 处理每条数据流
    SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
    ds2.setParallelism(2) ;
     //3  处理每条数据流
    SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
    ds3.print("结果数据流: ")  ;
    see.execute("路虽远行则将至");
    

     自动taskchain合并, 可以手动指定阶段taskchain

    see.setParallelism(1);
    //1  加载数据流
    DataStreamSource ds = see.socketTextStream("linux01", 50820);
    //2 处理每条数据流
    SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
    ds2.startNewChain() ;
     //3  处理每条数据流
    SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
    ds3.print("结果数据流: ")  ;
    see.execute("路虽远行则将至");

     设置算子槽位共享组

    see.setParallelism(1);
    /**
     * ds ds2  ds3 默认在同一个算子槽位共享组  自动合并taskchain
     * 三个source设置同一个算子槽位共享组  自动合并taskchain
     * ds2.slotSharingGroup("a") ;
     *
     * 三个source设置不同算子槽位共享组  自动合并taskchain
     */
    //1  加载数据流
    DataStreamSource ds = see.socketTextStream("linux01", 50820);
    ds.slotSharingGroup("a") ;
    //2 处理每条数据流
    SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
    ds2.slotSharingGroup("b") ;
     //3  处理每条数据流
    SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
    ds3.slotSharingGroup("a") ;
    
    ds3.print("结果数据流: ")  ;
    see.execute("路虽远行则将至");

    1.3 分区算子

      分区算子:用于指定上游task的各并行subtask与下游task的subtask之间如何传输数据;

    Flink中,对于上下游subTask之间的数据传输控制,由ChannelSelector策略来控制,而且Flink内针对各种场景,开发了众多ChannelSelector的具体实现

       设置数据传输策略时,不需要显式指定partitioner,而是调用封装好的算子即可

     默认情况下,flink会优先使用REBALANCE分发策略

    /**
     * @Date: 22.11.12
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: 学大数据 ,到多易教育
     * @Description:
     */
    public class _02_Partition {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    
            //1  加载数据流
            DataStreamSource ds = see.socketTextStream("doitedu01", 50820);
            //2 处理每条数据流
            SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase())
                    .setParallelism(2)
                    //指定上游到下游分发数据的规则
                    .shuffle()
                    .map(line -> "YY-" + line + "-YY")
                    .setParallelism(3);
            ds2.print("结果数据流: ");
            see.execute("路虽远行则将至");
            
        }
    }
    

     

     

  • 相关阅读:
    B. Inflation-Educational Codeforces Round 103 (Rated for Div. 2)
    RabbitMQ之延迟队列
    【二】【SQL Server】如何运用SQL Server中查询设计器通关数据库期末查询大题
    美国电力传输公司使用 OpenText 内容管理平台建立具有成本效益的记录管理流程
    LVS+DR部署
    嵌入式开发环境Vscode开发STM32单片机程序
    springboot毕设项目宠物社区网站mud2d(java+VUE+Mybatis+Maven+Mysql)
    vue防抖和限流
    新基建助力智能化道路交通领域的转型发展
    linux 3.13版本nvme驱动阅读记录四
  • 原文地址:https://blog.csdn.net/qq_37933018/article/details/127814264