• 【大数据】Spark使用大全:下载安装、RDD操作、JAVA编程、SQL


    目录

    前言

    1.下载安装

    2.RDD操作

    3.JAVA编程示例

    4.Spark SQL


    前言

    本文是作者大数据系列中的一文,专栏地址:

    https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482

    该系列会成体系的聊一聊整个大数据的技术栈,绝对干货,欢迎订阅。

    1.下载安装

    前置环境:

    • Hadoop 3.1.3
    • Java JDK 1.8

    下载地址:

    Downloads | Apache Spark

    往下拉找到Spark release archives.

    由于前面我们已经搭建好了hadoop环境,所以这里选择with out hadoop的版本。

    配置config目录下有一个配置模板spark-env.sh.template:

    将这个模板修改或者复制为spark-env.sh然后在里面:

    export SPARK_DIST_CLASSPATH=${Hadoop的安装路径/bin classpath}

    因为Spark只是个计算引擎,具体要去操作对应的分部署文件系统的,所以将Spark的类路径指向了hadoop。也就是通过这个配置将Spark要操作的数据源设置为了HDFS。

    启动:

    bin目录下:

    ./run-exmaple SparkPi

    这是一个Spark自带的demo,如果跑起来不报错,说明就没什么问题了。

    2.RDD操作

    可以用Spark自带的Spark  shell来进行RDD操作:

    ./bin/spark-shell

    RDD操作分为两类:

    • 转换,就是只是返回中间数据集的操作。
    • 动作,就是有具体单个返回值的操作。

    map - 应用于RDD的每个元素,产生一个新的RDD。

    val numbersRdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4))
    val squaredRdd = numbersRdd.map(x => x * x) 

    filter - 根据函数条件过滤RDD中的元素。

    val evenNumbersRdd = numbersRdd.filter(_ % 2 == 0)

    flatMap - 对RDD中的每个元素应用函数并展平结果。

    val wordsRdd = spark.sparkContext.textFile("hdfs://path/to/textfile.txt")
    val wordsFlatMapped = wordsRdd.flatMap(line => line.split(" "))

    mapPartitions - 对每个分区应用一个函数。

    val incrementedRdd = numbersRdd.mapPartitions(iter => iter.map(x => x + 1))

    union - 合并两个RDD。

    val rdd1 = spark.sparkContext.parallelize(Array(1, 2))
    val rdd2 = spark.sparkContext.parallelize(Array(3, 4))
    val combinedRdd = rdd1.union(rdd2)

    distinct - 返回RDD中不重复的元素。

    val uniqueNumbers = numbersRdd.distinct()

    join - 对两个键值对RDD进行内连接。

    val rddA = spark.sparkContext.parallelize(Array((1, "a"), (2, "b")))
    val rddB = spark.sparkContext.parallelize(Array((1, "x"), (3, "y")))
    val joinedRdd = rddA.join(rddB)

    reduce - 通过函数聚合RDD中的所有元素。

    val sum = numbersRdd.reduce(_ + _)

    collect - 返回RDD的所有元素到Driver作为数组。

    val allElements = numbersRdd.collect()

    count - 返回RDD中元素的数量。

    val count = numbersRdd.count()

    first - 返回RDD的第一个元素。

    val firstElement = numbersRdd.first()

    take(n) - 返回RDD的前n个元素。

    val topThree = numbersRdd.take(3)

    saveAsTextFile - 将RDD的内容保存为文本文件。

    wordsRdd.saveAsTextFile("hdfs://path/to/output")

    foreach - 对RDD的每个元素应用函数,常用于副作用操作。

    numbersRdd.foreach(println)

    3.JAVA编程示例

    依赖:

    1. <dependencies>
    2. <dependency> <!-- Spark dependency -->
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-core_2.11</artifactId>
    5. <version>2.4.0</version>
    6. </dependency>
    7. </dependencies>

     编码:

    1. import org.apache.spark.SparkConf;
    2. import org.apache.spark.api.java.JavaRDD;
    3. import org.apache.spark.api.java.JavaSparkContext;
    4. import scala.Tuple2;
    5. import java.util.Arrays;
    6. import java.util.List;
    7. import java.util.stream.Collectors;
    8. public class WordCountFromHDFS {
    9. public static void main(String[] args) {
    10. if (args.length != 1) {
    11. System.err.println("Usage: WordCountFromHDFS ");
    12. System.exit(1);
    13. }
    14. // 初始化Spark配置
    15. SparkConf conf = new SparkConf().setAppName("WordCountFromHDFS").setMaster("local"); // 本地模式运行,根据实际情况可改为yarn等
    16. // 创建SparkContext实例
    17. JavaSparkContext sc = new JavaSparkContext(conf);
    18. // HDFS文件路径,这里直接从命令行参数获取
    19. String inputPath = args[0];
    20. // 从HDFS读取文件内容
    21. JavaRDD lines = sc.textFile(inputPath);
    22. // 每行分割成单词,然后扁平化,最后统计每个单词出现的次数
    23. JavaRDD words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
    24. JavaPairRDD wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
    25. .reduceByKey((a, b) -> a + b);
    26. // 收集结果并打印
    27. List> results = wordCounts.collect();
    28. for (Tuple2 result : results) {
    29. System.out.println(result._1() + ": " + result._2());
    30. }
    31. // 停止SparkContext
    32. sc.stop();
    33. }
    34. }

    4.Spark SQL

    park SQL是Spark的一个组件,它从Spark 1.3.0版本开始被引入,并在后续版本中不断得到增强和发展。Spark SQL允许用户使用SQL或者DataFrame API来处理结构化和半结构化的数据。下面做个小小的演示。

    假设我们有一个CSV文件位于HDFS上,我们可以用以下命令加载它:

       val df = spark.read
         .option("header", "true")
         .csv("hdfs://localhost:9000/path/to/yourfile.csv")

    创建临时视图:

       df.createOrReplaceTempView("my_table")

    执行sql:

    val result = spark.sql("SELECT column_name FROM my_table WHERE condition")

    joinResult.show()

    连表查询:

    // 假设dfOrders和dfCustomers分别是orders和customers的DataFrame
    dfOrders.createOrReplaceTempView("orders")
    dfCustomers.createOrReplaceTempView("customers")

    val joinResult = spark.sql(
      """
        SELECT orders.order_id, customers.customer_name
        FROM orders
        INNER JOIN customers
        ON orders.customer_id = customers.customer_id
      """
    )

    joinResult.show()

    当然Spark SQL也有对应的JAVA API,支持编程的方式来操作,用到的时候查一下就是,此处就不展开了。

  • 相关阅读:
    随机产生一个1-100之间的整数,看能几次猜中
    AI实战营第二期 第五节 《目标检测与MMDetection》——笔记6
    Tomcat 线程模型&性能调优
    (附源码)计算机毕业设计Java巴音学院学生资料管理系统
    【SQL语法基础】什么是事务处理,如何使用COMMIT和ROLLBACK进行操 作?
    Vue3中全局配置 axios 的两种方式
    Mysql-----Innodb引擎行锁变为表锁
    16.Hystrix 实例(springcloud)
    【云原生监控系列第二篇】Prometheus进阶——PromQL数据类型
    考研:研究生考试(五天学完)之《线性代数与空间解析几何》研究生学霸重点知识点总结之第一课行列式
  • 原文地址:https://blog.csdn.net/Joker_ZJN/article/details/139648646