Spark
Hadoop
Spark
由上面的信息可以获知,Spark出现的时间相对较晚,并且主要功能主要是用于数据计算,所以其实Spark一直被认为是Hadoop框架的升级版,但其实不是
NameNode是单点的,容易出现单点故障,制约了集群的发展
NameNode是单点的,受到了硬件的制约,制约了集群的发展
MapReduce运行速度太慢,主要因为设计理念的问题,MR早期就是用于单一数据计算,在当前数据挖掘和数据迭代计算情景中不适用
单一数据计算:MR主要应用于数据的一次性计算,从存储介质中读取数据,经过处理后,再存储回介质中
迭代计算:计算得出的结果传递给下一次计算,也就是上一次的输出是下一次的输入
为什么慢:如果MR想进行迭代计算,它的每一次计算结果都需要落盘,是基于文件运算的
MR框架和Hadoop耦合性非常强,无法分离
增加了一个资源调度框架,将计算和资源解耦合,Yarn,也将Hadoop 2.x 称为Yarn版本
解决了
NameNode单点问题,使其变为了高可用的
将资源调度和计算解耦合,耦合性低意味着扩展性强
RM,NM(MRAppMaster),NM(MRTask)
可以替换MR计算框架,但MR自己的计算速度并没有提升
Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Spark是基于MR框架的,但是优化了其中的计算过程,使用内存代替了计算结果的传输
Spark是基于Scala语言开发的,函数式编程语言,更适合迭代计算和数据挖掘计算
Spark中计算模型十分丰富
MR中的计算模型只有两个:Mapper和Reducer,这时对于一些复杂需求,一个job完成不了,就需要作业的调度
Scala中的计算模型:map,filter,groupBy,sortBy,在一个job中就可以完成
Yarn中的RM,NM(MRAppMaster),NM(MRTask)在Spark中分别称为
Master,Worker(Driver),Worker(Executor)
实际情况下,Spark和Yarn是共同使用的,Spark(计算) On Yarn(资源),主要使用到以下几个组件
RM,NM(container(Driver)),NM(container(Executor))
将NM理解为当前使用的电脑,container理解为VMWare,Driver理解为虚拟机内部的操作系统,通过container解耦合
经过上面的比较,可以看出在绝大多数的数据计算场景中,Spark确实会比MapReduce更有优势。但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark并不能完全替代MR
Spark Core
Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的
Spark SQL
Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据
Spark Streaming
Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API
Spark MLlib
MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语
Spark GraphX
GraphX是Spark面向图计算提供的框架与算法库
创建一个Maven项目WordCount,包名为com.hike.spark
Spark是一个计算框架,需要通过导入依赖找到它
2.12为scala版本
3.0.0为spark版本
<dependencies>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.12artifactId>
<version>3.0.0version>
dependency>
dependencies>
//创建SparkConf并设置App名称
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
//创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//关闭连接
sc.stop()
//读取指定位置文件
val lines = sc.textFile("data/word.txt")
//将文件中的数据进行分词
val words = lines.flatMap(_.split(" "))
//将分词后的数据进行分组
val wordGroup = words.gropuBy(word => word)
//将分词后的数据进行统计分析
val wordCount = wordGroup.mapValues(_.size)
//7.将统计结果采集到控制台打印
wordCount.collect().foreach(println)
统计分析部分并没有体现出数据运算的过程,修改统计分析部分代码
// 读取指定位置文件
val lines = sc.textFile("data/word.txt")
// 将文件中的数据进行分词
// 将word改变形式:word => (word,1)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map((_,1))
// 将分词后的数据进行分组
// word => list((word,1),(word,1) => List((word,2)))
val wordGroup = wordToOne.gropuBy(word => word)
// 将分词后的数据进行统计分析
val wordCount = wordGroup.mapValues(
list => {
list.reduce(
(t1,t2) => {
(t1._1,t1._2 + t2._2)
}
)._2
}
)
// 将统计结果采集到控制台打印
wordCount.collect().foreach(println)
使用Spark框架内的功能,修改分组,统计分析部分代码
// 读取指定位置文件
val lines = sc.textFile("data/word.txt")
// 将文件中的数据进行分词
// 将word改变形式:word => (word,1)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map((_,1))
// reduceByKey:按照key分组,对象同key的V进行reduce
// (word,1)(word,1)(word,1)(word,1)(word,1)
// reduce(1,1,1,1,1)
val wordCount = wordToOne.reduceByKey(_+_)
// 将统计结果采集到控制台打印
wordCount.collect().foreach(println)
框架的核心就是封装
执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项目的resources目录中创建log4j.properties文件,并添加日志配置信息:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR