• Flink分区之Flink分区策略整理



    title: Flink系列


    一、Flink Partitioner 分区策略整理

    Flink 的分区策略:

    	批处理的分区策略: Partitioner
    	流处理的分区策略: StreamPartitioner
    
    • 1
    • 2

    关于 Flink 的分区策略,请看代码

    1.1 批处理之Rebalance分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.PartitionOperator;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     * Flink分区批处理的分区策略有 :
     * 1、rebalance
     * 2、partitionByHash
     * 3、partitionByRange
     * 4、partitionByCustom
     */
    public class FlinkBatchPartitionerRebalance {
        public static void main(String[] args) throws Exception {
            //1、获取批处理编程入口
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            //2、准备数据
            DataSource<Integer> dataSource = executionEnvironment.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 14, 13, 12, 11);
    
            //3、测试一下rebalance分区策略
            //rebalance 其实就是 采用 Round-Robin 轮询的方式
            PartitionOperator<Integer> result = dataSource.rebalance().setParallelism(3);
    
            //4、输出数据
            /**
             * 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
             * 分区1: 1、4、7、10、13
             * 分区2: 2、5、8、15、12
             * 分区3: 3、6、9、14、11
             *
             * 给后面的几个数据故意的调换了位置,就是为了测试这个效果。
             */
            //如果结果是一个文件,那么FlinkBatchPartitionerRebalance就是一个文件。
            //如果结果是多个文件,那么FlinkBatchPartitionerRebalance是文件夹。
            result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerRebalance");
    
            //5、执行数据
            executionEnvironment.execute("FlinkBatchPartitionerRebalance");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    1.2 批处理之Range分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.PartitionOperator;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     * Flink分区批处理的分区策略有 :
     * 1、rebalance
     * 2、partitionByHash
     * 3、partitionByRange
     * 4、partitionByCustom
     */
    public class FlinkBatchPartitionerPartitionByRange {
        public static void main(String[] args) throws Exception {
            //1、获取批处理编程入口
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            //2、准备数据
            DataSource<Integer> dataSource = executionEnvironment.fromElements(
                    1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6);
    
            //3、测试一下分区策略
            //partitionByRange   是一个范围上的区分。
            PartitionOperator<Integer> result = dataSource.partitionByRange(integer -> integer).setParallelism(3);
    
            //4、输出数据
            /**
             * 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
             * 分区1: 1-3的范围
             * 分区2: 4-6的范围
             * 分区3: 7-9的范围
             *
             */
            result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerPartitionByRange");
    
            //5、执行数据
            executionEnvironment.execute("FlinkBatchPartitionerPartitionByRange");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.3 批处理之Hash分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.PartitionOperator;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     */
    public class FlinkBatchPartitionerPartitionByHash {
        public static void main(String[] args) throws Exception {
            //1、获取批处理编程入口
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            //2、准备数据
            DataSource<Integer> dataSource = executionEnvironment.fromElements(
                    1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6);
    
            //3、测试一下分区策略
            //partitionByHash  哈希分区,先求哈希,然后模除以分区数(并行度)
            PartitionOperator<Integer> result = dataSource.partitionByHash(integer -> integer).setParallelism(3);
    
            //4、输出数据
            /**
             * 因为设置了并行度是3,所以结果是3个文件,分别表示的是是三个分区的结果数据:
             * 分区1: 1,5,6,1,5,6
             * 分区2: 4,4
             * 分区3: 2,3,7,8,9,2,3
             *
             * 哈希分区: 相同的数据一定在一个分区中。
             */
            result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerPartitionByHash");
    
            //5、执行数据
            executionEnvironment.execute("FlinkBatchPartitionerPartitionByHash");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    1.4 批处理之自定义分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.PartitionOperator;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     *
     * 自定义分区案例
     */
    public class FlinkBatchPartitionerCustom {
        public static void main(String[] args) throws Exception {
            //1、获取批处理编程入口
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            //2、准备数据
            DataSource<Integer> dataSource = executionEnvironment.fromElements(
                    1, 2, 3, 4, 5, 60, 70, 80, 90, 10, 200, 300, 400, 500, 600);
    
            //3、测试一下分区策略
            //partitionCustom  自定义分区。
            PartitionOperator<Integer> result = dataSource.partitionCustom(new MyPartitioner(),word->word).setParallelism(3);
    
            //4、输出数据
            result.writeAsText("D:\\flinkres\\FlinkBatchPartitionerCustom");
    
            //5、执行数据
            executionEnvironment.execute("FlinkBatchPartitionerCustom");
        }
    
        static class MyPartitioner implements Partitioner<Integer>{
    
            /**
             * 定义具体的分区规则
             * 比如:
             * 将数据的数据的按照如下的规则进行划分:
             * 范围  <= 9   在一个分区
             * 范围  10-99  在一个分区
             * 范围  > 99   在一个分区
             * @param integer  输入的数据
             * @param numPartitions  分区的个数  现在在如下的案例中分区的数是3个,也就是输入的并行度。
             * @return
             */
            @Override
            public int partition(Integer integer, int numPartitions) {
                if (integer <= 9){
                    return 0;
                }else if (integer >= 10 && integer <= 99){
                    return 1;
                }else {
                    return 2;
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    1.5 流处理之内置N 大分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     * Flink流中 内置的分区策略
     */
    public class FlinkStreamPartitionerBuiltin {
        public static void main(String[] args) {
            //1、获取环境变量
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //2、读取数据源
            DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);
    
            //3、下面都是分区的测试了。
            //分区一、伸缩 。 1-》N 或者 N-》1
            DataStream<String> rescaleDS = dataStreamSource.rescale();
    
            //分区二、rebalance 其实就是轮询
            DataStream<String> rebalanceDS = dataStreamSource.rebalance();
    
            //分区三、keyBy。 根据指定的字段来进行分区,相同的key必然在一个分区中。按照指定的key来做hash
            KeyedStream<String, Tuple> keyByDS = dataStreamSource.keyBy(1);
    
            //分区四、global 。 所有的输出数据都发送到下游的第一个task
            DataStream<String> globalDS = dataStreamSource.global();
    
            //分区五、forward。 上下游的本地task映射
            DataStream<String> forwardDS = dataStreamSource.forward();
    
            //分区六、shuffle。 随机分区。 随机均匀分布元素
            DataStream<String> shuffleDS = dataStreamSource.shuffle();
    
            //分区七、broadcast。广播。 向每个分区广播元素,帮所有的元素广播到所有的分区。
            DataStream<String> broadcastDS = dataStreamSource.broadcast();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    1.6 流处理之自定义分区

    package com.aa.flinkjava.partitioner;
    
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @Author AA
     * @Project bigdatapre
     * @Package com.aa.flinkjava.partitioner
     */
    public class FlinkStreamPartitionerCustom {
        public static void main(String[] args) throws Exception {
            //1、获取环境变量
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //2、读取数据源
            DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);
    
            //3、下面都是分区的测试了。
            DataStream<String> partitionCustomDS = dataStreamSource.partitionCustom(new MyStreamPartitioner(),word->word);
    
            //4、打印输出
            partitionCustomDS.print().setParallelism(1);
    
            //5、执行。
            executionEnvironment.execute();
        }
    
        static class MyStreamPartitioner implements Partitioner<String>{
    
            /**
             * @param key
             * @param numPartitions
             * @return
             */
            @Override
            public int partition(String key, int numPartitions) {
                return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46


    声明:
            文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


    By luoyepiaoxue2014

    B站: https://space.bilibili.com/1523287361 点击打开链接
    微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

  • 相关阅读:
    【LeetCode】每日一题&&两数之和&&寻找正序数组的中位数&&找出字符串中第一个匹配项的下标&&在排序数组中查找元素的第一个和最后一个位置
    深度学习模型相关部署的学习:(yolov5)WIN10+CUDA11.3+TensorRT
    【中秋特辑】C++比C语言更加规范、方便?是因为增加了如下特性 | C++98 & C++11 | C++难学?带领大家一步一步深度剖析 | 简单易懂
    基于ubuntu 22, jdk 8x64搭建图数据库环境 hugegraph--google镜像chatgpt
    登录网页优化与最佳做法
    IntelliJ IDEA 控制台中文乱码的四种解决方法
    8.strtok函数
    utm 转 经纬度坐标 cesium Ue4 CityEngine
    从程序员到架构师:大数据量、缓存、高并发、微服务、多团队协同等核心场景实战书籍
    3.8-镜像的发布
  • 原文地址:https://blog.csdn.net/luoyepiaoxue2014/article/details/128079358