• 【尚硅谷Java版】Flink1.13 转换算子之物理分区


    【尚硅谷Java版】Flink1.13转换算子之物理分区


    分区:将数据进行重新分布,传递到不同的流分区去进行下一步处理。

    物理分区 分类

    物理分区 可以分为6类,分别是:

    1. 随机分区
    2. 轮询分区
    3. rescale分区
    4. 广播
    5. 全局分区
    6. 自定义重分区

    具体操作

    1、随机分区:对于全部的分区,进行洗牌

     //1、随机分区 对于全部的分区 进行洗牌
        stream.shuffle().print("随机分区").setParallelism(4);
    
    • 1
    • 2

    2、轮询分区:对于全部的分区 进行发牌

     //2、轮询分区  对于全部的分区 进行发牌
            stream.rebalance().print("轮询分区").setParallelism(4);
    
    • 1
    • 2

    3、rescale重缩放分区:对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌。拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中。

    /*
            3、rescale重缩放分区  对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌
               拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中
            */
            env.addSource(new RichParallelSourceFunction() {
    
    
                @Override
                public void run(SourceContext sourceContext) throws Exception {
    
                    for (int i=0;i<8;i++){
                        //将奇偶数分别发送到0号到1号并行分区
                        if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
                            sourceContext.collect(i);
                        }
                    }
                }
                @Override
                public void cancel() {
                }
            }).setParallelism(2)
                    .rescale()
                    .print()
                    .setParallelism(4);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    4、 广播 把一个数据分发到了下面所有的子任务当中去

    //4、广播 把一个数据分发到了下面所有的子任务当中去
            stream.broadcast().print().setParallelism(2);
    
    • 1
    • 2

    5、全局分区 将全部数据都圈到一个分区里面去

      //5、全局分区   将全部数据都圈到一个分区里面去
        stream.global().print().setParallelism(4);
    
    • 1
    • 2

    6、自定义重分区

    //6、自定义重分区
    env.fromElements(1,2,3,4,5,6,7,8)
            .partitionCustom(new Partitioner() {
                @Override
                public int partition(Integer key, int i) {
                    return key % 2;
                }
    
            }, new KeySelector() {
    
    
                @Override
                public Integer getKey(Integer integer) throws Exception {
                    return integer;
                }
            }).print().setParallelism(4)
    ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    测试

    package com.atguigu.chapter05;
    
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    /**
     * @author potential
     */
    public class TransformPartitionTest {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //从元素中读取数据
            DataStreamSource stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L)
    
            );
    
            //1、随机分区 对于全部的分区 进行洗牌
            stream.shuffle().print("随机分区").setParallelism(4);
    
    //
            //2、轮询分区  对于全部的分区 进行发牌
            stream.rebalance().print("轮询分区").setParallelism(4);
    
            /*
            3、rescale重缩放分区  对于全部的分区先进行一个划分,然后再 在已经分好的分区中,对数据进行轮询分区,发牌
               拿我们这里的例子来说,先将全部分区分成2部分,再针对这两部分进行发牌,奇数在一部分分区中 偶数在一部分分区中
            */
            env.addSource(new RichParallelSourceFunction() {
    
    
                @Override
                public void run(SourceContext sourceContext) throws Exception {
    
                    for (int i=0;i<8;i++){
                        //将奇偶数分别发送到0号到1号并行分区
                        if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
                            sourceContext.collect(i);
                        }
                    }
                }
                @Override
                public void cancel() {
                }
            }).setParallelism(2)
                    .rescale()
                    .print()
                    .setParallelism(4);
    
            //4、广播 把一个数据分发到了下面所有的子任务当中去
            stream.broadcast().print().setParallelism(2);
    
            //5、全局分区   将全部数据都圈到一个分区里面去
            stream.global().print().setParallelism(4);
    
    
            //6、自定义重分区
            env.fromElements(1,2,3,4,5,6,7,8)
                    .partitionCustom(new Partitioner() {
                        @Override
                        public int partition(Integer key, int i) {
                            return key % 2;
                        }
    
                    }, new KeySelector() {
    
    
                        @Override
                        public Integer getKey(Integer integer) throws Exception {
                            return integer;
                        }
                    }).print().setParallelism(4)
            ;
           
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    测试结果:
    1、随机分区
    在这里插入图片描述
    2、轮询分区
    在这里插入图片描述
    3、rescale重缩放分区
    在这里插入图片描述
    可以从测试结果中看出,1,3,5,7这几个奇数全部在3,4这俩分区里面,0,2,4,6这几个偶数全部在1,2这俩分区里面。

    4、广播
    在这里插入图片描述
    设置并行度为2,使用广播这种物理分区,则每条数据都会被分配到每一条通道中。

    5、全局分区

    在这里插入图片描述
    从测试的结果来看,全部的数据都在1这个分区当中。

    6、自定义重分区
    在这里插入图片描述
    我们代码中将分区划分为两个,尽管后来并行度为4,但是return key % 2已经分为2个了,所以并行度在这里并没有起作用。而我们看到测试的结果中,0,2,4,6,8这几个偶数全部都在1这个分区当中,1,3,5,7这几个奇数全部都在2这个分区当中。

  • 相关阅读:
    高数常用公式定理总结
    数据要素市场化的理论内涵、现实挑战和实践路径
    Linux ❀ 进程出现process information unavailable时的消除方法
    海运整柜出口操作流程有哪些注意事项?
    python import相关内容简单介绍
    DELL OMSA(SrvAdmin)-监控软件与Promethues结合
    对Mysql中redo log、undo log、binlog深入理解
    056:mapboxGL中layer的layout,paint,filter的属性值表达式说明总结
    优秀测试成长之路
    【Unity】预制体材质变(Clone)克隆体问题
  • 原文地址:https://blog.csdn.net/junR_980218/article/details/126227681