重要提示

同一个task的不同运行实例,必须放在不同的task slot上运行;
同一个task slot,可以运行多个不同task的各一个并行实例;
上下游算子,能否chain在一起,放在一个Task中,取决于如下3个条件:
3个条件都满足,才能合并为一个task;否则不能合并成一个task;

当然,即使满足上述3个条件,也不一定就非要把上下游算子绑定成算子链;
flink提供了相关的api,来让用户可以根据自己的需求,进行灵活的算子链合并或拆分;
单个并行演示
- package com.blok2;
-
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _01_Task_Chain {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
- //1 加载数据流
- DataStreamSource
ds = see.socketTextStream("linux01", 50820); - System.out.println(ds.getParallelism()); //打印数据流的并行度
- //2 处理每条数据流
- SingleOutputStreamOperator
ds2 = ds.map(line -> line.toUpperCase()); - System.out.println(ds2.getParallelism()); //打印数据流的并行度
- //3 处理每条数据流
- SingleOutputStreamOperator
ds3 = ds2.map(line -> "YY-" + line + "-YY"); - ds3.print("结果数据流: ") ;
- see.execute("路虽远行则将至");
- }
- }

禁用自动算子链
//1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); ds.disableChaining() ; //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.disableChaining() ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ;

前后subTask并行度不同
see.setParallelism(1); //1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.setParallelism(2) ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ; see.execute("路虽远行则将至");

自动taskchain合并, 可以手动指定阶段taskchain
see.setParallelism(1); //1 加载数据流 DataStreamSourceds = see.socketTextStream("linux01", 50820); //2 处理每条数据流 SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase()); ds2.startNewChain() ; //3 处理每条数据流 SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY"); ds3.print("结果数据流: ") ; see.execute("路虽远行则将至");

设置算子槽位共享组
see.setParallelism(1);
/**
* ds ds2 ds3 默认在同一个算子槽位共享组 自动合并taskchain
* 三个source设置同一个算子槽位共享组 自动合并taskchain
* ds2.slotSharingGroup("a") ;
*
* 三个source设置不同算子槽位共享组 自动合并taskchain
*/
//1 加载数据流
DataStreamSource ds = see.socketTextStream("linux01", 50820);
ds.slotSharingGroup("a") ;
//2 处理每条数据流
SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase());
ds2.slotSharingGroup("b") ;
//3 处理每条数据流
SingleOutputStreamOperator ds3 = ds2.map(line -> "YY-" + line + "-YY");
ds3.slotSharingGroup("a") ;
ds3.print("结果数据流: ") ;
see.execute("路虽远行则将至");

分区算子:用于指定上游task的各并行subtask与下游task的subtask之间如何传输数据;
Flink中,对于上下游subTask之间的数据传输控制,由ChannelSelector策略来控制,而且Flink内针对各种场景,开发了众多ChannelSelector的具体实现

设置数据传输策略时,不需要显式指定partitioner,而是调用封装好的算子即可:

默认情况下,flink会优先使用REBALANCE分发策略
/**
* @Date: 22.11.12
* @Author: Hang.Nian.YY
* @qq: 598196583
* @Tips: 学大数据 ,到多易教育
* @Description:
*/
public class _02_Partition {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//1 加载数据流
DataStreamSource ds = see.socketTextStream("doitedu01", 50820);
//2 处理每条数据流
SingleOutputStreamOperator ds2 = ds.map(line -> line.toUpperCase())
.setParallelism(2)
//指定上游到下游分发数据的规则
.shuffle()
.map(line -> "YY-" + line + "-YY")
.setParallelism(3);
ds2.print("结果数据流: ");
see.execute("路虽远行则将至");
}
}