• Spark概述及快速上手


    目录

    1. Spark概述

    Hadoop与spark的区别

    2. 创建Maven项目

    (1)创建Maven项目

    (2)增加scala

    (3)开发scala

    3. WordCount

       (1) 环境搭建

       (2) 编写代码


    1. Spark概述

    spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

    spark将计算结果放在了内存中

    Spark和Hadoop的根本差异是多个作业之间的数据通信问题: Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘

    Hadoop与spark的区别

    1. 原理比较

    Hadoop和Spark都是并行计算框架,两者都可以使用MR模型进行计算(spark有内置的独立的计算引擎)

    2. 数据的存储与处理

    Hadoop:是一个分布式系统的基础架构,它可以独立完成数据的存储(HDFS)和处理(MR)工作

    spark:是一个专门用来对那些分布式存储的大数据进行处理的工具,他没有提供文件管理系统,自身也不会对数据进行存储。它需要借助其他分布式文件存储系统才能够动作,最经典的就是Hadoop的HDFS

    3. 处理速度

    Hadoop:在磁盘级计算,在计算时需要读取磁盘中的数据,它只能用来处理离线(静态)数据

    spark:在内存中以接近“实时”的时间完成所有的数据分析,Spark的批处理速度比MR近快10倍,内存中的数据分析速度快近100倍

    4. 恢复性

    Hadoop:将每次处理后的数据写入到磁盘中,对应系统错误具有天生优势

    Spark:数据对象存储在分布式数据集(RDD)当中。RDD也提供了完整的灾难恢复功能

    5. 处理数据 Hadoop:一般用来处理离线(静态)数据,对于流式(实时)数据处理能力较差

    Spark:通常适合处理流式数据

    6. 中间结果

    Hadoop:将中间结果保存到HDFS中,每次MR都需要进行读取-调用

    Spark:将中间结果保存到内存中,如果内存不够再放入到磁盘中,不放入HDFS,避免大量的IO和读写操作

    2. 创建Maven项目

    (1)创建Maven项目

    创建模块

     

    (2)增加scala

    安装插件

    从官网中下载,官网地址 http://www.scala-lang.org/downloads 下载 Scala 二进制包(页面底部)

    下载之后解压,然后配置环境变量

    然后加到Path中: %SCALA_HOME%\bin

    回到idea,File-->Project Structure-->Global Libraries , 选择刚刚下载的scala解压路径,并应用到spark-core上

    我们还可以通过右击项目, 选择Add Frameworks Support, 然后找到Scala所在的位置, 引用就可以了

    (3)开发scala

    在java目录下创建一个包,再创建一个Scala Class

     

     选择object

    创建完成后,写入代码并运行,在控制台能成功打印出结果即说明环境搭建完成

    1. package com.bigdata.spark.core
    2. object Test {
    3. def main(args: Array[String]): Unit = {
    4. print("hello spark")
    5. }
    6. }

    3. WordCount

       (1) 环境搭建

       导入依赖

    1.        
    2.            org.apache.spark
    3.            spark-core_2.12
    4.            3.0.0
    5.        

       编写代码

    1. import org.apache.spark.{SparkConf, SparkContext}
    2. object WordCount {
    3. def main(args: Array[String]): Unit = {
    4. //application
    5. //spark框架
    6. //TODO 建立和spark框架的连接
    7. //JDBC : Connection (setMaster表述的是spark框架运行的环境,local环境,即本地环境,setappname给应用起名
    8. val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    9. val sc = new SparkContext(sparConf)
    10. //TODO 执行业务操作
    11. //TODO 关闭连接
    12. sc.stop()
    13. }
    14. }

       运行代码,在控制台打印出日志信息, 说明代码没有问题

       (2) 编写代码

       准备文件

       执行业务操作部分的代码

    1. //TODO 执行业务操作
    2.     //1. 读取文件,获取一行一行的数据  hello world
    3.     val lines : RDD[String] = sc.textFile("datas")
    4.     //2. 将一行数据进行拆分,形成一个一个的单词(分词)  "hello world" ==> hello, world, hello world
    5.     val words : RDD[String] = lines.flatMap(_.split(" "))
    6.     //3. 将数据根据单词进行分组,便于统计  (hello, hello, hello), (world, world)
    7.     val wordGroup : RDD[(String, Iterable[String])] = words.groupBy(word=>word)
    8.     //4. 对分组后的数据进行转换  (hello, hello, hello), (world, world) ==> (hello, 3),(world, 2)
    9.     val wordToCount = wordGroup.map{
    10.       case (word, list) =>
    11.         (word, list.size)
    12.     }
    13.     //5. 将转换结果采集到控制台打印
    14.     val array: Array[(String, Int)] = wordToCount.collect()
    15.     array.foreach(println)

       运行代码

    不同的实现

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object WordCount2 {
    4.   def main(args: Array[String]): Unit = {
    5.     //application
    6.     //spark框架
    7.     //TODO 建立和spark框架的连接
    8.     //JDBC : Connection (setMaster表述的是spark框架运行的环境,local环境,即本地环境,setappname给应用起名
    9.     val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    10.     val sc = new SparkContext(sparConf)
    11.     //TODO 执行业务操作
    12.     val lines : RDD[String] = sc.textFile("datas")
    13.     val words : RDD[String] = lines.flatMap(_.split(" "))
    14.     val wordToOne = words.map(
    15.       word => (word, 1)
    16.     )
    17.     val wordGroup : RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(t => t._1)
    18.     val wordToCount = wordGroup.map{
    19.       case (_, list) =>
    20.         list.reduce(
    21.           (t1, t2) => {(t1._1, t1._2 + t2._2)}
    22.         )
    23.     }
    24.     val array: Array[(String, Int)] = wordToCount.collect()
    25.     array.foreach(println)
    26.     //TODO 关闭连接
    27.     sc.stop()
    28.   }
    29. }

    使用spark实现:
     

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object WordCount3 {
    4.   def main(args: Array[String]): Unit = {
    5.     //TODO 建立和spark框架的连接
    6.     val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    7.     val sc = new SparkContext(sparConf)
    8.     //TODO 执行业务操作
    9.     val lines : RDD[String] = sc.textFile("datas")
    10.     val words : RDD[String] = lines.flatMap(_.split(" "))
    11.     val wordToOne = words.map(
    12.       word => (word, 1)
    13.     )
    14.     //Spark框架提供了更多的功能, 可以将分组和聚合使用一个方法实现
    15.     //reduceByKey: 相同的key的数据, 可以对value进行reduce聚合
    16.     val wordToCount = wordToOne.reduceByKey(_ + _)
    17.     val array: Array[(String, Int)] = wordToCount.collect()
    18.     array.foreach(println)
    19.     //TODO 关闭连接
    20.     sc.stop()
    21.   }
    22. }

    在执行的过程中, 会产生大量的执行日志, 为了更好的查看程序的执行结果, 在项目的resources目录中创建log4j.properties文件, 添加日志配置信息:
     

    1. log4j.rootCategory=ERROR, console
    2. log4j.appender.console=org.apache.log4j.ConsoleAppender
    3. log4j.appender.console.target=System.err
    4. log4j.appender.console.layout=org.apache.log4j.PatternLayout
    5. log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    6. # Set the default spark-shell log level to ERROR. When running the spark-shell, the
    7. # log level for this class is used to overwrite the root logger's log level, so that
    8. # the user can have different defaults for the shell and regular Spark apps.
    9. log4j.logger.org.apache.spark.repl.Main=ERROR
    10. # Settings to quiet third party logs that are too verbose
    11. log4j.logger.org.spark_project.jetty=ERROR
    12. log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    13. log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
    14. log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
    15. log4j.logger.org.apache.parquet=ERROR
    16. log4j.logger.parquet=ERROR
    17. # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    18. log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    19. log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

  • 相关阅读:
    Lombok的hashCode方法
    linux安装Ftp
    python LeetCode 刷题记录 94
    centos7基础操作
    [WPF] 使用 HandyControl 的 CirclePanel 画出表盘刻度
    自动化运维工具Ansible教程(二)【进阶篇】
    2022-9-7合并k个已排序的链表---困难
    全光谱台灯对孩子眼睛好吗有辐射吗?普通台灯和LED灯哪个辐射大
    金融口译,口译中金融高频词有哪些
    【Linux进程间通信】mmap共享存储映射
  • 原文地址:https://blog.csdn.net/Lilianach/article/details/127476932