• Spark repartitionAndSortWithinPartitions


    Operator semantics

    Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys. This is more efficient than calling repartition and then sorting inside each partition, because the sort can be pushed down into the shuffle machinery.
    If you still need the data sorted after a repartition, call repartitionAndSortWithinPartitions directly instead.
    It can only be called on a key-value RDD (RDD[(K, V)]) whose key type has an Ordering available.

    The Java API is recommended here; the Scala API is slightly more cumbersome (it usually needs an explicit implicit Ordering, see below).
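
    For reference, the operator is declared in OrderedRDDFunctions and takes only a Partitioner; the per-partition sort happens during the shuffle itself. A minimal sketch of the two approaches (assuming an existing pairRdd: RDD[(String, Int)]; the variable names are illustrative only):

      import org.apache.spark.HashPartitioner

      // Two-step alternative: shuffle by key, then sort every partition in memory afterwards
      val slower = pairRdd
        .partitionBy(new HashPartitioner(4))
        .mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator, preservesPartitioning = true)

      // One step: repartitionAndSortWithinPartitions pushes the sort into the shuffle
      val faster = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))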


    spark.java.api

    flatMapToPair + repartitionAndSortWithinPartitions

      JavaPairRDD<ImmutableBytesWritable, KeyValue> hFileRdd = ds.javaRDD()
                    .flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
                        @Override
                        public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row row) throws Exception {
                            String rowKey = row.getString(0); // by convention, the first field is the rowKey
    
                            ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>();
    
                            for (int i = 1; i < row.length(); i++) {
                                String fieldName = row.schema().fields()[i].name();
                                String columnFamily = fieldName.split(":")[0];
                                String qualifier = fieldName.split(":")[1];
                                String value = String.valueOf(row.get(i));
                                KeyValue keyValue = new KeyValue(
                                        Bytes.toBytes(rowKey),
                                        Bytes.toBytes(columnFamily),
                                        Bytes.toBytes(qualifier),
                                        Bytes.toBytes(value));
                                list.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), keyValue));
                            }
    
                            return list.iterator();
                        }
                    });
    
    // The pair RDD can then call repartitionAndSortWithinPartitions directly
    JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitioned =
                        hFileRdd.repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.toArray(new String[regionSplits.size()])));
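
    The RegionPartitioner passed in above is a user-defined Partitioner that the original post does not show. A rough sketch of what such a partitioner might look like (written in Scala here; regionSplits is assumed to hold the HBase region start keys as sorted strings, and the class name is only illustrative):

      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
      import org.apache.hadoop.hbase.util.Bytes
      import org.apache.spark.Partitioner

      // One partition per HBase region: a record goes to the region whose
      // start-key range contains its rowkey.
      class RegionPartitioner(splits: Array[String]) extends Partitioner {
        override def numPartitions: Int = splits.length + 1

        override def getPartition(key: Any): Int = {
          val rowKey = key.asInstanceOf[ImmutableBytesWritable].copyBytes()
          // the number of split points the rowkey is >= to is its region index
          splits.count(s => Bytes.compareTo(rowKey, Bytes.toBytes(s)) >= 0)
        }
      }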
    

    spark.scala.api

    flatMapRdd + OrderedRDDFunctions + repartitionAndSortWithinPartitions

     val flatMapRdd: RDD[(ImmutableBytesWritable, KeyValue)] = df.rdd.flatMap(row => {
    
          val rowkey: Int = row.getInt(0)
          val columnFamily: String = "i"
    
          val list = new util.ArrayList[(ImmutableBytesWritable, KeyValue)]()
    
          for (i <- 0 until row.length) {
            val fieldName: String = row.schema.fields(i).name
            val qualifier: String = fieldName.split(":")(0)
            val value: String = String.valueOf(row.get(i))
            val keyValue = new KeyValue(
              Bytes.toBytes(rowkey),
              Bytes.toBytes(columnFamily),
              Bytes.toBytes(qualifier),
              Bytes.toBytes(value)
            )
            list.add(new Tuple2[ImmutableBytesWritable, KeyValue](new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue))
          }
          list.asScala // needs: import scala.collection.JavaConverters._
        })
    
     // Required: ImmutableBytesWritable has no default Ordering in scope, so provide one explicitly
        implicit val caseInsensitiveOrdering = new Ordering[ImmutableBytesWritable] {
          override def compare(x: ImmutableBytesWritable, y: ImmutableBytesWritable): Int = x.compareTo(y)
        }
    
    // Only with that Ordering in scope can repartitionAndSortWithinPartitions be called
    flatMapRdd.repartitionAndSortWithinPartitions(new RegionPartitioner())
    

    The implicit Ordering can be omitted when the key type already has a default Ordering in scope (for example Long in the demo below):

    def main(args: Array[String]): Unit = {
      val session: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
      val sc: SparkContext = session.sparkContext
      val rdd: RDD[String] = sc.makeRDD(List("spark", "flink", "hbase", "kafka"))

      val pairRdd: RDD[(Long, String)] = rdd.zipWithIndex().map(f => (f._2, f._1))

      val rsRdd: RDD[(Long, String)] = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(2))

      rsRdd.collect().foreach(println)

      session.close()
    }

    -------------------------- Output --------------------------
    (0,spark)
    (2,hbase)
    (1,flink)
    (3,kafka)
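
    Note that the result is sorted within each partition, not globally: HashPartitioner(2) sends the even keys 0 and 2 to partition 0 and the odd keys 1 and 3 to partition 1, and each partition is then emitted in key order, which is exactly the output above. A small sketch that makes the partition boundaries visible (reusing the rsRdd from the demo):

      // glom() turns every partition into an Array, so the per-partition grouping is visible
      rsRdd.glom().collect().zipWithIndex.foreach { case (part, idx) =>
        println(s"partition $idx: ${part.mkString(", ")}")
      }
      // partition 0: (0,spark), (2,hbase)
      // partition 1: (1,flink), (3,kafka)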
    

    About OrderedRDDFunctions

    /**
     * Extra functions available on RDDs of (key, value) pairs where the key is sortable
     * through an implicit conversion. They work with any key type K that has an implicit
     * Ordering[K] in scope. Ordering objects already exist for all of the standard primitive
     * types. Users can also define their own orderings for custom types, or override the
     * default ordering. The implicit ordering that is in the closest scope will be used.
     *
     * demo:
     *   import org.apache.spark.SparkContext._
     *
     *   val rdd: RDD[(String, Int)] = ...
     *   implicit val caseInsensitiveOrdering = new Ordering[String] {
     *     override def compare(a: String, b: String) =
     *       a.toLowerCase(Locale.ROOT).compare(b.toLowerCase(Locale.ROOT))
     *   }
     *
     *   // Sort by key, using the above case insensitive ordering.
     *   rdd.sortByKey()
     */
    class OrderedRDDFunctions[K : Ordering : ClassTag,
                              V: ClassTag,
                              P <: Product2[K, V] : ClassTag] @DeveloperApi() (
        self: RDD[P])
      extends Logging with Serializable {
      private val ordering = implicitly[Ordering[K]]
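
    The implicit conversion to OrderedRDDFunctions only applies when an Ordering[K] is in scope. Standard primitive and String keys already have one; for a custom key type you provide your own, as in this small sketch (the UserKey class and all names are purely illustrative):

      import org.apache.spark.HashPartitioner
      import org.apache.spark.rdd.RDD

      // Hypothetical key type with no predefined Ordering
      case class UserKey(country: String, id: Long)

      // Bringing an Ordering[UserKey] into scope makes repartitionAndSortWithinPartitions
      // (and sortByKey) available on an RDD[(UserKey, V)]
      implicit val userKeyOrdering: Ordering[UserKey] = Ordering.by(k => (k.country, k.id))

      def sortUsers(users: RDD[(UserKey, String)]): RDD[(UserKey, String)] =
        users.repartitionAndSortWithinPartitions(new HashPartitioner(4))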
    
  • Original article: https://blog.csdn.net/Lzx116/article/details/125127422