重要提示
同一个task的不同运行实例,必须放在不同的task slot上运行;
同一个task slot,可以运行多个不同task的各一个并行实例;
上下游算子,能否chain在一起,放在一个Task中,取决于如下3个条件:
3个条件都满足,才能合并为一个task;否则不能合并成一个task;
当然,即使满足上述3个条件,也不一定就非要把上下游算子绑定成算子链;
flink提供了相关的api,来让用户可以根据自己的需求,进行灵活的算子链合并或拆分;
单个并行演示
- package com.blok2;
-
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _01_Task_Chain {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
- //1 加载数据流
- DataStreamSource
ds = see.socketTextStream("linux01", 50820); - System.out.println(ds.getParallelism()); //打印数据流的并行度
- //2 处理每条数据流
- SingleOutputStreamOperator
ds2 = ds.map(line -> line.toUpperCase()); - System.out.println(ds2.getParallelism()); //打印数据流的并行度
- //3 处理每条数据流
- SingleOutputStreamOperator
ds3 = ds2.map(line -> "YY-" + line + "-YY"); - ds3.print("结果数据流: ") ;
- see.execute("路虽远行则将至");
- }
- }
禁用自动算子链
//1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); ds.disableChaining() ; //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.disableChaining() ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ;
前后subTask并行度不同
see.setParallelism(1); //1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.setParallelism(2) ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ; see.execute("路虽远行则将至");
自动taskchain合并, 可以手动指定阶段taskchain
see.setParallelism(1); //1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.startNewChain() ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ; see.execute("路虽远行则将至");
设置算子槽位共享组
see.setParallelism(1); /** * ds ds2 ds3 默认在同一个算子槽位共享组 自动合并taskchain * 三个source设置同一个算子槽位共享组 自动合并taskchain * ds2.slotSharingGroup("a") ; * * 三个source设置不同算子槽位共享组 自动合并taskchain */ //1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); ds.slotSharingGroup("a") ; //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.slotSharingGroup("b") ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.slotSharingGroup("a") ; ds3.print("结果数据流: ") ; see.execute("路虽远行则将至");
分区算子:用于指定上游task的各并行subtask与下游task的subtask之间如何传输数据;
Flink中,对于上下游subTask之间的数据传输控制,由ChannelSelector策略来控制,而且Flink内针对各种场景,开发了众多ChannelSelector的具体实现
设置数据传输策略时,不需要显式指定partitioner,而是调用封装好的算子即可:
默认情况下,flink会优先使用REBALANCE分发策略
/** * @Date: 22.11.12 * @Author: Hang.Nian.YY * @qq: 598196583 * @Tips: 学大数据 ,到多易教育 * @Description: */ public class _02_Partition { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInteger("rest.port", 8888); StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); //1 加载数据流 DataStreamSourceds = see.socketTextStream("doitedu01", 50820); //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()) .setParallelism(2) //指定上游到下游分发数据的规则 .shuffle() .map(line -> "YY-" + line + "-YY") .setParallelism(3); ds2.print("结果数据流: "); see.execute("路虽远行则将至"); } }