• Flink(六)【DataFrame 转换算子(下)】


    前言

            今天学习剩下的转换算子。

    1、物理分区算子

    常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解。

    此外还有我们之前用过的 keyBy 聚合算子,它也是一个分区算子。

    1.1、随机分区(shuffle)

    1. package com.lyh.partition;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. public class PartitionDemo {
    5. public static void main(String[] args) throws Exception {
    6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    7. env.setParallelism(2);
    8. DataStreamSource socketDS = env.socketTextStream("localhost", 9999);
    9. // 随机分区: random.nextInt(numberOfChannels:下游算子并行度)
    10. socketDS.shuffle().print();
    11. env.execute();
    12. }
    13. }

    这里的下游算子并行度在这个案例中指的是我们的 Sink(print)算子,因为我们的并行度是 2 ,所以 random.nextInnt(2) 的结果只会是 0 或 1,也就是说我们的数据会被随机分到这两个编号的任务槽中。 

    运行结果:

    1. 1>4
    2. 2>5
    3. 1>4
    4. 1>2
    5. 1>1
    6. 2>3
    7. 1>5

    可以看到,随机分区的结果,数据是被随机分到各个区的并没有什么规律。 

    1.2、轮询分区(reblancce)

    轮询分区就是根据并行度把数据对每个下游的算子进行轮流分配。这种处理方式非常适合于当 数据源倾斜 的情况下,我们读取的时候利用轮询分区的方式均匀的把数据分给下游的算子。

    分区逻辑: 

    1. // 轮询分区:
    2. socketDS.rebalance().print();

    运行结果:

    1. 2> 1
    2. 1> 2
    3. 2> 3
    4. 1> 1
    5. 2> 5
    6. 1> 2
    7. 2> 2
    8. 1> 1

     1.3、重缩放分区(rescale)

    重缩放分区和轮询分区特别相似,对于下游的 n 个子任务,我们假设有 2 个 source 算子(不一定就是 source 而是带有分区方法的算子),那么使用轮询分区每个 source 算子次都要 n 个子任务都轮询发送数据。而重缩放分区的逻辑就是,每个 source 算子只负责 n/2 个任务,

    所以当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。
    从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

    1. // 缩放分区:
    2. socketDS.rescale().print();

    这里由于 Socket 这种数据源只支持一个 Source 算子读取,所以不做演示。

    1.4、广播(broadcast)

    广播类似于一种轮询,只不过它每次轮询都会把每个数据发送给所有下游任务。

    1. // 广播分区(使用两个并行度来模拟)
    2. socketDS.broadcast().print();

    运行结果: 

    1. 1> 1
    2. 2> 1
    3. 2> 2
    4. 1> 2
    5. 1> 3
    6. 2> 3
    7. 1> 4
    8. 2> 4
    9. 2> 5
    10. 1> 5

     1.5、全局分区(global)

    全局分区会把所有数据都发往下游的第一个任务当中。

    1. // 全局分区:
    2. socketDS.global().print();

    并行度为 2 的情况下,运行结果: 

    1. 1> 1
    2. 1> 2
    3. 1> 3
    4. 1> 4
    5. 1> 5

    1.6、自定义分区(custom)

    我们可以通过使用 partitionCustom(partitioner,keySelector) 方法来自定义分区策略。在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的键字段选择器,我们一般都是自己实现一个 KeySelector。

    1、自定义分区器

    1. // 自定义分区器
    2. public class MyPartitioner implements Partitioner {
    3. // 返回分区号,我们传进来的是一个数字类型的字符串
    4. @Override
    5. public int partition(String key, int numPartitions) {
    6. // 这里我们自己实现一个取模 我们的并行度为2 奇数%2=1 偶数%2=0
    7. return Integer.parseInt(key) % numPartitions;
    8. }
    9. }
    1. public class CustomPartitionDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. DataStreamSource socketDS = env.socketTextStream("localhost", 9999);
    6. socketDS.partitionCustom(new MyPartitioner(),key->key).print();
    7. env.execute();
    8. }
    9. }

    运行结果

    1. 2> 1
    2. 1> 2
    3. 2> 3
    4. 1> 4
    5. 2> 5
    6. 1> 6
    7. 1> 8

    可以看到,奇数都被分到 2 号线程,偶数被分到了 1 号。

    2、分流

    分流就是把我们传进来的数据流根据一定的规则进行筛选后,将符合条件的数据放到对应的流里。

    2.1、Filter 

    读取一个整数数据流,将数据划分为奇数数据流和偶数数据流。其实我们上面在自定义分区器已经实现了,但那是并行度为 2 的情况刚好达到的这么一种效果。

    1. package com.lyh.split;
    2. import org.apache.flink.api.common.functions.FilterFunction;
    3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    6. public class SplitByFilterDemo {
    7. public static void main(String[] args) throws Exception {
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(2);
    10. DataStreamSource socketDS = env.socketTextStream("localhost", 9999);
    11. socketDS.filter(new FilterFunction() {
    12. @Override
    13. public boolean filter(String value) throws Exception {
    14. if (Integer.parseInt(value)%2==0){
    15. return true; // 为true则留下来
    16. }
    17. return false;
    18. }
    19. });
    20. // lambda 表达式
    21. // 偶数流
    22. socketDS.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");
    23. // 奇数流
    24. socketDS.filter(value -> Integer.parseInt(value)%2==1).print("奇数流");
    25. env.execute();
    26. }
    27. }

    运行结果:

    1. 偶数流:1> 2
    2. 奇数流:2> 1
    3. 奇数流:1> 3
    4. 偶数流:2> 4
    5. 偶数流:1> 6
    6. 偶数流:2> 8
    7. 奇数流:1> 7
    8. 奇数流:2> 9

    缺点:明显每次 Source 算子传过来的数据需要把所有数据发送给每个转换算子(Filter),明显性能要差一些。

    2.2、侧输出流

    侧输出流后面我们再做详细介绍,这里只做简单使用。简单来说,只需要调用上下文 context 的 .output() 方法,就可以输出任意类型的数据了,而侧输出流的标记和提取,都离不开一个“输出标签” (OutputTag),指定了侧输出流的 id 和 类型。

    案例-我们根据上一节的 POJO 类 WaterSensor 的 id 进行分流(将s1和s2分别分到不同的数据流中去,把非s1、s2的数据保留在主流当中)

    1. package com.lyh.split;
    2. import com.lyh.bean.WaterSensor;
    3. import org.apache.flink.api.common.typeinfo.Types;
    4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import org.apache.flink.streaming.api.functions.ProcessFunction;
    8. import org.apache.flink.util.Collector;
    9. import org.apache.flink.util.OutputTag;
    10. /**
    11. * @author 刘xx
    12. * @version 1.0
    13. * @date 2023-11-16 19:25
    14. * 使用侧输出流实现数据分流
    15. */
    16. public class SideOutputDemo {
    17. public static void main(String[] args) throws Exception {
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. env.setParallelism(1);
    20. DataStreamSource sensorDS = env.fromElements(
    21. new WaterSensor("s1", 1L, 1),
    22. new WaterSensor("s2", 2L, 2),
    23. new WaterSensor("s3", 3L, 3),
    24. new WaterSensor("s2", 2L, 2)
    25. );
    26. //这里的泛型是我们测流中的数据类型, 注意:如果不是基本数据类型需要单独设置数据类型
    27. OutputTag s1 = new OutputTag("s1", Types.POJO(WaterSensor.class));
    28. OutputTag s2 = new OutputTag("s2", Types.POJO(WaterSensor.class));
    29. /**
    30. * Flink一共有4层API:底层API、DataStream、Table API、Flink SQL
    31. * process(processFunction: 处理逻辑,outputType: 主流的输出类型) 是Flink的底层API
    32. */
    33. SingleOutputStreamOperator process = sensorDS.process(new ProcessFunction() {
    34. @Override
    35. public void processElement(WaterSensor sensor, Context context, Collector out) throws Exception {
    36. if (sensor.getId().equals("s1")) { // 放到侧流s1中
    37. context.output(s1, sensor);
    38. } else if (sensor.getId().equals("s2")) { // 放到测流s2中
    39. context.output(s2, sensor);
    40. } else { // 放到主流
    41. out.collect(sensor);
    42. }
    43. }
    44. });
    45. // 这里打印的是主流的数据,测流需要调用getSideOutput()方法
    46. process.print("主流");
    47. // 打印测流 s1
    48. process.getSideOutput(s1).print("测流s1");
    49. // 打印测流 s2
    50. process.getSideOutput(s2).print("测流s2");
    51. env.execute();
    52. }
    53. }

    运行结果:

    1. 测流s1> WaterSensor{id='s1', ts=1, vc=1}
    2. 测流s2> WaterSensor{id='s2', ts=2, vc=2}
    3. 主流> WaterSensor{id='s3', ts=3, vc=3}
    4. 测流s2> WaterSensor{id='s2', ts=2, vc=2}

     这种方式相较于 Filter 明显要效率更高,因为它对每个数据只处理一次。

    3、合流

    在实际应用中,我们经常会遇到来源不同的多种数据流,需要将它们进行联合处理。这就需要先进行合流,Flink 为我们提供了相应的 API。

    3.1、联合(Union)

    联合是最简单的合流操作,就是直接将多条数据流合在一起。但是它要求每个流中的数据类型必须是相同的,合并之后的新流会包括所有流中的元素,数据类型不变。

    1. public class UnionDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. DataStreamSource source1 = env.fromElements(1, 2, 3, 4, 5);
    6. DataStreamSource source2 = env.fromElements(11, 22, 33, 44, 55);
    7. DataStreamSource source3 = env.fromElements("1", "2", "3", "4", "5");
    8. DataStream union = source1.union(source2,source3.map(Integer::valueOf));// 使用parseInt也可以,因为它默认是10进制
    9. union.print();
    10. env.execute();
    11. }
    12. }

    总结:

    1. 使用 union 时,每条流的数据类型必须一致
    2. 可以合并多条流

    3.2、连接(Connect)

    流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink 还提供了另外一种方便的合流操作——连接(connect)。顾名思义,这种操作就是直接把两条流像接线一样对接起来。

    为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个 DataStream 中的数据只能有唯一的类型,所以连接得到的并不是 DataStream,而是一个“连接流”(ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个 DataStream 中。

    (1)CoMapFunction 

    1. package com.lyh.combine;
    2. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    3. import org.apache.flink.streaming.api.datastream.DataStream;
    4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    8. /**
    9. * @author 刘xx
    10. * @version 1.0
    11. * @date 2023-11-16 20:04
    12. */
    13. public class ConnectDemo {
    14. public static void main(String[] args) throws Exception {
    15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    16. env.setParallelism(1);
    17. DataStreamSource source1 = env.fromElements(1, 2, 3, 4, 5);
    18. DataStreamSource source2 = env.fromElements("a", "b", "c", "d", "e");
    19. // connect 合并后,两个数据流仍然是独立的
    20. ConnectedStreams connectedStreams = source1.connect(source2);
    21. // map 将两个不同类型的数据转为统一的数据类型
    22. SingleOutputStreamOperator res = connectedStreams.map(new CoMapFunction() {
    23. @Override
    24. public String map1(Integer value) throws Exception {
    25. return String.valueOf(value);
    26. }
    27. @Override
    28. public String map2(String value) throws Exception {
    29. return value;
    30. }
    31. });
    32. res.print();
    33. env.execute();
    34. }
    35. }

    运行结果:

    1. 1
    2. a
    3. 2
    4. b
    5. 3
    6. c
    7. 4
    8. d
    9. 5
    10. e

    总结:

    1. 一次只能连接 2 条流
    2. 流的数据类型可以不一样
    3. 连接后可以调用 map(实现 CoMapFunction 接口)、flatMap(实现 CoFlatMapFunction接口)、process(实现 CoProcessFunction 接口) 来处理,但是各处理各的

    (2)CoFlatMapFunction 

            flatMap 和 map 一样,同样对两种数据流实现两种不同的处理方法(flatMap1 和 flatMap2)。

    (3)CoProcessFunction

            调用 .process()时,传入的则是一个 CoProcessFunction 实现类。抽象类CoProcessFunction 在源码中定义如下:

    1. // IN1: 第一条流的类型 IN2: 第二条流的类型 OUT: 输出类型
    2. public abstract class CoProcessFunction extends AbstractRichFunction {
    3. ...
    4. public abstract void processElement1(IN1 value, Context ctx, Collector out) throws Exception;
    5. public abstract void processElement2(IN2 value, Context ctx, Collector out) throws Exception;
    6. public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {}
    7. public abstract class Context {...}
    8. ...
    9. }

    它需要实现的也是两个方法(processElement1、processElement2),当数据到来的时候,它会根据其来源调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来访问 timestamp、水位线,并通过 TimerService 注册定时器;另外也提供了.onTimer()方法,用于定义定时触发的处理操作。

    案例-我们创建两个数据流(一个二元组,一个三元组),要求根据两个不同类型元组的第一个字段匹配,以字符串的形式输出该元组。

    1. package com.lyh.combine;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.api.java.tuple.Tuple3;
    4. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    9. import org.apache.flink.util.Collector;
    10. import java.util.ArrayList;
    11. import java.util.HashMap;
    12. import java.util.List;
    13. import java.util.Map;
    14. /**
    15. * @author 刘xx
    16. * @version 1.0
    17. * @date 2023-11-17 10:02
    18. */
    19. public class ConnectKeyByDemo {
    20. public static void main(String[] args) throws Exception {
    21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    22. env.setParallelism(1);
    23. DataStreamSource> source1 = env.fromElements(
    24. Tuple2.of(1, "a1"),
    25. Tuple2.of(1, "a2"),
    26. Tuple2.of(3, "b"),
    27. Tuple2.of(4, "c")
    28. );
    29. DataStreamSource> source2 = env.fromElements(
    30. Tuple3.of(1, "a1",1),
    31. Tuple3.of(1, "a2",2),
    32. Tuple3.of(3, "b",1),
    33. Tuple3.of(4, "c",1)
    34. );
    35. // 连接两条流 输出能根据 id 匹配上的数据(类似 inner join)
    36. ConnectedStreams, Tuple3> connect = source1.connect(source2);
    37. /**
    38. * 每条流实现相互匹配:
    39. * 1、每条流的数据来了之后,因为是各处理各的,所以要关联在一起必须存到一个变量中去
    40. * HashMap>
    41. * 2、除了存变量外,还需要去另一条流存的变量中去查找是否有匹配的
    42. */
    43. SingleOutputStreamOperator process = connect.process(new CoProcessFunction, Tuple3, String>() {
    44. Map>> s1Cache = new HashMap<>();
    45. Map>> s2Cache = new HashMap<>();
    46. @Override
    47. public void processElement1(Tuple2 value, Context ctx, Collector out) throws Exception {
    48. Integer id = value.f0;
    49. // source1 的数据来了就存到变量中去
    50. if (!s1Cache.containsKey(id)) {
    51. List> list = new ArrayList<>();
    52. list.add(value);
    53. s1Cache.put(id, list);
    54. } else {
    55. s1Cache.get(id).add(value);
    56. }
    57. // 去 s2Cache 中去查找是否有匹配的
    58. if (s2Cache.containsKey(id)) {
    59. for (Tuple3 s2Element : s2Cache.get(id)) {
    60. out.collect("s1:" + value + "<-------->" + "s2:" + s2Element);
    61. }
    62. }
    63. }
    64. @Override
    65. public void processElement2(Tuple3 value, Context ctx, Collector out) throws Exception {
    66. Integer id = value.f0;
    67. // source2 的数据来了就存到变量中去
    68. if (!s2Cache.containsKey(id)) {
    69. List> list = new ArrayList<>();
    70. list.add(value);
    71. s2Cache.put(id, list);
    72. } else {
    73. s2Cache.get(id).add(value);
    74. }
    75. // 去 s1Cache 中去查找是否有匹配的
    76. if (s1Cache.containsKey(id)) {
    77. for (Tuple2 s1Element : s1Cache.get(id)) {
    78. out.collect("s2:" + value + "<-------->" + "s1:" + s1Element);
    79. }
    80. }
    81. }
    82. });
    83. process.print();
    84. env.execute();
    85. }
    86. }

    运行结果:

    1. s2:(1,a1,1)<-------->s1:(1,a1)
    2. s1:(1,a2)<-------->s2:(1,a1,1)
    3. s2:(1,a2,2)<-------->s1:(1,a1)
    4. s2:(1,a2,2)<-------->s1:(1,a2)
    5. s2:(3,b,1)<-------->s1:(3,b)
    6. s2:(4,c,1)<-------->s1:(4,c)

    我们设置并行度为 2 再运行:

    env.setParallelism(2);

    运行结果:

    第一次:

    1. 2> s1:(1,a2)<-------->s2:(1,a1,1)
    2. 1> s1:(1,a1)<-------->s2:(1,a2,2)

    第二次:

    1. 2> s2:(1,a2,2)<-------->s1:(1,a2)
    2. 1> s2:(1,a1,1)<-------->s1:(1,a1)
    3. 2> s2:(4,c,1)<-------->s1:(4,c)
    4. 1> s2:(3,b,1)<-------->s1:(3,b)

     我们发现,当并行度为多个的时候,如果不指定分区器的话,每次的运行结果都不一样。

            在CoProcessFunction中,可以通过RuntimeContext对象来获取自己的任务编号。所以我们通过在 processElement1 和 processElement2 方法中 调用getRuntimeContext().getIndexOfThisSubtask() 方法获得当前数据所在的 任务编号可以发现,几乎每次数据的分区结果都不一样,但元组对象的 hash值却是一样的。具体分区细节还得去看源码。

    指定按照 元组的第一个字段进行 keyBy 分区: 

    1. // 多并行度条件下需要根据关联条件进行 keyBy 才能保证相同的 key 分到同一任务中去
    2. ConnectedStreams, Tuple3> connect = source1.connect(source2).keyBy(s1 -> s1.f0,s2->s2.f0);

    运行结果:

    1. 1> s1:(4,c)<-------->s2:(4,c,1)
    2. 2> s1:(1,a1)<-------->s2:(1,a1,1)
    3. 2> s2:(1,a2,2)<-------->s1:(1,a1)
    4. 2> s1:(1,a2)<-------->s2:(1,a1,1)
    5. 2> s1:(1,a2)<-------->s2:(1,a2,2)
    6. 2> s1:(3,b)<-------->s2:(3,b,1)
  • 相关阅读:
    pandas学习(五)merge
    华为路由器AR6300 取消密码重置提醒和密码长期有效
    【Log】为类中的所有日志打印添加前缀
    [Windows] GoLand 加载 k8s v1.14或之前版本 源码
    SQL引擎子系统的工作原理
    预处理详解
    Web3新品牌ZAN亮相外滩大会 为海外客户提供全栈安全可信技术
    《机器人SLAM导航核心技术与实战》第1季:第0章_SLAM发展综述
    3A4000架构银河麒麟V10编译安装filezilla
    使用Keras Tuner进行自动超参数调优的实用教程
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/134447403