• 大数据学习之Spark基础


    Spark基础

    简述

    1、spark作业执行的特点:

    (1)只有遇到行动算子的时候,整个spark作业才会被触发执行

    (2)遇到几次,执行几次

    2、RDD: 弹性分布式数据集

    弹性:数据量可大可小

    RDD类似于容器,但是本身存储的不是数据,是计算逻辑

    当遇到行动算子的时候,整个spark作业才会被触发执行,从第一个RDD开始执行,数据才开始产生流动

    数据在RDD之间只是流动关系,不会存储

    流动的数据量可以很大,也可以很小,所以称为弹性

    分布式:

    spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上

    RDD中流动的数据,可能会来自不同的datanode中的block块数据

    数据集:

    计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储

    后面会有办法将一个RDD的数据存储到磁盘中

    RDD的5大特性:(面试必问!)
    1、RDD是由一系列分区构成

    1)读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的

    2)若设置minPartitions的数量比block块数量还少的话,实际上以block块数量来决定分区数

    3)产生shuffle的算子调用时,可以传入numPartitions(例如:groupby()),可以真正改变RDD的分区数,设置多少,最终RDD就有多少分区

    4)文件会以block块的形式存储在HDFS上,若文件未达到128M默认值的话也会被一个block块存储。

    一开始RDD中的分区数由读取数据的block块数量决定的。

    后一个RDD中的分区数据,除KV函数以外,对应的是前一个RDD中的分区数据所进行逻辑处理后的结果。默认情况下,若后续分区不做处理的话,后续所有的RDD的分区数取决于第一个RDD。

    最终RDD中有几个分区,将来在HDFS中就会看到几个结果文件(HDFS -> RDD -> HDFS)

    2、算子是作用在每一个分区上的(每一个分区都会处理)
    3、RDD与RDD之间存在一些依赖关系

    1)窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某唯一分区中 一对一(也可能前多个分区到后一个分区中)的关系

    2)宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中 一对多的关系 也可以通过查看是否产生shuffle来判断

    3)整个spark作业会被宽依赖的个数划分若干个stage, Num(stage) = Num(宽依赖) + 1

    4)当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从磁盘中读取数据到后一个RDD的现象,

    注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行

    当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行(省略从前一个RDD写数据到磁盘中的过程),可以直接从磁盘读取数据。

    5)**在一个阶段中,RDD有几个分区,**就会有几个并行task任务

    4、kv算子只能作用在kv的RDD上
    5、spark会提供最优的任务计算方式,只移动计算,不移动数据。

    Spark的设计原则之一是数据本地化(Data Locality),即尽量让计算任务在数据所在的节点上执行,从而减少数据的网络传输开销。

    Spark实例:wordcount
    object WordCount2 {
      def main(args: Array[String]): Unit = {
        //创建spark配置文件对象
        val conf: SparkConf = new SparkConf()
        //设置运行模式
        //如果是本地local模式运行的话,需要设置setMaster
        //将来如果是集群进行,将这句话注释即可
        conf.setMaster("local")
        //设置spark作业的名字
        conf.setAppName("wordcount")
    
        //创建spark core上下文环境对象
        val sc: SparkContext = new SparkContext(conf)
        //===================================================================================
    
        //读取文件,每次读取一行
        //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
        val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
        //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")
    
        //一行数据根据分隔符分割
        val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\\|"))
        //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")
    
    
        //将每一个单词组成(word,1)
        val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
            println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")
    
        //根据键进行分组,并设置分区数为 5
        val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
            println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")
    
        val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
            println(s"resRDD的分区数:${resRDD.getNumPartitions}")
    
        //打印
        resRDD2.foreach(println)
    
        //指定的是所要写入数据的文件夹的路径
        //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
        resRDD.saveAsTextFile("spark/data/outdata1")
    
      }
    }
    
    Spark中RDD调用的函数,称之为算子

    算子分为两类:

    1、转换算子(RDD -> RDD,处理逻辑)

    2、行动算子(触发作业的执行)

    1、转换算子
    1)Map
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Demo1Map {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("map算子演示")
        val sc: SparkContext = new SparkContext(conf)
    
        val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
        //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
        //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
        val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
          println("==============处理后的数据========================")
          val array1: Array[String] = line.split(",")
          (array1(0),array1(1),array1(2),array1(3),array1(4))
        })
    
        //foreach是一个行动算子,遇到行动算子,触发作业执行
        /**
         * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
         * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
         * 打印结果:
         * ==============处理后的数据========================
         * (1500100001,施笑槐,22,女,文科六班)
         * ==============处理后的数据========================
         * (1500100002,吕金鹏,24,男,文科六班)
         *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
         */
        rdd2.foreach(println)
    
      }
    
    }
    
    2)filter
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Demo2Filter {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("map算子演示")
        val sc: SparkContext = new SparkContext(conf)
    
        //===============================================================
        val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
        //需求:过滤出所有的男生
        //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
        // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
        // 但是判断中的println()属于scala,并不受影响
        val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
          var b: Boolean = false
          if ("女".equals(line.split(",")(3))) {
            println("============这是女生==================")
          } else {
            println("============这是男生==================")
            b = "男".equals(line.split(",")(3))
          }
          b
        })
    
        genderRDD.foreach(println)
    
      }
    }
    
    3)flatMap
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Demo3FlatMap {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("map算子演示")
        val sc: SparkContext = new SparkContext(conf)
    
        //===============================================================
        val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")
    
        /**
         * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
         * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
         * 打印结果:
         * ===============一条数据====================
         * hello
         * world
         * ===============一条数据====================
         * java
         * hadoop
         * linux
         */
        val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
          println("===============一条数据====================")
          line.split("\\|")
        })
    
        rdd1.foreach(println)
    
    
      }
    }
    
    4)sample
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Demo4Sample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("map算子演示")
        val sc: SparkContext = new SparkContext(conf)
    
        //===============================================================
        val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    
        /**
         * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
         * 这个函数主要在机器学习的时候会用到
         * withReplacement :
         * 为True时,抽样结果中可能会包含重复的元素。
         * 为False时,抽样结果中不会包含重复的元素。
         * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
         */
        val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)
    
        rdd1.foreach(println)
      }
    }
    
    5)groupBy
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Demo5GroupBy {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("groupBy")
    
        val sc: SparkContext = new SparkContext(conf)
    
        //===================================================
        val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
        //求每个班级的平均年龄
        val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    
        //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
        val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
          case Array(_, _, age: String, _, clazz: String) =>
            (clazz, age.toInt)
        }
    
        /**
         * groupBy算子的使用
         *
         * 1、groupBy的算子,后面的分组条件是我们自己指定的
         * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
         * 输出:
         * (理科二班,22.556962025316455)
         * (文科三班,22.680851063829788)
         * (理科四班,22.63736263736264)
         * (理科一班,22.333333333333332)
         * (文科五班,22.30952380952381)
         */
        // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
    //    groupRDD.foreach(println)
    
        val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
          val clazz: String = kv._1
          val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size
    
          (clazz, avgAge)
        })
        resKvRDD.foreach(println)
    
    //    while (true){
    //
    //    }
      }
    }
    

    在这里插入图片描述

    6)groupByKey
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Demo6GroupByKey {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("groupByKey")
    
        val sc: SparkContext = new SparkContext(conf)
    
        //===================================================
        val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
        //求每个班级的平均年龄
        val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    
    
        //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
        val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
          case Array(_, _, age: String, _, clazz: String) =>
            (clazz, age.toInt)
        }
    
        /**
         * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
         * 也就说,只有kv格式的RDD才能调用kv格式的算子
         * 输出:
         * (理科二班,22.556962025316455)
         * (文科三班,22.680851063829788)
         * (理科四班,22.63736263736264)
         * (理科一班,22.333333333333332)
         * (文科五班,22.30952380952381)
         */
        val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
    
        val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
        resKvRDD2.foreach(println)
    
        /**
         * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
         * 1、代码格式上:
         * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
         * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
         *
         * 2、执行shuffle数据量来看
         *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
         *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
         */
    
        while (true) {
    
        }
      }
    }
    

    在这里插入图片描述

    7)reduceByKey
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Demo7ReduceByKey {
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("reduceByKey")
    
        val sc: SparkContext = new SparkContext(conf)
    
        //===================================================
        val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
        //求每个班级的平均年龄
        val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
        //分别使用groupByKey和reduceBykey计算每个学生的总分
        val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
          case Array(id: String, _, score: String) =>
            (id, score.toInt)
        }
    
        /**
         * groupByKey实现
         */
    //        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
    //        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
    //        resRDD1.foreach(println)
    
        /**
         * reduceByKey实现
         * 输出:
         * (1500100113,519)
         * (1500100724,440)
         * (1500100369,376)
         * (1500100378,402)
         * (1500100306,505)
         * (1500100578,397)
         */
        val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
        resRDD2.foreach(println)
    
    
        /**
         * 面试题:
         * groupByKey与reduceBykey的区别?
         * 相同点:
         * 它们都是kv格式的算子,只有kv格式的RDD才能调用
         * 不同点:
         * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
         * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
         * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
         */
    
        while (true){
    
        }
      }
    }
    

    在这里插入图片描述

    8)union
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Demo8Union {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("reduceByKey")
    
        val sc: SparkContext = new SparkContext(conf)
    
        //===================================================
        //parallelize:将scala的集合变成spark中的RDD
        val rdd1: RDD[(String, String)] = sc.parallelize(List(
          ("1001", "张三"),
          ("1002", "张三2"),
          ("1003", "张三3"),
          ("1004", "张三4"),
          ("1005", "张三5")
        ))
        println(s"rdd1的分区数:${rdd1.getNumPartitions}")
    
        val rdd2: RDD[(String, String)] = sc.parallelize(List(
          ("1006", "李四6"),
          ("1007", "李四7"),
          ("1003", "张三3"),
          ("1008", "李四8"),
          ("1009", "李四9")
        ))
        println(s"rdd2的分区数:${rdd2.getNumPartitions}")
    
        val rdd3: RDD[(String, Int)] = sc.parallelize(List(
          ("1006", 111),
          ("1007", 22),
          ("1003", 33),
          ("1008", 444),
          ("1009", 55)
        ))
    
        //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
        //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
        //    rdd1.union(rdd3)
        val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
        resRDD1.foreach(println)
        println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")
    
        /**
         * 输出:
         * rdd1的分区数:1
         * rdd2的分区数:1
         * (1001,张三)
         * (1002,张三2)
         * (1003,张三3)
         * (1004,张三4)
         * (1005,张三5)
         * (1006,李四6)
         * (1007,李四7)
         * (1003,张三3)
         * (1008,李四8)
         * (1009,李四9)
         * resRDD1的分区数:2
         */
      }
    }
    

    9)join

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    /**
     * join算子也要作用在kv格式的RDD上
     */
    object Demo9Join {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("reduceByKey")
    
        val sc: SparkContext = new SparkContext(conf)
    
        //===================================================
        //parallelize:将scala的集合变成spark中的RDD
        val rdd1: RDD[(String, String)] = sc.parallelize(List(
          ("1001", "张三"),
          ("1002", "李四"),
          ("1003", "王五"),
          ("1004", "小明"),
          ("1005", "小红")
        ))
    
        val rdd2: RDD[(String, String)] = sc.parallelize(List(
          ("1001", "看美女"),
          ("1002", "看综艺"),
          ("1003", "看八卦"),
          ("1004", "打游戏"),
          ("1009", "学习")
        ))
    
        /**
         * join 内连接
         * right join 右连接
         * left join 左连接
         * full join 全连接
         */
        // join 内连接 两个rdd共同拥有的键才会进行关联
        /**
         * (1001,张三,看美女)
         * (1002,李四,看综艺)
         * (1004,小明,打游戏)
         * (1003,王五,看八卦)
         */
            val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
            val resRDD2: RDD[(String, String, String)] = resRDD1.map {
              case (id: String, (name: String, like: String)) =>
                (id, name, like)
            }
            resRDD2.foreach(println)
    
        //right join 右连接 保证右边rdd键的完整性
        /**
         * (1001,张三,看美女)
         * (1002,李四,看综艺)
         * (1004,小明,打游戏)
         * (1009,查无此人,学习)
         * (1003,王五,看八卦)
         */
            val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
            val resRDD3: RDD[(String, String, String)] = resRDD2.map {
              case (id: String, (Some(name), like: String)) =>
                (id, name, like)
              case (id: String, (None, like: String)) =>
                (id, "查无此人", like)
            }
            resRDD3.foreach(println)
    
        //left join: 左连接
        /**
         * (1005,小红,此人无爱好)
         * (1001,张三,看美女)
         * (1002,李四,看综艺)
         * (1004,小明,打游戏)
         * (1003,王五,看八卦)
         */
        val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, Some(like: String))) =>
            (id, name, like)
          case (id: String, (name: String, None)) =>
            (id, name, "此人无爱好")
        }
        resRDD2.foreach(println)
    
    
        //全连接,保证所有的键、值的完整
        /**
         * (1005,小红,此人无爱好)
         * (1001,张三,看美女)
         * (1002,李四,看综艺)
         * (1004,小明,打游戏)
         * (1009,查无此人,学习)
         * (1003,王五,看八卦)
         */
        val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), Some(like))) =>
            (id, name, like)
          case (id: String, (Some(name), None)) =>
            (id, name, "此人无爱好")
          case (id: String, (None, Some(like))) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)
    
    
      }
    }
    
  • 相关阅读:
    Ubuntu下无法输入中文问题解决
    Leetcode 116. Populating Next Right Pointers in Each Node (BFS 题)
    IDEA快捷方式
    ccd电池充电器坏了
    设计模式之状态模式
    css-inpu边框
    iOS开发之Undefined symbol:_OBJC_CLASS_$_****
    【C语言】顺序表(上卷)
    R 语言学习教程,从入门到精通,R 基础运算(5)
    文件字符流说明
  • 原文地址:https://blog.csdn.net/m0_58050808/article/details/140363447