• spark集群环境下,实现人口平均年龄计算


    任务目标

    虚拟机上部署spark集群,给定renkou.txt文件,输出平均年龄

    renkou.txt:
    在这里插入图片描述

    集群运作spark
    在这里插入图片描述

    spark web界面显示结果
    在这里插入图片描述

    0. 版本信息

    信息版本
    Scala2.11.8
    Java1.8
    spark2.2.0

    hadoop安装
    尚硅谷Hadoop

    spark
    spark集群搭建

    tip: 按照上述spark博客集群搭建时,node1是虚拟机的域名,记得换成自己虚拟机的域名。如果没有,填写真实ip地址即可

    maven坐标

            
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-core_2.11artifactId>
                <version>2.2.0version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1. 计算生成renkou.txt

    因为数据量很庞大, 1000万行,因此采用Java多线程的方式生成数据

    package com.xhf.java;
    
    import com.xhf.java.entity.Person;
    
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 生成renkou.txt
     */
    public class RenkouGen {
        static Random random = new Random();
    
        static Object lock = new Object();
    
        public static void main(String[] args) throws Exception {
            // 创建文件
            File file = new File("E:\\B站视频创作\\Java计算人口平均_spark\\src\\main\\resources\\renkou.txt");
            // 判断file是否存在
            if (file.exists()) {
                file.delete();
            }else {
                file.createNewFile();
            }
            // 创建流管到
            BufferedWriter bw = new BufferedWriter(new FileWriter(file, false));
    
            // 创建线程池 1000万, 100万(每个线程)
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                // 生成数据
                executorService.execute(() -> {
                    // 100万
                    for (int j = 0; j < 1000000; j++) {
                        Person person = new Person(j, random.nextInt(20) + 40);
                        // 数据写入文件
                        try {
    //                        synchronized (lock) {
                                // 加锁
                                bw.write(person.toString());
    //                            bw.newLine();
    //                        }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // 关闭线程池
            executorService.shutdown();
            executorService.awaitTermination(10000L, TimeUnit.SECONDS);
    
            bw.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    tip:

    • 生成完的文件需要上传到hadoop文件系统中,这样便于spark程序部署时获取文件信息
    • 如果不上传至hadoop中,在集群环境下运行时,可以通过main的args参数指定路径,又或者将文件存放在resouce目录下,打jar包后,代码通过resource资源目录进行定位

    2. 文件上传至spark

    如果遇到问题,请往下看 3.上传文件时,可能出现的常见错误
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    文件访问url: ‘hdfs://hadoop102:8020/spark/renkou.txt’

    3. 上传文件时,可能出现的常见错误

    在上传的过程中,可能会遇到各种报错,我这里整理好参考资料

    4. 编写spark文件

    package com.xhf.spark
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object RenkouCal2_Cluster {
      def main(args: Array[String]): Unit = {
        // 设置配置, master记得输入你要提交的主节点地址,而不是local. 如果是local, 我的版本下能够运行, 但任务无法在UI界面上显示
        val conf: SparkConf = new SparkConf().setAppName("renkou").setMaster("spark://hadoop102:7077")
        // 连接spark
        val sparkContext = new SparkContext(conf)
        val filePath: String = "hdfs://hadoop102:8020/spark/renkou.txt";
        // 读取文件 List
        val lines: RDD[String] = sparkContext.textFile(filePath)
        val begin: Long = System.currentTimeMillis()
        val value: RDD[Long] = lines.map(line => {
          // String : "1 27"
          line.split(" ")(1).toLong
        });
        // 计算 (把所有年龄累加 -> 除以个数 -> 平均年龄)
        val sum: Long = value.reduce((x, y) => x + y)
        println(sum / (1.0 * lines.count()))
        println(System.currentTimeMillis() - begin)
        // 终止spark
        sparkContext.stop()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    如果本地能够运行,通过maven打成jar包,上传运行
    在这里插入图片描述

    5. 上传集群

    笔者采用xshell + xftp的方式进行jar包上传,上传至如下位置
    在这里插入图片描述

    6. 集群环境下提交任务

    找到spark安装位置,进入bin目录
    在这里插入图片描述
    执行如下指令

    ./spark-submit \
    --class com.xhf.spark.RenkouCal2 \
    --master spark://hadoop102:7077 \
    --executor-memory 1G \
    --total-executor-cores 2 \
    /export/servers/spark_demo/java_spark-1.0-SNAPSHOT.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • –class 指定运行jar包具体的启动类,笔者运行的时RenkouCal2这个类
    • –master 指定master节点的地址
    • /export/servers/spark_demo/java_spark-1.0-SNAPSHOT.jar 指定jar包路径,这个由自己决定
      其它参数见名知意,不在过多赘述

    spark,启动!
    在这里插入图片描述

  • 相关阅读:
    JS数据结构与算法-队列结构
    一篇带你了解如何使用纯前端类Excel表格构建现金流量表
    单链表的建立(头插法、尾插法)(数据结构与算法)
    Java进阶篇之接口
    requests接口自动化总结
    WebAssembly上手:基础指南
    c++ 函数的参数是否可以为auto
    Spring Boot配置多个Kafka数据源
    Kafka -- 架构、分区、副本
    ElasticSearch 实现分词全文检索 - SpringBoot 完整实现 Demo 附源码【完结篇】
  • 原文地址:https://blog.csdn.net/qq_62835094/article/details/133766584