• Spark - 介绍及使用 Scala、Java、Python 三种语言演示


    一、Spark

    在这里插入图片描述

    Apache Spark 是一个快速的,多用途的集群计算系统, 相对于 Hadoop MapReduce 将中间结果保存在磁盘中, Spark 使用了内存保存中间结果, 能在数据尚未写入硬盘时在内存中进行运算。

    Spark 只是一个计算框架, 不像 Hadoop 一样包含了分布式文件系统和完备的调度系统, 如果要使用 Spark, 需要搭载其它的文件系统。

    Hadoop 之父 Doug Cutting 指出:Use of MapReduce engine for Big Data projects will decline, replaced by Apache Spark (大数据项目的 MapReduce 引擎的使用将下降,由 Apache Spark 取代)。

    当然现在有了更为发展趋势,更好处理流式数据的 Flink ,但 Spark 在大数据处理领域仍有一席之地。

    1. Spark的优点:

    • 速度快Spark 在内存时的运行速度是 Hadoop MapReduce100倍,基于硬盘的运算速度大概是 Hadoop MapReduce10倍,并且Spark 实现了一种叫做 RDDsDAG 执行引擎, 其数据缓存在内存中可以进行迭代处理。

    • 易上手Spark 支持 Java、Scala、Python、R,、SQL 等多种语言的API,并且支持超过80个高级运算符使得用户非常轻易的构建并行计算程序,同时Spark 也可以使用基于 Scala, Python, R, SQLShell 交互式查询。

    • 通用性强Spark 提供一个完整的技术栈,,包括 SQL执行, Dataset命令式API, 机器学习库MLlib, 图计算框架GraphX, 流计算SparkStreaming等。

    • 兼容性好Spark 可以运行在 Hadoop Yarn、Apache Mesos、 Kubernets、 Spark Standalone等集群中,可以访问 HBase、 HDFS、Hive、 Cassandra 在内的多种数据库。

    2. Spark中的组件

    • Spark-Core:整个 Spark 的基础,,提供了分布式任务调度和基本的 I/O 功能,并且Spark 最核心的功能是 RDDsRDDs 就存在于这个包内。同时 RDDs 简化了编程复杂性,操作 RDDs 类似 Jdk8Streaming 操作本地数据集合。

    • Spark SQL:在 spark-core 基础之上带出了 DataSetDataFrame 的数据抽象化的概念,提供了在 DatasetDataFrame 之上执行 SQL 的能力,提供了 DSL, 可以通过 Scala, Java, Python 等语言操作 DataSetDataFrame,还支持使用 JDBC/ODBC 服务器操作 SQL 语言。

    • Spark Streaming:利用 spark-core 的快速调度能力来运行流分析,通过时间窗口截取小批量的数据并可以对之运行 RDD Transformation

    • MLlib:分布式机器学习的框架,可以使用许多常见的机器学习和统计算法,例如:支持向量机、 回归、 线性回归、 逻辑回归、 决策树、 朴素贝叶斯、汇总统计、相关性、分层抽样、 假设检定、随机数据生成等,简化大规模机器学习。

    • GraphX:分布式图计算框架, 提供了一组可以表达图计算的 API,还对这种抽象化提供了优化运行。

    3. Spark 和 Hadoop 对比

    对比项Sparkhadoop
    类型分布式计算工具基础平台, 包含计算, 存储, 调度
    延迟中间运算结果存在内存中,延迟小中间计算结果存在 HDFS 磁盘上,延迟大
    场景迭代计算, 交互式计算, 流计算大规模数据集上的批处理
    易用性RDD 组成 DAG 有向无环图, API 较为顶层, 方便使用Map+Reduce, API 较为底层, 算法适应性差
    硬件要求对内存有要求对机器要求低

    4. Spark 中的 RDD

    RDDSpark 的核心,在 RDD 之前 MapReduce 的处理过程如下:

    在这里插入图片描述
    多个任务之间通过磁盘来共享数据,RDD 出现后的处理过程:

    在这里插入图片描述
    整个过程共享内存,不需要将中间结果存放在磁盘中。

    5. RDD 的分区

    RDD 使用分区来分布式并行处理数据,做到尽量少的在不同的 Executor 之间使用网络交换数据,使用 RDD 读取数据的时候,会尽量的在物理上靠近数据源,比如在读取 Cassandra 或者 HDFS 中数据的时候,会尽量的保持 RDD 的分区和数据源的分区数对应。

    6. RDD 的 Shuffle

    分区的主要作用是用来实现并行计算,但是往往在进行数据处理的时候,例如 reduceByKey, groupByKey 等聚合操作时, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候有可能这些 Key 相同的 Value 会坐落于不同的分区,因此需要进行 Shuffle 对数据处理。

    SparkShuffle 操作的特点:

    • 只有 Key-Value 型的 RDD 才会有 Shuffle 操作,但是有一个特例,就是 repartition 算子可以对任何数据类型 Shuffle

    • 早期版本 SparkShuffle 算法是 Hash base shuffle,后来改为 Sort base shuffle, 更适合大吞吐量的场景

    7. Spark 运行模式

    HadoopMapreduce 类似,Spark 也有本地模式,和线上集群模式,不过不同的是,Spark 有自己的调度集群 standalone,并且支持 Hadoopyarn,一般情况下本地开发使用 local 本地模式,生产环境可以使用 standalone-HA 或者 on yarn

    二、Spark WordCount 演示

    WordCount 是大数据中的 和 hello word ,前面在学习 Hadopp Mapreduce 时,使用 Mapreduce 的方式进行了实现,下面我们基于 Spark 分别从 Scala语言、Java语言、Python语言进行实现,下面是 Mapreduce 讲解时的实现文章:

    https://blog.csdn.net/qq_43692950/article/details/127195121

    下面我在本地 D:/test/input 下,创建了一个 txt 文件,内容如下:

    hello map reduce abc
    apple spark map
    
    reduce abc hello
    
    spark map
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    1. Scala 语言

    Spark 源码是使用 Scala 语言开发的,因此使用 Scala 开发是首选方案,如果对 Scala 语言还不是很了解的,可以看下下面的教程学习下:

    https://www.cainiaojc.com/scala/scala-tutorial.html

    下面创建一个 Maven 项目,在 pom 中加入 scalaspark 的依赖:

     
      <dependency>
          <groupId>org.scala-langgroupId>
          <artifactId>scala-libraryartifactId>
          <version>2.12.11version>
      dependency>
    
      
      <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-core_2.12artifactId>
          <version>3.0.1version>
      dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    创建 object WordCountScala

    object WordCountScala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        //读取数据
        val textFile = sc.textFile("D:/test/wordcount/")
        //处理统计
        textFile.filter(StringUtils.isNotBlank) //过滤空内容
          .flatMap(_.split(" ")) //根据空格拆分
          .map((_, 1)) // 构建减值,value 固定 1
          .reduceByKey(_ + _) // 同一个 key 下面的 value 相加
          .foreach(s => println(s._1 + "  " + s._2))
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    直接运行查看结果:

    在这里插入图片描述

    2. Java 语言

    由于 JavaScala 都是运行在 JVM 之上的编程语言,这里可以直接在上面 Scala 的项目中创建 Java 类进行测试:

    创建 WordCountJava 测试类:

    public class WordCountJava {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.setLogLevel("WARN");
    
            //读取数据
            JavaRDD<String> textFile = sc.textFile("D:/test/wordcount/");
            //处理统计
            textFile.filter(StringUtils::isNoneBlank) //过滤空内容
                    .flatMap(s -> Arrays.asList(s.split(" ")).iterator())//根据空格拆分
                    .mapToPair(s -> new Tuple2<>(s, 1))// 构建减值,value 固定 1
                    .reduceByKey(Integer::sum) // 同一个 key 下面的 value 相加
                    .foreach(s-> System.out.println(s._1 + "  " + s._2));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    直接运行查看结果:
    在这里插入图片描述

    3. Python 语言

    使用 pyspark 前,先安装相关依赖:

    pip install pyspark
    pip install psutil
    pip install findspark
    
    • 1
    • 2
    • 3

    创建 WordCountPy 测试脚本:

    from pyspark import SparkConf, SparkContext
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        conf = SparkConf().setAppName('spark').setMaster('local[*]')
        sc = SparkContext(conf=conf)
        sc.setLogLevel("WARN")
    
        # 读取数据
        textFile = sc.textFile("D:/test/wordcount/")
        # 处理统计
        textFile.filter(lambda s: s and s != '') \
            .flatMap(lambda s: s.split(" ")) \
            .map(lambda s: (s, 1)) \
            .reduceByKey(lambda v1, v2: v1 + v2) \
            .foreach(lambda s: print(s[0] + "  " + str(s[1])))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行查看结果:

    在这里插入图片描述

  • 相关阅读:
    同步 -- 自旋锁
    一文掌握 Apache SkyWalking
    Vue3 - 实现动态获取菜单路由和按钮权限控制指令
    PostgreSQL 数据库使用 psql 导入 SQL
    小程序如何获取code
    Slurm作业管理系统常用命令和教程
    elasticsearch-head安装及详细配置
    oracle11g安装图解
    Linux环境下conda虚拟环境的迁移
    极智AI | 三谈昇腾 auto tune
  • 原文地址:https://blog.csdn.net/qq_43692950/article/details/128063998