• Spark(2)-基础tranform算子(一)


    一、算子列表

    编号名称
    1map算子
    2flatMap算子
    3filter算子
    4mapPartitions算子
    5mapPartitionsWithIndex算子
    6keys算子
    7values算子
    8mapValues算子
    9flatMaplValues算子
    10union算子
    11reducedByKey算子
    12combineByKey算子
    13groupByKey算子
    14foldByKey算子
    15aggregateByKey算子
    16ShuffledRDD算子
    17distinct算子
    18partitionBy算子

     二、代码示例

    1. package sparkCore
    2. import org.apache.hadoop.mapreduce.task.reduce.Shuffle
    3. import org.apache.log4j.{Level, Logger}
    4. import org.apache.spark.rdd.{RDD, ShuffledRDD}
    5. import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
    6. import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}
    7. /**
    8. * spark基本算子
    9. */
    10. object basi_transform_02 {
    11. def main(args: Array[String]): Unit = {
    12. val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    13. val sc: SparkContext = new SparkContext(conf)
    14. sc.setLogLevel("WARN")
    15. //1. map算子
    16. val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)
    17. val map_rdd: RDD[Int] = rdd1.map(_ * 2)
    18. println("*****1. map算子************")
    19. map_rdd.foreach(println(_))
    20. //2.flatMap算子
    21. println("*****2.flatMap算子************")
    22. val arr: Array[String] = Array(
    23. "Hive python spark",
    24. "Java Hello Word"
    25. )
    26. val rdd2: RDD[String] = sc.makeRDD(arr, 2)
    27. val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))
    28. flatMap_rdd.foreach(println(_))
    29. //3.filter算子
    30. println("*****3.filter算子***********")
    31. val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))
    32. val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)
    33. filter_rdd.foreach(println(_))
    34. //4. mapPartitions算子:将数据以分区的形式返回,进行map操作,一个分区对应一个迭代器
    35. // 应用场景: 比如在进行数据库操作时,在操作数据之前,需要通过JDBC方式连接数据库,如果使用map,那每条数据处理之前
    36. // 都需要连接一次数据库,效率显然很低.如果使用mapPartitions,则每个分区连接一次即可
    37. println("*****4. mapPartitions算子**********")
    38. val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)
    39. val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {
    40. print("模拟数据库连接操作")
    41. iter.map(_ * 2)
    42. })
    43. mapParition_rdd.foreach(println(_))
    44. //5. mapPartitionsWithIndex算子,类似于mapPartitions,不过有两个参数
    45. // 第一个参数是分区索引,第二个是对应的迭代器
    46. // 注意:函数返回的是一个迭代器
    47. println("*****5. mapPartitionsWithIndex算子**********")
    48. val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)
    49. val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {
    50. it.map(e => s"partition:$index,val:$e")
    51. })
    52. mapPartitionWithIndex_Rdd.foreach(println(_))
    53. //6.keys算子: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部key
    54. println("*****6.keys算子**********")
    55. val lst: List[(String, Int)] = List(
    56. ("spark", 1), ("spark", 3), ("hive", 2),
    57. ("Java", 1), ("Scala", 3), ("Python", 2)
    58. )
    59. val rdd6: RDD[(String, Int)] = sc.parallelize(lst)
    60. val keysRdd: RDD[String] = rdd6.keys
    61. keysRdd.foreach(println(_))
    62. //7.values: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部value
    63. println("*****7.values算子**********")
    64. val values_RDD: RDD[Int] = rdd6.values
    65. values_RDD.foreach(println(_))
    66. //8.mapValues: RDD中的数据为对偶元组类型, 将value进行计算,然后与原Key进行组合返回(即返回的仍然是元组)
    67. println("*****8.mapValues算子**********")
    68. val lst2: List[(String, Int)] = List(
    69. ("Hello", 1), ("world", 2),
    70. ("I", 2), ("love", 3), ("you", 2)
    71. )
    72. val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)
    73. val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)
    74. mapValues_rdd.foreach(println(_))
    75. //9.flatMaplValues:RDD是对偶元组,将value应用传入flatMap打平后,再与key组合
    76. println("*****9.flatMaplValues算子**********")
    77. // ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)
    78. val lst3: List[(String,String )] = List(
    79. ("Hello", "1 2 3"), ("world", "4 5 6"),
    80. )
    81. val rdd9: RDD[(String, String)] = sc.parallelize(lst3)
    82. // 第一个_是指初始元组中的value;第二个_是指value拆分后的每一个值(转换成整数)
    83. val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))
    84. flatMapValues.foreach(println(_))
    85. //10.union:将两个类型一样的RDD合并到一起,返回一个新的RDD,新的RDD分区数量是两个RDD分区数量之和
    86. println("*****10.union算子**********")
    87. val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)
    88. val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)
    89. val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)
    90. union_rdd.foreach(println(_))
    91. //11.reducedByKey,在每个分区中进行局部分组聚合,然后将每个分区聚合的结果从上游拉到下游再进行全局分组聚合
    92. println("*****11.reducedByKey算子**********")
    93. val lst4: List[(String, Int)] = List(
    94. ("spark", 1), ("spark", 1), ("hive", 3),
    95. ("Python", 1), ("Java", 1), ("Scala", 3),
    96. ("flink", 1), ("Mysql", 1), ("hive", 3)
    97. )
    98. val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)
    99. val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)
    100. reduced_rdd.foreach(println(_))
    101. //12.combineByKey:相比reducedByKey更底层的方法,后者分区内和分区之间相同Key对应的value值计算逻辑相同,但是前者可以分别定义不同的
    102. // 的计算逻辑.combineByKey 需要传入三个函数作为参数:
    103. // 其中第一个函数:key在上游分区第一次出现时,对应的value该如何处理
    104. // 第二个函数:分区内相同key对应value的处理逻辑
    105. // 第三个函数: 分区间相同Key对应value的处理逻辑
    106. println("*****12.combineByKey算子**********")
    107. val f1 = (v:Int) => {
    108. val stage = TaskContext.get().stageId()
    109. val partition = TaskContext.getPartitionId()
    110. println(s"f1 function invoked in stage: $stage,partiton:$partition")
    111. v
    112. }
    113. //分区内相同key对应的value使用乘积
    114. val f2 = (a:Int,b:Int) => {
    115. val stage = TaskContext.get().stageId()
    116. val partition = TaskContext.getPartitionId()
    117. println(s"f2 function invoked in stage: $stage,partiton:$partition")
    118. a * b
    119. }
    120. //分区间相同key对应的value使用加法
    121. val f3 = (m:Int,n:Int) => {
    122. val stage = TaskContext.get().stageId()
    123. val partition = TaskContext.getPartitionId()
    124. println(s"f3 function invoked in stage: $stage,partiton:$partition")
    125. m + n
    126. }
    127. val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)
    128. val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)
    129. combineByKey_rdd.foreach(println(_))
    130. //13.groupByKey:按key进行分组,返回的是(key,iter(value集合)
    131. println("*****13.groupByKey算子**********")
    132. val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)
    133. val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()
    134. groupByKey_rdd.foreach(println(_))
    135. //14.foldByKey:每个分区应⽤⼀次初始值,先在每个进⾏局部聚合,然后再全局聚合(注意全局聚合的时候,初始值并不会被用到)
    136. // 局部聚合的逻辑与全局聚合的逻辑相同
    137. println("*****14.foldByKey算子**********")
    138. val lst5: List[(String, Int)] = List(
    139. ("maple", 1), ("kelly", 1), ("Avery", 1),
    140. ("maple", 1), ("kelly", 1), ("Avery", 1)
    141. )
    142. val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)
    143. val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)
    144. foldByKey_rdd.foreach(println(_))
    145. //15.aggregateByKey:foldByKey,并且可以指定初始值,每个分区应⽤⼀次初始值,传⼊两个函数,分别是局部聚合的计算逻辑
    146. // 和全局聚合的逻辑
    147. println("*****15.aggregateByKey算子**********")
    148. val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)
    149. val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )
    150. aggregateByKey_rdd.foreach(print(_))
    151. //16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底层都是使⽤的ShuffledRDD,
    152. // 并且 mapSideCombine = true
    153. println("*****16.ShuffledRDD算子**********")
    154. val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)
    155. val partitioner = new HashPartitioner(rdd16.partitions.length)
    156. // 对rdd16按照指定分区器进行分区
    157. // String是rdd16中Key的数据类型,第一个Int是rdd16中value的数据类型,第二个Int是中间结果的数据类型(当然前提是传入聚合器-里面包含计算逻辑
    158. // [可以据此知晓中间结果的数据类型])
    159. val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)
    160. // 设置一个聚合器: 指定rdd16的计算逻辑(包含三个函数,分别是分区内一个key对应value的处理逻辑;分区内相同key对应value计算逻辑
    161. // 和分区间相同Key对应value计算逻辑)
    162. val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)
    163. // 给shuffledRDD设置聚合器
    164. shuffledRDD.setAggregator(aggregator)
    165. shuffledRDD.setMapSideCombine(true) // 设置Map端聚合
    166. println(shuffledRDD.collect().toList)
    167. // 17.distinct算子:对RDD元素进行去重
    168. println("*****17.distinct算子**********")
    169. val lst6: Array[String] = Array(
    170. "spark", "spark", "hive",
    171. "Python", "Python", "Java"
    172. )
    173. val rdd17: RDD[String] = sc.parallelize(lst6)
    174. val distinct_rdd: RDD[String] = rdd17.distinct()
    175. println(distinct_rdd.collect().toList)
    176. // 18.partitionBy: 按照指定的分区器进行分区(底层使用的是ShuffleRDD)
    177. println("***** 18.partitionBy算子**********")
    178. val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)
    179. val partitioner2 = new HashPartitioner(rdd18.partitions.length)
    180. val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)
    181. println(partitioned_rdd.collect().toList)
    182. sc.stop()
    183. }
    184. }

  • 相关阅读:
    Mysql执行报错:[Err] 1292 - Truncated incorrect DOUBLE value:***
    SAP ABAP OData 服务如何支持 $select 有选择性地仅读取部分模型字段值试读版
    LeetCode --- 2. Add Two Numbers 解题报告
    你还弄不清xxxForCausalLM和xxxForConditionalGeneration吗?
    zookeeper的安装与配置
    图像滤波总结
    java计算机毕业设计京津冀畅游网设计源码+mysql数据库+系统+lw文档+部署
    web安全之XSS攻击
    文心大模型4.0亮相2023百度世界大会,助力各赛道应用进一步发展
    搭建私有git服务器:GitLab部署
  • 原文地址:https://blog.csdn.net/weixin_37901366/article/details/136438083