• 关于SparkRdd和SparkSql的几个指标统计,scala语言,打包上传到spark集群,yarn模式运行


    需求:

    ❖ 要求:分别用SparkRDD, SparkSQL两种编程方式完成下列数据分析,结合webUI监控比较性能优劣并给出结果的合理化解释.
    1、分别统计用户,性别,职业的个数:
    2、查看统计年龄分布情况(按照年龄分段为7段)
    3、查看统计职业分布情况(按照职业统计人数)
    4、统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
    5、统计评分分布情况
    6、统计不同用户的评分次数。
    7、统计不同类型的电影分布情况
    8、统计每年的电影发布情况。
    9、统计每部电影有多少用户评价,总评分情况,平均分情况
    10、统计每个用户评价次数,评价总分以及平均分情况
    11、求被评分次数最多的 10 部电影,并给出评分次数(电影名,评分次数)
    12、分别求男性,女性当中评分最高的 10 部电影(性别,电影名,影评分)
    13、分别求男性,女性看过最多的 10 部电影(性别,电影名)
    14、年龄段在“18-24”的男人,最喜欢看 10 部电影
    15、求 movieid = 2116 这部电影各年龄段(年龄段为7段)的平均影评(年龄段,影评分)
    16、求最喜欢看电影(影评次数最多)的那位女性评最高分的 10 部电影的平均影评分(观影者,电影名,影评分)
    17、求好片(评分>=4.0)最多的那个年份的最好看的 10 部电影
    18、求1997年上映的电影中,评分最高的10部喜剧类电影
    19、该影评库中各种类型电影中评价最高的 5 部电影(类型,电影名,平均影评分)
    20、各年评分最高的电影类型(年份,类型,影评分)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    构建maven工程

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>com.dataAnalysisgroupId>
        <artifactId>SparkRddAndSparkSQLartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-core_2.12artifactId>
                <version>3.0.0version>
               <scope>providedscope>
            dependency>
    
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-sql_2.12artifactId>
                <version>3.0.0version>
               <scope>providedscope>
    
            dependency>
    
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-hive_2.12artifactId>
                <version>3.0.0version>
               <scope>providedscope>
            dependency>
    
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <version>5.1.27version>
               <scope>providedscope>
            dependency>
    
        dependencies>
    
        <build>
            <finalName>MovieDataAnalysisBySparkRDDfinalName>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.mavengroupId>
                    <artifactId>scala-maven-pluginartifactId>
                    <version>3.4.6version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compilegoal>
                                <goal>testCompilegoal>
                            goals>
                        execution>
                    executions>
                plugin>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-assembly-pluginartifactId>
                    <version>3.0.0version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>RunmainClass>
                            manifest>
                        archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependenciesdescriptorRef>
                        descriptorRefs>
                    configuration>
                    <executions>
                        <execution>
                            <id>make-assemblyid>
                            <phase>packagephase>
                            <goals>
                                <goal>singlegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    hive-site.xml

    
    
    <configuration>
    
        
        <property>
            <name>javax.jdo.option.ConnectionURLname>
            <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=falsevalue>
    	property>
        
        <property>
            <name>javax.jdo.option.ConnectionDriverNamename>
            <value>com.mysql.jdbc.Drivervalue>
    	property>
    
    	
        <property>
            <name>javax.jdo.option.ConnectionUserNamename>
            <value>rootvalue>
        property>
    
        
        <property>
            <name>javax.jdo.option.ConnectionPasswordname>
            <value>123456value>
        property>
        
        <property>
            <name>hive.metastore.warehouse.dirname>
            <value>/user/hive/warehousevalue>
        property>
        
        
        <property>
            <name>hive.server2.thrift.portname>
            <value>10000value>
        property>
       
        <property>
            <name>hive.server2.thrift.bind.hostname>
            <value>hadoop102value>
    	property>
    
        
    
        
        <property>
            <name>hive.metastore.event.db.notification.api.authname>
            <value>falsevalue>
    	property>
    	
    		<property>
    			<name>hive.metastore.schema.verificationname>
    			<value>falsevalue>
    	property>
    
    	
    	<property>
    		<name>hive.server2.active.passive.ha.enablename>
    		<value>truevalue>
    	property>
    	
    	
    	<property>
    		<name>hive.server2.support.dynamic.service.discoveryname>
    		<value>truevalue>
    	property>
    	<property>
    		<name>hive.server2.zookeeper.namespacename>
    		<value>hiveserver2_zkvalue>
    	property>
    	<property>
    		<name>hive.zookeeper.quorumname>
    		<value> hadoop102:2181,hadoop103:2181,hadoop104:2181value>
    	property>
    	<property>
    		<name>hive.zookeeper.client.portname>
    		<value>2181value>
    	property>
    	<property>
    		<name>hive.server2.thrift.bind.hostname>
    		<value>hadoop102value>
    	property>
    	
    	<property>
    		<name>hive.metastore.urisname>
    		<value>thrift://hadoop102:9083,thrift://hadoop104:9083value>
    	property>
    
    	
    	
    	<property>
    		<name>spark.yarn.jarsname>
    		<value>hdfs://yang-HA/spark-jars/*value>
    	property>
    	  
    	
    	<property>
    		<name>hive.execution.enginename>
    		<value>sparkvalue>
    	property>
    
    	
    	<property>
    		<name>hive.spark.client.connect.timeoutname>
    		<value>10000msvalue>
    	property>
    	
    configuration>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    run.scala

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    import scala.collection.mutable.ListBuffer
    
    /**
     * @description:
     * @author: 宇文智
     * @date 2022/5/18 10:22
     * @version 1.0
     */
    object MovieDataAnalysisBySparkRDD {
    
      //使用spark Rdd分别统计用户,性别,职业的个数
      def demand_1_by_sparkRdd(spark:SparkSession): Unit = {
    
        val sc = spark.sparkContext
    
        //分别统计用户,性别,职业的个数:    用户身份::性别::年龄阶段::职业::邮政编码
        //local[*]:默认模式。自动帮你按照CPU最多核来设置线程数。比如CPU有8核,Spark帮你自动设置8个线程计算。
        val users: RDD[String] = sc.textFile("hdfs://yang-HA/movie/users.dat")
    
        println("--------查看 users ADD 血缘依赖关系-----------")
        println(users.toDebugString)
        //users会重复使用,将数据缓存
        //users.cache()
    
        //使用persist方法更改存储级别cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的简写
        users.persist(StorageLevel.MEMORY_AND_DISK_2)
    
        //设置检查点 如果checkpoint之后的出问题 ,避免数据从头开始计算,而且减少开销
        //会立即启动一个新的job来专门的做checkpoint运算,
        // 所以建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job
        // 只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
        sc.setCheckpointDir("hdfs://yang-HA/spark_checkpoint")
    
        val quotas: ListBuffer[MovieQuota] = ListBuffer()
    
        sc.setJobGroup("008", "使用rdd,计算用户总数")
        val cnt: Long = users.filter(!_.isEmpty).count() //count()为action算子,生成一个job => job_00
        sc.setJobGroup("008", "使用rdd,计算用户总数")
        println("用户总数: " + cnt)
        quotas.append(MovieQuota("001", "用户总数", cnt.toString))
    
    
        println("---------使用系统累加器,计算用户总数---------")
        sc.setJobGroup("007", "使用系统累加器,计算用户总数")
        val sum: LongAccumulator = sc.longAccumulator("sum")
        users.foreach(l=>
          sum.add(1)
        )
        sc.setJobGroup("007", "使用系统累加器,计算用户总数")
        println("用户总数:" + sum.value)
    
        sc.setJobGroup("2", "使用rdd算子,计算男女人数")
        val gender: RDD[(String, Int)] = users.map(line => {
          val lineArr: Array[String] = line.split("::")
          (lineArr(1), 1)
        }).reduceByKey(_ + _) //reduceByKey 自带缓存
    
        val genderCount: Array[String] = gender.map(x => { //foreach为action算子,生成一个job => job2
          if (x._1.equals("M")) {
            "男性人数:" + x._2
          } else {
            "女性人数:" + x._2
          }
        }).collect() //job_02
        sc.setJobGroup("2", "使用rdd算子,计算男女人数")
        quotas.append(MovieQuota("002", "男女统计", genderCount.mkString(",")))
    
        sc.setJobGroup("3", "使用自定义累加器,计算男女人数")
        val accumulator = new MyAccumulator()
        sc.register(accumulator)
        users.map(data => {
          val arr: Array[String] = data.split("::")
          accumulator.add(arr(1))
        }).collect() //job_03
        sc.setJobGroup("3", "使用自定义累加器,计算男女人数")
        println(accumulator.value)
    
        println("---------使用自定义累加器,计算进行职业统计---------")
    
        sc.setJobGroup("4", "使用自定义累加器,计算进行职业统计")
        val professionAcc = new MyAccumulator()
        sc.register(professionAcc)
        users.map(line => {
          professionAcc.add(line.split("::")(3))
        }).collect() //job_04
        sc.setJobGroup("4", "使用自定义累加器,计算进行职业统计")
        println(professionAcc.value)
    
    
        sc.setJobGroup("b", "checkpoint 容错机制开启一个job ,重新计算数据并存储在hdfs")
        val professionCount: RDD[(String, Int)] = users.map(line => {
          val arr: Array[String] = line.split("::")
          (arr(3), 1)
        }).reduceByKey(_ + _)
        professionCount.cache() //只缓存在内存中
        professionCount.checkpoint() //job_05
        sc.setJobGroup("b", "checkpoint 容错机制开启一个job ,重新计算数据并存储在hdfs")
    
        sc.setJobGroup("a", "使用rdd算子,计算进行职业统计")
        val profession: RDD[String] = sc.textFile("hdfs://yang-HA/movie/profession.dat")
        val professionRelation: RDD[(String, String)] = profession.map(line => {
          val arr: Array[String] = line.split(":")
          (arr(0), arr(1))
        })
        val quotas1: Array[MovieQuota] = professionCount.join(professionRelation).map(line => {
          MovieQuota("003", "职业统计", line._2._2.trim + ": " + line._2._1)
        }).collect() //job_06
        sc.setJobGroup("a", "使用rdd算子,计算进行职业统计")
    
        quotas.appendAll(quotas1)
    
        loadDataToHiveLocation(quotas,spark)
    
      }
    
      //使用sparkSQL分别统计用户,性别,职业的个数
      def demand_1_by_sparkSql(spark:SparkSession): Unit = {
    
        spark.sparkContext.setJobGroup("sparksql", "sparksql")
    
        val ds: Dataset[String] = spark.read.textFile("hdfs://yang-HA/movie/users.dat")
    
        import spark.implicits._
    
        val userDS: Dataset[user] = ds.map(line => {
          val lineArr: Array[String] = line.split("::")
          user(lineArr(0), lineArr(1), lineArr(2), lineArr(3), lineArr(4))
        })
        val professionDS: Dataset[profession] = spark.read.textFile("hdfs://yang-HA/movie/profession.dat").map(line => {
          val lineArr: Array[String] = line.split(":")
          profession(lineArr(0), lineArr(1).trim)
        })
    
        userDS.groupBy("professionId").count
          .join(professionDS, List("professionId"), "left")
          .orderBy("professionId")
          .createOrReplaceTempView("tmp1")
        userDS.createOrReplaceTempView("user")
    
        spark.sql(
          """
            |set hive.exec.dynamic.partition.mode=nonstrict
            |""".stripMargin)
    
        spark.sql(
          """
            |insert into table spark_data_analysis_quota.movie_quota partition(dt)
            |select '004','sparkSql职业统计',concat_ws(':',trim(professionName),count), current_date() dt from tmp1 ;
            |""".stripMargin)
    
      }
    
      //统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
      def demand_2_by_sparkRdd(spark:SparkSession): Unit = {
        var sc = spark.sparkContext
        //统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
        //每个用户的评分次数: 评分总次数 / 评分总人数(需去重)
        //最高评分=每部影片评分中取最大值,最低评分同理
        val rating: RDD[String] = sc.textFile("hdfs://yang-HA/movie/ratings.dat")
        val quotas = new ListBuffer[MovieQuota]
    
    
        /*var rank = 0
        var beforeVal = -1.0
    
    
        rating.map(line => {
          val arr: Array[String] = line.split("::")
          (arr(1), arr(2).toDouble)
        }).groupByKey().map {
          case (k, v) => {
            val sum1: Double = v.sum
            (k, sum1 / v.size)
          }
        }.collect().sortWith((kv1, kv2) => {
          kv1._2 > kv2._2
        }).map(kv=>{
          if(kv._2 != beforeVal){
            beforeVal = kv._2
            rank+=1
          }
          (kv,rank)
        }).filter(_._2==1).foreach(println)*/
    
        import spark.implicits._
        val ratingTuples: RDD[(String, String, String, String)] = rating.map(line => {
          val arr: Array[String] = line.split("::")
          (arr(0), arr(1), arr(2), arr(3))
        })
        ratingTuples.cache()
        //userId, movieID, rating, timestamp
        val ratingCnt: Long = ratingTuples.count()
        val NumberPeoples: Long = ratingTuples.map(_._1).distinct(8).count()
        val movieCnt: Long = ratingTuples.map(_._2).distinct(8).count()
    
        quotas.append(MovieQuota("005","平均每个用户的评分次数,平均每部影片被评分次数",(ratingCnt/NumberPeoples+","+ratingCnt/movieCnt)))
    
        ratingTuples.map(line => (line._2, line._3.toDouble)).groupByKey().map(kv => {
          //统计最高评分,最低评分,平均评分
          var median = 0
          if (kv._2.size % 2 == 1) {
            //奇数
            median = (kv._2.size + 1) / 2
          } else {
            //偶数
            median = kv._2.size / 2
          }
          // println(median)
          val medianVal: Double = kv._2.toList.sortWith((v1, v2) => {
            v1 > v2
          }).apply(median - 1)
    
          val avgVal: Double = kv._2.sum / kv._2.size
    
          MovieQuota("006", "最高评分,最低评分,平均评分,中位评分", (kv._1, kv._2.max, kv._2.min, f"$avgVal%.3f", medianVal).toString())
        }).toDS.createOrReplaceTempView("tmp2")
    
        spark.sql(
          """
            |insert into table spark_data_analysis_quota.movie_quota partition(dt)
            |select  *,current_date() dt from tmp2
            |""".stripMargin)
    
        loadDataToHiveLocation(quotas,spark)
    
      }
    
      //加载数据到hive表
      def loadDataToHiveLocation(quotas: Seq[MovieQuota],spark:SparkSession): Unit ={
    
        import spark.implicits._
        val sc: SparkContext = spark.sparkContext
    
        quotas.toDS.createOrReplaceTempView("quotas")
    
        spark.sql(
          """
            |msck repair table spark_data_analysis_quota.movie_quota;
            |""".stripMargin)
    
        spark.sql(
          s"""
            |insert into table spark_data_analysis_quota.movie_quota partition(dt)
            |select *, current_date() dt from quotas
            |""".stripMargin)
    
        sc.setJobGroup("c", "保存文件到hive表的location")
      }
    
    
    
    }
    
    object Run {
      def main(args: Array[String]): Unit = {
    
        // 设置访问HDFS集群的用户名
        System.setProperty("HADOOP_USER_NAME", "atguigu")
        System.setProperty("file.encoding", "UTF-8")
    
        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf()
          .setAppName("movie_data_analysis")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //替换默认序列化机制
          .registerKryoClasses(Array(classOf[MovieQuota])) //注册使用kryo序列化的自定义类
          .setMaster("yarn")
    
        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    
        spark.sql(
          """
            |set hive.exec.dynamic.partition.mode=nonstrict
            |""".stripMargin)
    
        //使用spark Rdd分别统计用户,性别,职业的个数
        MovieDataAnalysisBySparkRDD.demand_1_by_sparkRdd(spark)
        //使用sparkSQL分别统计用户,性别,职业的个数
        MovieDataAnalysisBySparkRDD.demand_1_by_sparkSql(spark)
        //使用spark Rdd统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
        MovieDataAnalysisBySparkRDD.demand_2_by_sparkRdd(spark)
    
        spark.close()
      }
    }
    
    case class MovieQuota(var quota_id: String, var quota_name: String, var quota_value: String) {
      override def toString: String = {
        quota_id + '\t' + quota_name + '\t' + quota_value
      }
    }
    
    
    case class profession(professionId: String, professionName: String)
    
    case class rating(userId: String, movieID: String, rating: String, timestamp: String)
    
    case class user(userId: String, gender: String, ageGrades: String, professionId: String, postalCode: String)
    
    case class movie(movieID: String, title: String, genres: String)
    
    
    //根据输入字段,统计字段总数
    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    
      private val genderCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()
    
      override def isZero: Boolean = genderCountMap.isEmpty
    
      override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator
      }
    
      override def reset(): Unit = genderCountMap.clear
    
      override def add(v: String): Unit = {
    
        if (v.equals("M")) {
          genderCountMap("男性") = genderCountMap.getOrElse("男性", 0L) + 1L
        } else if (v.equals("F")) {
          genderCountMap("女性") = genderCountMap.getOrElse("女性", 0L) + 1L
        } else {
          genderCountMap(v) = genderCountMap.getOrElse(v, 0L) + 1L
        }
      }
    
      override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
        other.value.foreach { case (key, value) => {
          genderCountMap(key) = genderCountMap.getOrElse(key, 0L) + value
        }
        }
      }
    
      override def value: mutable.Map[String, Long] = this.genderCountMap
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343

    目标表ddl

    CREATE  TABLE `spark_data_analysis_quota.movie_quota`(
         `quota_id` string COMMENT '指标id',
         `quota_name` string COMMENT '指标名',
         `quota_value` string COMMENT '指标值')
        COMMENT '电影指标分析表'
        PARTITIONED BY (
            `dt` string)
        clustered by (quota_id) into 3 buckets
        stored as orc
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    使用maven install 打jar 包,放到spark 集群上。启动大数据各集群组件,执行 run_spark_job_byJar.sh

    $SPARK_HOME/bin/spark-submit \
    --class Run \
    --master yarn \
    --deploy-mode cluster \
    --queue spark \
    --conf spark.executor.extraJavaOptions="-Dfile.encoding=UTF-8" \
    --conf spark.driver.extraJavaOptions="-Dfile.encoding=UTF-8" \
    MovieDataAnalysisBySparkRDD.jar \
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    查看http://hadoop104:8088/cluster yarn历史服务器
    在这里插入图片描述
    在这里插入图片描述

    点击history,跳转到[spark 历史服务器(在hadoop102上启动sbin/start-history-server.sh)]http://hadoop102:4000

    查看 spark 作业日志
    在这里插入图片描述
    在这里插入图片描述

    附:

    集群启停脚本

    cat hadoopHA.sh 
    #!/bin/bash
    if [ $# -lt 1 ]
    then
        echo "No Args Input..."
        exit ;
    fi
    
    start_cluster(){
            echo " =================== 启动 hadoop集群 ==================="
            echo " --------------- 启动 hdfs ---------------"
            ssh hadoop102 "/opt/module/hadoopHA/sbin/start-dfs.sh"
            echo " --------------- 启动 yarn ---------------"
            ssh hadoop103 "/opt/module/hadoopHA/sbin/start-yarn.sh"
            echo " --------------- 启动 historyserver ---------------"
            ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon start historyserver"
    	echo "---------启动spark日志服务器----------"
    	ssh hadoop102 "/opt/module/spark/sbin/start-history-server.sh "
    	echo "-----启动hiveservice------"
    	ssh hadoop102 "/home/atguigu/bin/hiveservices.sh start"
    }
    
    
    stop_cluster(){
            echo " =================== 关闭 hadoop集群 ==================="
    	echo "----------关闭hiveservice-------------"
    	ssh hadoop102 "/home/atguigu/bin/hiveservices.sh stop"
            echo " --------------- 关闭 historyserver ---------------"
            ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon stop historyserver"
            echo " --------------- 关闭 yarn ---------------"
            ssh hadoop103 "/opt/module/hadoopHA/sbin/stop-yarn.sh"
            echo " --------------- 关闭 hdfs ---------------"
            ssh hadoop102 "/opt/module/hadoopHA/sbin/stop-dfs.sh"
    	echo "---------停止spark日志服务器----------"
    	ssh hadoop102 "/opt/module/spark/sbin/stop-history-server.sh "
    }
    
    case $1 in
    "start")
    	echo "--------启动zookeeper----------"
    	sh /home/atguigu/bin/dataCollectSystem/zk.sh start
    	echo "-------启动大数据高可用集群-------"
    	start_cluster	
    ;;
    "stop")
    	stop_cluster
    	echo "----------关闭zookeeper------------"
    	sh /home/atguigu/bin/dataCollectSystem/zk.sh stop
    ;;
    "restart")
    	echo "---------重启集群---------"
    	stop_cluster
    	start_cluster
    ;;
    "status")
    	echo " =================hadoopHA集群 各个节点状态==========="
    	echo " ==========hadoop102,nn1========="
    	n1_port=`ssh hadoop102 "jps | grep -v Jps | grep NameNode"` 
    	nn1=`hdfs haadmin -getServiceState nn1`
    	echo ${n1_port}" "${nn1}
    	
    	echo " ==========hadoop103,nn2,rm1========="
    	n2_port=`ssh hadoop103 "jps | grep -v Jps | grep NameNode"`
    	nn2=`hdfs haadmin -getServiceState nn2`
    	echo ${n2_port}" "${nn2}
    	rm1_port=`ssh hadoop103 "jps | grep -v Jps | grep ResourceManager"`
    	rm1=`yarn rmadmin -getServiceState rm1`
    	echo ${rm1_port}" "${rm1}
    	
    	echo " ==========hadoop104,rm2========="
    	rm2_port=`ssh hadoop104 "jps | grep -v Jps | grep ResourceManager"`
    	rm2=`yarn rmadmin -getServiceState rm2`
    	echo ${rm2_port}" "${rm2}
    ;;
    *)
        echo "Input Args Error..."
    ;;
    esac
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    cat hiveservices.sh 
    
    #!/bin/bash
    HIVE_LOG_DIR=$HIVE_HOME/logs
    if [ ! -d $HIVE_LOG_DIR ]
    then
    	mkdir -p $HIVE_LOG_DIR
    fi
    #检查进程是否运行正常,参数1为进程名,参数2为进程端口
    function check_process()
    {
        pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
        ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
        echo $pid
        [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
    }
    
    function hive_start()
    {
        metapid=$(check_process HiveMetastore 9083)
        cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
        cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
        [ -z "$metapid" ] && eval $cmd || echo "Metastroe服务已启动"
        server2pid=$(check_process HiveServer2 10000)
        cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
        [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2服务已启动"
    }
    
    function hive_stop()
    {
        metapid=$(check_process HiveMetastore 9083)
        [ "$metapid" ] && kill $metapid || echo "Metastore服务未启动"
        server2pid=$(check_process HiveServer2 10000)
        [ "$server2pid" ] && kill $server2pid || echo "HiveServer2服务未启动"
    }
    
    case $1 in
    "start")
        hive_start
        ;;
    "stop")
        hive_stop
        ;;
    "restart")
        hive_stop
        sleep 2
        hive_start
        ;;
    "status")
        check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"
        check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常"
        ;;
    *)
        echo Invalid Args!
        echo 'Usage: '$(basename $0)' start|stop|restart|status'
        ;;
    esac
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
  • 相关阅读:
    idea mybatis-plus之MybatisX插件小知识(代码生成 哦)
    (项目)ZHUZHU新闻
    html笔记
    PostgreSQL JIT(Just-In-Time Compilation)With LLVM 的实现原理
    node_fs模块常用API
    iPhone辐射超标,发布三年突然禁售了
    【SQLServer语句按月记录总数量】
    无胁科技-TVD每日漏洞情报-2022-9-19
    那些你必须要知道的程序员接单平台
    Java 注解
  • 原文地址:https://blog.csdn.net/m0_38109926/article/details/133878444