物理分区 可以分为6类,分别是:
1、随机分区:对于全部的分区,进行洗牌
//1、随机分区 对于全部的分区 进行洗牌
stream.shuffle().print("随机分区").setParallelism(4);
2、轮询分区:对于全部的分区 进行发牌
//2、轮询分区 对于全部的分区 进行发牌
stream.rebalance().print("轮询分区").setParallelism(4);
3、rescale重缩放分区:对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌。拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中。
/*
3、rescale重缩放分区 对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌
拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中
*/
env.addSource(new RichParallelSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
for (int i=0;i<8;i++){
//将奇偶数分别发送到0号到1号并行分区
if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
sourceContext.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
4、 广播 把一个数据分发到了下面所有的子任务当中去
//4、广播 把一个数据分发到了下面所有的子任务当中去
stream.broadcast().print().setParallelism(2);
5、全局分区 将全部数据都圈到一个分区里面去
//5、全局分区 将全部数据都圈到一个分区里面去
stream.global().print().setParallelism(4);
6、自定义重分区
//6、自定义重分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner() {
@Override
public int partition(Integer key, int i) {
return key % 2;
}
}, new KeySelector() {
@Override
public Integer getKey(Integer integer) throws Exception {
return integer;
}
}).print().setParallelism(4)
;
测试
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
* @author potential
*/
public class TransformPartitionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素中读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 3500L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
);
//1、随机分区 对于全部的分区 进行洗牌
stream.shuffle().print("随机分区").setParallelism(4);
//
//2、轮询分区 对于全部的分区 进行发牌
stream.rebalance().print("轮询分区").setParallelism(4);
/*
3、rescale重缩放分区 对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌
拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中
*/
env.addSource(new RichParallelSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
for (int i=0;i<8;i++){
//将奇偶数分别发送到0号到1号并行分区
if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
sourceContext.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
//4、广播 把一个数据分发到了下面所有的子任务当中去
stream.broadcast().print().setParallelism(2);
//5、全局分区 将全部数据都圈到一个分区里面去
stream.global().print().setParallelism(4);
//6、自定义重分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner() {
@Override
public int partition(Integer key, int i) {
return key % 2;
}
}, new KeySelector() {
@Override
public Integer getKey(Integer integer) throws Exception {
return integer;
}
}).print().setParallelism(4)
;
env.execute();
}
}
测试结果:
1、随机分区

2、轮询分区

3、rescale重缩放分区

可以从测试结果中看出,1,3,5,7这几个奇数全部在3,4这俩分区里面,0,2,4,6这几个偶数全部在1,2这俩分区里面。
4、广播

设置并行度为2,使用广播这种物理分区,则每条数据都会被分配到每一条通道中。
5、全局分区

从测试的结果来看,全部的数据都在1这个分区当中。
6、自定义重分区

我们代码中将分区划分为两个,尽管后来并行度为4,但是return key % 2已经分为2个了,所以并行度在这里并没有起作用。而我们看到测试的结果中,0,2,4,6,8这几个偶数全部都在1这个分区当中,1,3,5,7这几个奇数全部都在2这个分区当中。