• Spark【RDD编程(二)RDD编程基础】


    前言

    接上午的那一篇,下午我们学习剩下的RDD编程,RDD操作中的剩下的转换操作和行动操作,最好把剩下的RDD编程都学完。

    Spark【RDD编程(一)RDD编程基础】

    RDD 转换操作

    6、distinct

    对 RDD 集合内部的元素进行去重,然后把去重后的其他元素放到一个新的 RDD 集合内。

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object RDDTransForm {
    4. def main(args: Array[String]): Unit = {
    5. // 创建SparkContext对象
    6. val conf = new SparkConf()
    7. conf.setAppName("spark core rdd transform").setMaster("local")
    8. val sc = new SparkContext(conf)
    9. // 通过并行集合创建RDD对象
    10. val arr = Array("Spark","Flink","Spark","Storm")
    11. val rdd1: RDD[String] = sc.parallelize(arr)
    12. val rdd2: RDD[String] = rdd1.distinct()
    13. rdd2.foreach(println)
    14. //关闭SparkContext
    15. sc.stop()
    16. }
    17. }

    运行输出:

    1. Flink
    2. Spark
    3. Storm

    可以看到,重复的元素"Spark"被去除掉。 

    7、union

            对 两个 RDD 集合进行并集运算,并返回新的 RDD集合,虽然是并集运算,但整个过程不会把重复的元素去除掉。
    1. // 通过并行集合创建RDD对象
    2. val arr1 = Array("Spark","Flink","Storm")
    3. val arr2 = Array("Spark","Flink","Hadoop")
    4. val rdd1: RDD[String] = sc.parallelize(arr1)
    5. val rdd2: RDD[String] = sc.parallelize(arr2)
    6. val rdd3: RDD[String] = rdd1.union(rdd2)
    7. rdd3.foreach(println)

    运行结果:

    1. Spark
    2. Flink
    3. Storm
    4. Spark
    5. Flink
    6. Hadoop
    可以看到,重复的元素"Spark"和"Flink"没有被去除。

    8、intersection

    对两个RDD 集合进行交集运算。

    1. // 通过并行集合创建RDD对象
    2. val arr1 = Array("Spark","Flink","Storm")
    3. val arr2 = Array("Spark","Flink","Hadoop")
    4. val rdd1: RDD[String] = sc.parallelize(arr1)
    5. val rdd2: RDD[String] = sc.parallelize(arr2)
    6. val rdd3: RDD[String] = rdd1.intersection(rdd2)
    7. rdd3.foreach(println)

    运行结果:

    1. Spark
    2. Flink

    "Spark"和"Flink"是两个RDD集合都有的。 

    9、subtract

    对两个RDD 集合进行差集运算,并返回新的RDD 集合。

    rdd1.substract(rdd2) 返回的是 rdd1有而rdd2中没有的元素,并不会把rdd2中有rdd1中没有的元素也包进来。

    1. // 通过并行集合创建RDD对象
    2. val arr1 = Array("Spark","Flink","Storm")
    3. val arr2 = Array("Spark","Flink","Hadoop")
    4. val rdd1: RDD[String] = sc.parallelize(arr1)
    5. val rdd2: RDD[String] = sc.parallelize(arr2)
    6. val rdd3: RDD[String] = rdd1.subtract(rdd2)
    7. rdd3.foreach(println)

    运算结果:

    Storm

    "Storm"是rdd1中有的二rdd2中没有的,并不会返回"Hadoop"。 

    10、zip

    把两个 RDD 集合中的元素以键值对的形式进行合并,所以需要确保两个RDD 集合的元素个数必须是相同的。

    1. // 通过并行集合创建RDD对象
    2. val arr1 = Array("Spark","Flink","Storm")
    3. val arr2 = Array(1,3,5)
    4. val rdd1: RDD[String] = sc.parallelize(arr1)
    5. val rdd2: RDD[Int] = sc.parallelize(arr2)
    6. val rdd3: RDD[(String,Int)] = rdd1.zip(rdd2)
    7. rdd3.foreach(println)

    运行结果:

    1. (Spark,1)
    2. (Flink,3)
    3. (Storm,5)

    RDD 行动操作

    RDD 的行动操作是真正触发计算的操作,计算过程十分简单。

    1、count

    返回 RDD 集合中的元素数量。

    2、collect

    以数组的形式返回 RDD 集合中所有元素。

    3、first

    返回 RDD 集合中的第一个元素。

    4、take(n)

    返回 RDD 集合中前n个元素。

    5、reduce(func)

    以规则函数func对RDD集合中的元素进行循环处理,比如将所有元素加到一起或乘起来。

    6、foreach

    对RDD 集合进行遍历,输出RDD集合中所有元素。

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object RDDAction {
    4. def main(args: Array[String]): Unit = {
    5. // 创建SparkContext对象
    6. val conf = new SparkConf()
    7. conf.setAppName("spark core rdd transform").setMaster("local")
    8. val sc = new SparkContext(conf)
    9. //通过并行集合创建 RDD 对象
    10. val arr: Array[Int] = Array(1,2,3,4,5)
    11. val rdd: RDD[Int] = sc.parallelize(arr)
    12. val size: Long = rdd.count()
    13. val nums: Array[Int] = rdd.collect()
    14. val value: Int = rdd.first()
    15. val res: Array[Int] = rdd.take(3)
    16. val sum: Int = rdd.reduce((v1, v2) => v1 + v2)
    17. println("size = " + size)
    18. println("The all elements are ")
    19. nums.foreach(println)
    20. println("The first element in rdd is " + value)
    21. println("The first three elements are ")
    22. res.foreach(println)
    23. println("sum is " + sum)
    24. rdd.foreach(print)
    25. //关闭SparkContext
    26. sc.stop()
    27. }
    28. }

    运行结果:

    1. size = 5
    2. The all elements are
    3. 1
    4. 2
    5. 3
    6. 4
    7. 5
    8. The first element in rdd is 1
    9. The first three elements are
    10. 1
    11. 2
    12. 3
    13. sum is 15
    14. 12345
    15. Process finished with exit code 0

    文本长度计算案例

    计算 data 目录下的文件字节数(文本总长度)。

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object FileLength {
    4. def main(args: Array[String]): Unit = {
    5. val conf = new SparkConf()
    6. conf.setAppName("spark core rdd transform").setMaster("local")
    7. val sc = new SparkContext(conf)
    8. val rdd1: RDD[String] = sc.textFile("data")
    9. val rdd2: RDD[Int] = rdd1.map(line => line.length)
    10. val fileLength: Int = rdd2.reduce((len1, len2) => len1 + len2)
    11. println("File length is " + fileLength)
    12. sc.stop()
    13. }
    14. }

    持久化

    在Spark 中,RDD采用惰性机制,每次遇到行动操作,就会从头到尾开始执行计算,这对于迭代计算代价是很大的,因为迭代计算经常需要多次重复使用相同的一组数据。

    1、cache

    • 使用cache() 方法将需要持久化的RDD对象持久化进缓存中
    • 使用unpersist() 方法将持久化rdd从缓存中释放出来
    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object RDDCache {
    4. def main(args: Array[String]): Unit = {
    5. val conf = new SparkConf()
    6. conf.setAppName("spark core rdd transform").setMaster("local")
    7. val sc = new SparkContext(conf)
    8. val list = List("Hadoop","Spark","Hive","Flink")
    9. val rdd: RDD[String] = sc.parallelize(list)
    10. rdd.cache()
    11. println(rdd.count()) //第一次行动操作
    12. println(rdd.collect.mkString(",")) //第二次行动操作
    13. rdd.unpersist() //把这个持久化的rdd从缓存中移除,释放内存空间
    14. sc.stop()
    15. }
    16. }

    优点:保存在内存,读取速度快

    缺点:存储在数据数据不完全,容易丢失

    2、persist

            都是在要持久化的对象后面直接 .persist() ,和 cache 的区别是,persist 是把数据持久化到磁盘中去的。

            优点是数据安全,不容易丢失。        

            缺点就是增加了磁盘的 IO 开销,而且数据不能跨作业共享(这个作业结束后,临时持久化的数据也就丢失了)

    3、checkpoint

             首先需要设置检查点的路径比如 sc.checkpoint("./data") ,然后当又要持久化的对象时,直接 df.checkpoint() 即可。

            同样,优点是数据保存在磁盘,更加安全,而且可以跨作业!

            缺点除了增加磁盘 IO 开销以外,checkpoint 为了保证数据安全会额外开启一个作业。

    所以,一般我们通过把 checkpoint 和 cache 结合来使用:

    1. rdd.cache()
    2. rdd.checkpoint()

    分区

    分区的作用

            RDD 是弹性分布式数据集,通过 RDD 都很大,会被分成多个分区,分别保存在不同的节点上。进行分区的好处:  

    1. 增加并行度。一个RDD不分区直接进行计算的话,不能充分利用分布式集群的计算优势;如果对RDD集合进行分区,由于一个文件保存在分布式系统中不同的机器节点上,可以就近利用本分区的机器进行计算,从而实现多个分区多节点同时计算,并行度更高。
    2. 减少通信开销。通过数据分区,对于一些特定的操作(如join、reduceByKey、groupByKey、leftOuterJoin等),可以大幅度降低网络传输。

    分区的原则

            使分区数量尽量等于集群中CPU核心数目。可以通过设置配置文件中的 spark.default.parallelism 这个参数的值,来配置默认的分区数目。

    设置分区的个数 

    1、创建 RDD对象时指定分区的数量

    1.1、通过本地文件系统或HDFS加载
    sc.textFile(path,partitionNum)
    1.2、通过并行集合加载 

     对于通过并行集合来创建的RDD 对象,如果没有在参数中指定分区数量,默认分区数目为 min(defaultParallelism,2) ,其中defaultParallelism就是配置文件中的spark.default.parallelism。如果是从HDFS中读取文件,则分区数目为文件分片的数目。

    2、使用repartition()方法重新设置分区个数

    val rdd2 = rdd1.repartition(1)    //重新设置分区为1

    自定义分区函数

    继承 org.apache.spark.Partitioner 这个类,并实现下面3个方法:

    1. numPartitions: Int ,用于返回创建出来的分区数。
    2. getPartition(key: Any),用于返回给定键的分区编号(0~paratitionNum-1)。
    3. equals(),Java中判断相等想的标准方法。

    注意:Spark 的分区函数针对的是(key,value)类型的RDD,也就是说,RDD中的每个元素都是(key,value)类型的,然后函数根据 key 对RDD 元素进行分区。所以,当要对一些非(key,value)类型的 RDD 进行自定义分区时,需要首先把 RDD 元素转换为(key,value)类型,然后再使用分区函数。

    案例

    将奇数和偶数分开写到不同的文件中去。

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    3. class MyPartitioner(numParts: Int = 2) extends Partitioner{
    4. //覆盖默认的分区数目
    5. override def numPartitions: Int = numParts
    6. //覆盖默认的分区规则
    7. override def getPartition(key: Any): Int = {
    8. if (key.toString.toInt%2==0) 1 else 0
    9. }
    10. }
    11. object MyPartitioner{
    12. def main(args: Array[String]): Unit = {
    13. val conf = new SparkConf()
    14. conf.setAppName("partitioner").setMaster("local")
    15. val sc: SparkContext = new SparkContext(conf)
    16. val data: Array[Int] = (1 to 100).toArray
    17. val rdd: RDD[Int] = sc.parallelize(data,5)
    18. val savePath:String = System.getProperty("user.dir")+"/data/rdd/out"
    19. rdd.map((_,1)).partitionBy(new MyPartitioner()).map(_._1).saveAsTextFile(savePath)
    20. sc.stop()
    21. }
    22. }

    我们在代码中创建RDD 对象的时候,我们指定了分区默认的数量为 5,然后我们使用我们自定义的分区,观察会不会覆盖掉默认的分区数量: 

    运行结果:

    我们可以看到,除了校验文件,一共生成了两个文件,其中一个保存了1~100的所有奇数,一个保存了1~100的所有偶数; 

    综合案例

    在上一篇博客中,我们已经做过WordCount了,但是明显篇幅比较长,这里我们简化后只需要两行代码:

    1. //使用本地文件作为数据加载创建RDD 对象
    2. val rdd: RDD[String] = sc.textFile("data/word.txt")
    3. //RDD("Hadoop is good","Spark is better","Spark is fast")
    4. val res_rdd: RDD[(String,Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    5. //flatMap:
    6. //RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
    7. //RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

    运行结果:

    1. (Spark,2)
    2. (is,3)
    3. (fast,1)
    4. (good,1)
    5. (better,1)
    6. (Hadoop,1)

    总结

    至此,我们RDD基础编程部分就结束了,但是RDD编程还没有结束,接下来我会继续学习键值对RDD、数据读写,最后总结性低做一个大的综合案例。

  • 相关阅读:
    【方向盘】Java EE几十种技术,“活着的”还剩几何(企业应用技术篇)
    腾讯云tca认证要具备哪些能力?腾讯云tca认证适合那些人群
    STM32CubeMX教程29 USB_HOST - 使用FatFs文件系统读写U盘
    PLC中ST编程的基础知识
    Qt5开发从入门到精通——第五篇一节( 文本编辑器 Easy Word 开发 V1.0 详解 )
    rust-learn
    线程池的核心底层参数
    基于Spring Boot开发的汽车租赁管理系统
    ECCV 2022 旷视入选论文亮点解读(上)
    C++哪些函数不能成为虚函数
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/132651323