目录
spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
spark将计算结果放在了内存中
Spark和Hadoop的根本差异是多个作业之间的数据通信问题: Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘
1. 原理比较
Hadoop和Spark都是并行计算框架,两者都可以使用MR模型进行计算(spark有内置的独立的计算引擎)
2. 数据的存储与处理
Hadoop:是一个分布式系统的基础架构,它可以独立完成数据的存储(HDFS)和处理(MR)工作
spark:是一个专门用来对那些分布式存储的大数据进行处理的工具,他没有提供文件管理系统,自身也不会对数据进行存储。它需要借助其他分布式文件存储系统才能够动作,最经典的就是Hadoop的HDFS
3. 处理速度
Hadoop:在磁盘级计算,在计算时需要读取磁盘中的数据,它只能用来处理离线(静态)数据
spark:在内存中以接近“实时”的时间完成所有的数据分析,Spark的批处理速度比MR近快10倍,内存中的数据分析速度快近100倍
4. 恢复性
Hadoop:将每次处理后的数据写入到磁盘中,对应系统错误具有天生优势
Spark:数据对象存储在分布式数据集(RDD)当中。RDD也提供了完整的灾难恢复功能
5. 处理数据 Hadoop:一般用来处理离线(静态)数据,对于流式(实时)数据处理能力较差
Spark:通常适合处理流式数据
6. 中间结果
Hadoop:将中间结果保存到HDFS中,每次MR都需要进行读取-调用
Spark:将中间结果保存到内存中,如果内存不够再放入到磁盘中,不放入HDFS,避免大量的IO和读写操作
创建模块
安装插件
从官网中下载,官网地址 http://www.scala-lang.org/downloads 下载 Scala 二进制包(页面底部)
下载之后解压,然后配置环境变量
然后加到Path中: %SCALA_HOME%\bin
回到idea,File-->Project Structure-->Global Libraries , 选择刚刚下载的scala解压路径,并应用到spark-core上
我们还可以通过右击项目, 选择Add Frameworks Support, 然后找到Scala所在的位置, 引用就可以了
在java目录下创建一个包,再创建一个Scala Class
选择object
创建完成后,写入代码并运行,在控制台能成功打印出结果即说明环境搭建完成
- package com.bigdata.spark.core
-
- object Test {
-
- def main(args: Array[String]): Unit = {
- print("hello spark")
- }
- }
导入依赖
-
-
org.apache.spark -
spark-core_2.12 -
3.0.0 -
编写代码
- import org.apache.spark.{SparkConf, SparkContext}
-
- object WordCount {
- def main(args: Array[String]): Unit = {
- //application
- //spark框架
- //TODO 建立和spark框架的连接
- //JDBC : Connection (setMaster表述的是spark框架运行的环境,local环境,即本地环境,setappname给应用起名
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- //TODO 执行业务操作
- //TODO 关闭连接
- sc.stop()
- }
- }
运行代码,在控制台打印出日志信息, 说明代码没有问题
准备文件
执行业务操作部分的代码
- //TODO 执行业务操作
- //1. 读取文件,获取一行一行的数据 hello world
- val lines : RDD[String] = sc.textFile("datas")
- //2. 将一行数据进行拆分,形成一个一个的单词(分词) "hello world" ==> hello, world, hello world
- val words : RDD[String] = lines.flatMap(_.split(" "))
- //3. 将数据根据单词进行分组,便于统计 (hello, hello, hello), (world, world)
- val wordGroup : RDD[(String, Iterable[String])] = words.groupBy(word=>word)
- //4. 对分组后的数据进行转换 (hello, hello, hello), (world, world) ==> (hello, 3),(world, 2)
- val wordToCount = wordGroup.map{
- case (word, list) =>
- (word, list.size)
- }
- //5. 将转换结果采集到控制台打印
- val array: Array[(String, Int)] = wordToCount.collect()
- array.foreach(println)
运行代码
不同的实现
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object WordCount2 {
- def main(args: Array[String]): Unit = {
- //application
- //spark框架
- //TODO 建立和spark框架的连接
- //JDBC : Connection (setMaster表述的是spark框架运行的环境,local环境,即本地环境,setappname给应用起名
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- //TODO 执行业务操作
- val lines : RDD[String] = sc.textFile("datas")
- val words : RDD[String] = lines.flatMap(_.split(" "))
- val wordToOne = words.map(
- word => (word, 1)
- )
- val wordGroup : RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(t => t._1)
- val wordToCount = wordGroup.map{
- case (_, list) =>
- list.reduce(
- (t1, t2) => {(t1._1, t1._2 + t2._2)}
- )
- }
- val array: Array[(String, Int)] = wordToCount.collect()
- array.foreach(println)
- //TODO 关闭连接
- sc.stop()
- }
- }
使用spark实现:
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object WordCount3 {
- def main(args: Array[String]): Unit = {
- //TODO 建立和spark框架的连接
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- //TODO 执行业务操作
- val lines : RDD[String] = sc.textFile("datas")
- val words : RDD[String] = lines.flatMap(_.split(" "))
- val wordToOne = words.map(
- word => (word, 1)
- )
- //Spark框架提供了更多的功能, 可以将分组和聚合使用一个方法实现
- //reduceByKey: 相同的key的数据, 可以对value进行reduce聚合
- val wordToCount = wordToOne.reduceByKey(_ + _)
- val array: Array[(String, Int)] = wordToCount.collect()
- array.foreach(println)
- //TODO 关闭连接
- sc.stop()
- }
- }
在执行的过程中, 会产生大量的执行日志, 为了更好的查看程序的执行结果, 在项目的resources目录中创建log4j.properties文件, 添加日志配置信息:
- log4j.rootCategory=ERROR, console
- log4j.appender.console=org.apache.log4j.ConsoleAppender
- log4j.appender.console.target=System.err
- log4j.appender.console.layout=org.apache.log4j.PatternLayout
- log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
- # Set the default spark-shell log level to ERROR. When running the spark-shell, the
- # log level for this class is used to overwrite the root logger's log level, so that
- # the user can have different defaults for the shell and regular Spark apps.
- log4j.logger.org.apache.spark.repl.Main=ERROR
-
- # Settings to quiet third party logs that are too verbose
- log4j.logger.org.spark_project.jetty=ERROR
- log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
- log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
- log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
- log4j.logger.org.apache.parquet=ERROR
- log4j.logger.parquet=ERROR
-
- # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
- log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
- log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR