title: Flink系列
Flink 的分区策略:
批处理的分区策略: Partitioner
流处理的分区策略: StreamPartitioner
关于 Flink 的分区策略,请看代码
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
* Flink分区批处理的分区策略有 :
* 1、rebalance
* 2、partitionByHash
* 3、partitionByRange
* 4、partitionByCustom
*/
public class FlinkBatchPartitionerRebalance {
public static void main(String[] args) throws Exception {
//1、获取批处理编程入口
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
//2、准备数据
DataSource<Integer> dataSource = executionEnvironment.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 14, 13, 12, 11);
//3、测试一下rebalance分区策略
//rebalance 其实就是 采用 Round-Robin 轮询的方式
PartitionOperator<Integer> result = dataSource.rebalance().setParallelism(3);
//4、输出数据
/**
* 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
* 分区1: 1、4、7、10、13
* 分区2: 2、5、8、15、12
* 分区3: 3、6、9、14、11
*
* 给后面的几个数据故意的调换了位置,就是为了测试这个效果。
*/
//如果结果是一个文件,那么FlinkBatchPartitionerRebalance就是一个文件。
//如果结果是多个文件,那么FlinkBatchPartitionerRebalance是文件夹。
result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerRebalance");
//5、执行数据
executionEnvironment.execute("FlinkBatchPartitionerRebalance");
}
}
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
* Flink分区批处理的分区策略有 :
* 1、rebalance
* 2、partitionByHash
* 3、partitionByRange
* 4、partitionByCustom
*/
public class FlinkBatchPartitionerPartitionByRange {
public static void main(String[] args) throws Exception {
//1、获取批处理编程入口
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
//2、准备数据
DataSource<Integer> dataSource = executionEnvironment.fromElements(
1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6);
//3、测试一下分区策略
//partitionByRange 是一个范围上的区分。
PartitionOperator<Integer> result = dataSource.partitionByRange(integer -> integer).setParallelism(3);
//4、输出数据
/**
* 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
* 分区1: 1-3的范围
* 分区2: 4-6的范围
* 分区3: 7-9的范围
*
*/
result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerPartitionByRange");
//5、执行数据
executionEnvironment.execute("FlinkBatchPartitionerPartitionByRange");
}
}
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
*/
public class FlinkBatchPartitionerPartitionByHash {
public static void main(String[] args) throws Exception {
//1、获取批处理编程入口
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
//2、准备数据
DataSource<Integer> dataSource = executionEnvironment.fromElements(
1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6);
//3、测试一下分区策略
//partitionByHash 哈希分区,先求哈希,然后模除以分区数(并行度)
PartitionOperator<Integer> result = dataSource.partitionByHash(integer -> integer).setParallelism(3);
//4、输出数据
/**
* 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
* 分区1: 1,5,6,1,5,6
* 分区2: 4,4
* 分区3: 2,3,7,8,9,2,3
*
* 哈希分区: 相同的数据一定在一个分区中。
*/
result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerPartitionByHash");
//5、执行数据
executionEnvironment.execute("FlinkBatchPartitionerPartitionByHash");
}
}
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
*
* 自定义分区案例
*/
public class FlinkBatchPartitionerCustom {
public static void main(String[] args) throws Exception {
//1、获取批处理编程入口
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
//2、准备数据
DataSource<Integer> dataSource = executionEnvironment.fromElements(
1, 2, 3, 4, 5, 60, 70, 80, 90, 10, 200, 300, 400, 500, 600);
//3、测试一下分区策略
//partitionCustom 自定义分区。
PartitionOperator<Integer> result = dataSource.partitionCustom(new MyPartitioner(),word->word).setParallelism(3);
//4、输出数据
result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerCustom");
//5、执行数据
executionEnvironment.execute("FlinkBatchPartitionerCustom");
}
static class MyPartitioner implements Partitioner<Integer>{
/**
* 定义具体的分区规则
* 比如:
* 将数据的数据的按照如下的规则进行划分:
* 范围 <= 9 在一个分区
* 范围 10-99 在一个分区
* 范围 > 99 在一个分区
* @param integer 输入的数据
* @param numPartitions 分区的个数 现在在如下的案例中分区的数是3个,也就是输入的并行度。
* @return
*/
@Override
public int partition(Integer integer, int numPartitions) {
if (integer <= 9){
return 0;
}else if (integer >= 10 && integer <= 99){
return 1;
}else {
return 2;
}
}
}
}
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
* Flink流中 内置的分区策略
*/
public class FlinkStreamPartitionerBuiltin {
public static void main(String[] args) {
//1、获取环境变量
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取数据源
DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);
//3、下面都是分区的测试了。
//分区一、伸缩 。 1-》N 或者 N-》1
DataStream<String> rescaleDS = dataStreamSource.rescale();
//分区二、rebalance 其实就是轮询
DataStream<String> rebalanceDS = dataStreamSource.rebalance();
//分区三、keyBy。 根据指定的字段来进行分区,相同的key必然在一个分区中。按照指定的key来做hash
KeyedStream<String, Tuple> keyByDS = dataStreamSource.keyBy(1);
//分区四、global 。 所有的输出数据都发送到下游的第一个task
DataStream<String> globalDS = dataStreamSource.global();
//分区五、forward。 上下游的本地task映射
DataStream<String> forwardDS = dataStreamSource.forward();
//分区六、shuffle。 随机分区。 随机均匀分布元素
DataStream<String> shuffleDS = dataStreamSource.shuffle();
//分区七、broadcast。广播。 向每个分区广播元素,帮所有的元素广播到所有的分区。
DataStream<String> broadcastDS = dataStreamSource.broadcast();
}
}
package com.aa.flinkjava.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author AA
* @Project bigdatapre
* @Package com.aa.flinkjava.partitioner
*/
public class FlinkStreamPartitionerCustom {
public static void main(String[] args) throws Exception {
//1、获取环境变量
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取数据源
DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);
//3、下面都是分区的测试了。
DataStream<String> partitionCustomDS = dataStreamSource.partitionCustom(new MyStreamPartitioner(),word->word);
//4、打印输出
partitionCustomDS.print().setParallelism(1);
//5、执行。
executionEnvironment.execute();
}
static class MyStreamPartitioner implements Partitioner<String>{
/**
* @param key
* @param numPartitions
* @return
*/
@Override
public int partition(String key, int numPartitions) {
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
}
声明:
文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。
B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接