• SparkSql读取外部Hql文件的公共类开发


    SparkSql读取外部Hql文件的公共类开发

    Spark SQL 与 Hive 的区别简介

    一、什么是 Spark SQL? (官方定义)

    Spark SQL
    • A Spark module for structured data processing(known set of fields for each record - schema) ;
    1. Spark SQL是Spark中专门用来处理结构化数据(每一行数据都遵循Schema信息 —— 建表时表的字段及其类型)的一个模块;
    • Provides DataFrames/Dataset as an abstraction for distributed data processing ;
    1. 提供了 DataFrame/Dataset 的对分布式数据处理的基本抽象;
    • Acts as a distributed SQL engine ;
    1. 其实之上是一个分布式的 SQL 引擎。

    二、什么是 Hive? (官方定义)

    Hive
    • The Apache Hive data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL.
    1. 数据仓库,能使用 SQL 读取、写入和管理存在于分布式存储架构上的大数据集;
    • Structure can be projected onto data already in storage.
    1. 结构可以映射到已经存储的数据上;
    • A command line tool and JDBC driver are provided to connect users to Hive.
    1. 用户连接 Hive 可以使用命令行工具和 JDBC 驱动。

    三、两者的区别

    都支持ThriftServer服务,为JDBC提供解决方案,区别如下:

    Spark SQL

    => 是Spark的一个库文件;

    => Spark SQL 元数据可有可无;

    => Spark SQL 中 schema 是自动推断的;

    => 支持标准 SQL 语句,也支持 HQL 语句等(可以用普通话、方言来对比理解);

    => 从开发角度来讲,即支持SQL方式开发,也支持HQL开发,还支持函数式编程(DSL)实现SQL语句。

    Hive

    => 是一个框架;

    => Hive中必须有元数据,一般由 MySql 管理,必须开启 metastore 服务;

    => Hive 中在建表时必须明确使用 DDL 声明 schema;

    => 只支持 HQL 语句。

    Hive:处理海量数据,比如一个月、一个季度、一年的数据量,依然可以处理,虽然很慢;

    Spark SQL:这种情况下 Spark SQL 不支持,无法处理;所以在企业中,Hive 和 Spark SQL 能够共存,互为弥补。


    Spark on Hive 环境配置

    搭建准备

    ① 准备 HadoopHive 环境

    Hadoop 介绍及集群搭建参考

    Hive 搭建参考

    ② 准备 Spark on Yarn 环境

    环境搭建-Spark on YARN

    ③ 启动 Hivemetastore 服务

    # 进入 Hive 安装目录
    cd /opt/server/hive-2.1.0
    nohup bin/hive --service metastore &
    
    • 1
    • 2
    • 3

    配置修改

    修改 hive-site.xml 配置文件:在 3 台 Spark 服务器上都操作

    # 进入 Spark 安装目录
    cd /opt/server/spark/conf
    
    # 增加 hive-site.xml 配置文件
    vim hive-site.xml
    # 增加以下配置信息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    
    
    <configuration>
      
        <property>
          <name>hive.metastore.warehouse.dirname>
          <value>/user/hive/warehousevalue>
        property>
      
        <property>
          <name>hive.metastore.localname>
          <value>falsevalue>
        property>
      
        <property>
          <name>hive.metastore.urisname>
          <value>thrift://node1:9083value>
        property>
      
    configuration>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动及测试

    ① 启动

    # 第一种: Local 方式启动 Spark
    cd /opt/server/spark
    bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=2
    
    # 第二种:Standalone 方式启动 Spark
    cd /opt/server/spark
    bin/spark-sql --master spark://node1:7077 --executor-memory 512m --total-executor-cores 1
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ② 测试

    show databases;
    show tables;
    
    • 1
    • 2

    Hive SQL 的交互方式

    Distributed SQL Engine - Spark 2.4.5 Documentation (apache.org)

    开启 Spark 的 ThriftServer

    Spark Thrift Server 将 Spark Applicaiton 当做一个服务运行,提供 Beeline 客户端和JDBC方式访问,与 Hive 中 HiveServer2 服务一样的

    ① 启动 ThriftServer 服务

    # 进入 Spark 目录
    cd /opt/server/spark
    
    # 启动服务
    sbin/start-thriftserver.sh \
    --hiveconf hive.server2.thrift.port=10001 \
    --hiveconf hive.server2.thrift.bind.host=node1 \
    --master local[2]
    
    # 停止使用
    sbin/stop-thriftserver.sh
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    ② 查看 WEB-UI 界面

    http://node1:4040/jobs/

    ③ 使用 SparkSQLbeeline 客户端命令行连接 ThriftServer

    # 进入 Spark 目录
    cd /opt/server/spark
    # 启动 beeline
    bin/beeline
    # 输入连接信息
    !connect jdbc:hive2://node1:10001
    # 依次输入用户名和密码
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    spark-sql执行hivesql

    spark提交命令有spark-shell、spark-submit、spark-sql。我们执行hive命令一般都是

    hive -e 'select * from xx'
    或者 
    hive -f /home/hadoop/xx.hql -d dt=2018-01-01
    
    • 1
    • 2
    • 3

    但是hive底层使用mr执行速度实在不忍直视,安装hive on spark又太麻烦了,怎么办呢?其实,spark也有基于hive执行sql脚本的提交任务方式,就是spark-sql

    spark-sql --master yarn-client -e 'select * from xx'
    spark-sql --master yarn-client  dt=2018-01-01 -f '/home/hadoop/xx.hql'
    
    • 1
    • 2

    不过spark对机器内存性能要求很高,容易执行失败,如果spark-sql执行失败,出现内存溢出的情况,还是使用hive比较稳定。这里spark-sql能查询到hive表是怎么配置的呢?只需要把hive-sit.xml复制到spark安装目录的conf目录下即可。
    spark-sql缺点:执行语句insert overwrite table xx…在结果目录会有大量小文件,容易内存溢出执行失败

    spark提交任务的三种的方法

    Spark Job的方式主要有三种:

    **1、**使用spark 自带的spark-submit工具提交任务

    通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:
    ./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 …/spark-demo.jar
    参数含义就不解释了,请参考官网资料。

    2、通过JAVA API编程的方式提交有两种方式

    提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

    spark-launcher_2.11-2.3.4.jar 下载地址:https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.11/2.3.4

    根据官网的示例,通过JAVA API编程的方式提交有两种方式:

    **2.1、方式一:**new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器

    调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

    package com.xxx.utils;
     
    /**
     * @author yyz
     * @class LanuncherAppV
     * @date 2021/04/22 15:27
     * 第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,
     * 所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:
     * 注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。
     **/
     
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
     
    public class LanuncherAppV {
        private static Log log = LogFactory.getLog(LanuncherAppV.class);
        public static void main(String[] args) throws IOException, InterruptedException {
     
            HashMap env = new HashMap();
            //这两个属性必须设置
            env.put("HADOOP_CONF_DIR", "/opt/soft/client/hadoop/xxx/etc/hadoop");
            env.put("JAVA_HOME", "/opt/soft/jdk");
            //可以不设置
            //env.put("YARN_CONF_DIR","");
            log.info("init spark env complete");
            CountDownLatch countDownLatch = new CountDownLatch(10);
            //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在
            SparkAppHandle handler = new SparkLauncher(env)
                    .setSparkHome("/opt/soft/client/spark_install_home")
                    .setAppResource("/opt/soft/client/spark/xjprc-hadoop-spark2.3/spark_install_home/examples/jars/spark-examples_xxxxx.jar")
                    .setMainClass("org.apache.spark.examples.SparkPi")
                    .setMaster("local")
                    .setAppName("LanuncherAppV_yyz")
    //                    .setMaster("yarn")
    //                    .setDeployMode("cluster")
    //                    .setConf("spark.app.id", "")
    //                    .setConf("spark.driver.memory", "2g")
    //                    .setConf("spark.akka.frameSize", "")
    //                    .setConf("spark.executor.memory", "1g")
    //                    .setConf("spark.executor.instances", "")
    //                    .setConf("spark.executor.cores", "")
    //                    .setConf("spark.default.parallelism", "")
    //                    .setConf("spark.driver.allowMultipleContexts", "true")
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false
                        @Override
                        public void stateChanged(SparkAppHandle sparkAppHandle) {
                            if (sparkAppHandle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            System.out.println("state:" + sparkAppHandle.getState().toString());
                            System.out.println("AppId    " + sparkAppHandle.getAppId());
                        }
     
                        @Override
                        public void infoChanged(SparkAppHandle sparkAppHandle) {
                            System.out.println("Info:" + sparkAppHandle.getState().toString());
                            System.out.println("AppId    " + sparkAppHandle.getAppId());
                        }
                    });
            log.info("start spark SparkLauncher ……");
     
            System.out.println("The task is executing, please wait ....");
            //线程等待任务结束
            countDownLatch.await();
            System.out.println("The task is finished!");
     
            log.info("finish spark SparkLauncher task");
     
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。

    调用命令如下:

    [work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.LanunchAppV
    或者
    [work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.LanunchAppV
     
    
    • 1
    • 2
    • 3
    • 4

    2.2、方式二:new SparkLauncher().launch() 直接启动一个Process

    通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:

    package com.xxx.utils;
     
    /**
     * @author yyz
     * @class LauncherApp
     * @date 2021/04/23 10:30
     * 通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,需要自定义InputStreamReaderRunnable类实现
     * 好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:
     **/
     
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
     
    public class LauncherApp {
     
        public static void main(String[] args) throws IOException, InterruptedException {
     
            HashMap env = new HashMap();
            //这两个属性必须设置
              env.put("HADOOP_CONF_DIR", "/opt/soft/client/hadoop/xxx/etc/hadoop");
            env.put("JAVA_HOME", "/opt/soft/jdk");
            //env.put("YARN_CONF_DIR","");
     
            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome("/opt/soft/client/spark_install_home")
                    .setAppResource("/opt/soft/client/spark/xjprc-hadoop-spark2.3/spark_install_home/examples/jars/spark-examples_xxxxx.jar")
                    .setMainClass("org.apache.spark.examples.SparkPi")
                    .setMaster("local")
                    .setAppName("LauncherApp_yyz")
    //                .setMaster("yarn")
    //                .setDeployMode("cluster")
    //                .setConf("spark.app.id", "")
    //                .setConf("spark.driver.memory", "2g")
    //                .setConf("spark.akka.frameSize", "")
    //                .setConf("spark.executor.memory", "1g")
    //                .setConf("spark.executor.instances", "")
    //                .setConf("spark.executor.cores", "")
    //                .setConf("spark.default.parallelism", "")
    //                .setConf("spark.driver.allowMultipleContexts","true")
                    .setVerbose(true);
     
            Process process = handle.launch();
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();
     
            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();
     
            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
     
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    使用的自定义InputStreamReaderRunnable类实现如下:

    package com.xxx.utils;
     
    /**
     * @author yyz
     * @class InputStreamReaderRunnable
     * @date 2021/04/23 10:31
     * 使用的自定义InputStreamReaderRunnable类实现如下:
     **/
     
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
     
    public class InputStreamReaderRunnable implements Runnable {
        private BufferedReader reader;
        private String name;
     
        public InputStreamReaderRunnable(InputStream is, String name) {
            this.reader = new BufferedReader(new InputStreamReader(is));
            this.name = name;
        }
     
        public void run() {
     
            System.out.println("InputStream " + name + ":");
            try {
                String line = reader.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = reader.readLine();
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    调度方式:

    [work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.LauncherApp 
     
    或者
     
    [work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.LauncherApp
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.3、总结

    老版本

    老版本任务提交是基于启动本地进程,执行脚本spark-submit xxx ** 的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:

    appplication_时间戳_数字
    
    • 1

    老版本的spark通过修改SparkConf参数spark.app.id就可以手动指定id,新版本的代码是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的。

    感兴趣的同学可以看一下,生成applicaiton_id的逻辑在hadoop-yarn工程的ContainerId中定义。

    总结一句话就是,想要自定义id,甭想了!!!!

    于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?

    • 我事先生成一个自定义的id,当做参数传递到spark应用里面;
    • 等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
    • 然后再driver连接数据库,插入一条关联关系

    新版本

    还是归结于互联网时代的信息大爆炸,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现他可以基于Java代码自动提交Spark任务。SparkLauncher支持两种模式:

    • new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样
    • new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器

    当然是更倾向于第二种啦,因为好处很多:

    • 自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能
    • 可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)
    • 返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!

    2.4、代码示例:

    package com.xxx.utils;
     
    /**
     * @author yyz
     * @class Person
     * @date 2021/04/23 17:52
     **/
    public class Person{
        private String name;
        private int age;
        private  String sex;
     
        public Person(String name, int age, String sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }
     
        public String getName() {
            return name;
        }
     
        public void setName(String name) {
            this.name = name;
        }
     
        public int getAge() {
            return age;
        }
     
        public void setAge(int age) {
            this.age = age;
        }
     
        public String getSex() {
            return sex;
        }
     
        public void setSex(String sex) {
            this.sex = sex;
        }
     
    }
     
     
     
    package com.xxx.utils;
     
    /**
     * @author yyz
     * @class HelloWorld
     * @date 2021/04/23 17:07
     **/
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;
    import java.util.ArrayList;
    import java.util.List;
     
    public class HelloWorld {
        private static Log log = LogFactory.getLog(HelloWorld.class);
     
        public static void main(String[] args) throws InterruptedException {
     
            SparkSession spark = SparkSession.builder().master("local[2]")
                    .appName("HelloWorld_from_yyz")
                    .config("spark.sql.warehouse.dir", "/tmp")
                    .enableHiveSupport()
                    .getOrCreate();
     
     
            List<Person> persons = new ArrayList<>();
     
            persons.add(new Person("zhangsan", 22, "male"));
            persons.add(new Person("lisi", 25, "male"));
            persons.add(new Person("wangwu", 23, "female"));
     
     
            Dataset ds= spark.createDataFrame(persons, Person.class);
            ds.show(false);
            log.info("数据总条数为:"+ds.count());
     
            spark.close();
     
        }
    }
     
     
    package com.xxx.utils;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    /**
     * @author yyz
     * @class Launcher
     * @date 2021/04/23 17:13
     **/
    public class Launcher {
        private static Log log = LogFactory.getLog(Launcher.class);
     
        public static void main(String[] args) throws IOException {
            SparkAppHandle handler = new SparkLauncher()
                    .setAppName("hello-world")
    //              .setSparkHome(args[0])
                    .setSparkHome("/opt/soft/client/spark_install_home")
                    .setMaster(args[0])
    //              .setDeployMode("client")
                    .setConf("spark.yarn.job.owners",args[1])
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.cores", "3")
                    .setAppResource("/home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT.jar")
                    //此处应写类的全限定名
                    .setMainClass("com.xxx.utils.HelloWorld")
                    .addAppArgs("I come from Launcher")
                    .startApplication(new SparkAppHandle.Listener(){
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            System.out.println(handle.getAppId()+": **********  state  changed  **********: "+handle.getState().toString());
                            log.info(handle.getAppId()+": **********  state  changed  **********: "+handle.getState().toString());
                        }
     
                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            System.out.println(handle.getAppId()+": **********  info  changed  **********: "+handle.getState().toString());
                            log.info(handle.getAppId()+": **********  info  changed  **********: "+handle.getState().toString());
                        }
                    });
     
     
            while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
                System.out.println("id    "+handler.getAppId());
                System.out.println("state "+handler.getState());
     
                System.out.println(handler.getAppId()+": **********  info  changed  **********: "+handler.getState().toString());
                log.info(handler.getAppId()+": **********  info  changed  **********: "+handler.getState().toString());
     
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149

    打包完成后上传到部署Spark的服务器上。由于Spark Launcher所在的类引用了SparkLauncher,所以还需要把这个jar也上传到服务器上。

    spark-launcher_2.11-2.3.4.jar 下载地址:https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.11/2.3.4

    综上,我们需要的是:

    • 一个自定义的Jar,里面包含Spark应用和SparkLauncher类
    • 一个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你自己的来就行
    • 一个当前目录的路径
    • 一个SARK_HOME环境变量指定的目录

    然后执行命令启动测试:

    [work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
    或者
    [work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
    
    • 1
    • 2
    • 3

    说明:

    1. -Djava.ext.dirs 设置当前目录为java类加载的目录
    2. 传入两个参数,一个是启动模式,一个是 程序owner

    观察发现成功启动运行了:

    [work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
    id    null
    state UNKNOWN
    null: **********  info  changed  **********: UNKNOWN
    2021-04-25 15:18:41,927  INFO Launcher:51 - null: **********  info  changed  **********: UNKNOWN
    Apr 25, 2021 3:18:42 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
    Apr 25, 2021 3:18:42 PM org.apache.spark.launcher.OutputRedirector redirect
    ……
    INFO: 2021-04-25 15:18:43,834 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    ……
    INFO: 2021-04-25 15:18:43,943 INFO scheduler.FIFOSchedulableBuilder: Adding pool poolName:system_reserve maxSize:0 schedulingMode:FIFO maxConcurrency:2147483647
    Apr 25, 2021 3:18:43 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:18:43,944 INFO scheduler.FIFOSchedulableBuilder: Adding pool poolName:user maxSize:2147483645 schedulingMode:FIFO maxConcurrency:2147483647
    null: **********  state  changed  **********: CONNECTED
    2021-04-25 15:18:43,945  INFO Launcher:35 - null: **********  state  changed  **********: CONNECTED
    Apr 25, 2021 3:18:43 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:18:43,956 INFO executor.Executor: Starting executor ID driver on host localhost
    local-1619335123924: **********  info  changed  **********: CONNECTED
    2021-04-25 15:18:43,993  INFO Launcher:41 - local-1619335123924: **********  info  changed  **********: CONNECTED
    local-1619335123924: **********  state  changed  **********: RUNNING
    2021-04-25 15:18:43,995  INFO Launcher:35 - local-1619335123924: **********  state  changed  **********: RUNNING
    ……
    Apr 25, 2021 3:19:00 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: +---+--------+------+
    INFO: |age|name    |sex   |
    INFO: +---+--------+------+
    INFO: |22 |zhangsan|male  |
    INFO: |25 |lisi    |male  |
    INFO: |23 |wangwu  |female|
    INFO: +---+--------+------+
    INFO:Apr 25, 2021 3:19:00 PM org.apache.spark.launcher.OutputRedirector redirect
    ……
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,116 INFO utils.HelloWorld: 数据总条数为:3
    INFO: 2021-04-25 15:19:01,122 INFO status.AppStatusListener: Write local-1619335123924 with attempts: success...
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    ……
    local-1619335123924: **********  state  changed  **********: FINISHED
    2021-04-25 15:19:01,160  INFO Launcher:35 - local-1619335123924: **********  state  changed  **********: FINISHED
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,164 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,171 INFO memory.MemoryStore: MemoryStore cleared
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,171 INFO storage.BlockManager: BlockManager stopped
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,172 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,174 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,177 INFO spark.SparkContext: Successfully stopped SparkContext
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,179 INFO util.ShutdownHookManager: Shutdown hook called
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,180 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-be25cfa7-9b93-4214-b6a4-ad81d3d4122b
    Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 2021-04-25 15:19:01,180 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-4c8ab12b-5835-4484-a63a-3d010a0e2559
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    这样就实现了基于Java应用提交Spark任务,并获得其Appliation_id和状态进行定位跟踪的需求了。

    3、通过yarn的rest api的方式提交

    第三种方式是通过yarn的rest api的方式提交(不太常用但在这里也介绍一下):

    Post请求示例: * http:///ws/v1/cluster/apps

    请求所带的参数列表:

    ItemData TypeDescription
    application-idstringThe application id
    application-namestringThe application name
    queuestringThe name of the queue to which the application should be submitted
    priorityintThe priority of the application
    am-container-specobjectThe application master container launch context, described below
    unmanaged-AMbooleanIs the application using an unmanaged application master
    max-app-attemptsintThe max number of attempts for this application
    resourceobjectThe resources the application master requires, described below
    application-typestringThe application type(MapReduce, Pig, Hive, etc)
    keep-containers-across-application-attemptsbooleanShould YARN keep the containers used by this application instead of destroying them
    application-tagsobjectList of application tags, please see the request examples on how to speciy the tags
    log-aggregation-contextobjectRepresents all of the information needed by the NodeManager to handle the logs for this application
    attempt-failures-validity-intervallongThe failure number will no take attempt failures which happen out of the validityInterval into failure count
    reservation-idstringRepresent the unique id of the corresponding reserved resource allocation in the scheduler
    am-black-listing-requestsobjectContains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold”

    参考:https://www.cnblogs.com/itboys/p/9998666.html
    https://www.cnblogs.com/itboys/p/9958933.html

    spark-submit提交任务及参数说明

    spark-submit 可以提交任务到 spark 集群执⾏,也可以提交到 hadoop 的 yarn 集群执⾏。

    1. 例⼦
      ⼀个最简单的例⼦,部署 spark standalone 模式后,提交到本地执⾏。
    ./bin/spark-submit \
    --master spark://localhost:7077 \
    examples/src/main/python/pi.py
    
    • 1
    • 2
    • 3

    如果部署 hadoop,并且启动 yarn 后,spark 提交到 yarn 执⾏的例⼦如下。
    注意,spark 必须编译成⽀持 yarn 模式,编译 spark 的命令为:
    build/mvn -Pyarn -Phadoop-2.x -Dhadoop.version=2.x.x -DskipTests clean package其中, 2.x 为 hadoop 的版本号。编译完成后,可执⾏下⾯的命令,提交任务到 hadoop yarn 集群执⾏。

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 1 \
    --queue thequeue \
    examples/target/scala-2.11/jars/spark-examples*.jar 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. spark-submit 详细参数说明

    参数名 参数说明

    --master master 的地址,提交任务到哪⾥执⾏,例如 spark://host:port, yarn, local
    
    --deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
    
    --class 应⽤程序的主类,仅针对 java 或 scala 应⽤
    --name 应⽤程序的名称
    
    --jars ⽤逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
    --packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
    
    --exclude-packages 为了避免冲突 ⽽指定不包含的 package
    --repositories 远程 repository
    
    --conf PROP=VALUE
    
    指定 spark 配置属性的值,
    例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
    
    --properties-file 加载的配置⽂件,默认为 conf/spark-defaults.conf
    --driver-memory Driver内存,默认 1G
    
    --driver-java-options 传给 driver 的额外的 Java 选项
    
    --driver-library-path 传给 driver 的额外的库路径
    
    --driver-class-path 传给 driver 的额外的类路径
    --driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使⽤
    
    --executor-memory 每个 executor 的内存,默认是1G
    
    --total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使⽤
    
    --num-executors 启动的 executor 数量。默认为2。在 yarn 下使⽤
    
    --executor-core 每个 executor 的核数。在yarn或者standalone下使⽤
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    sparkSql 直接执行外部 sql/hql文件

    yarn-client模式,local模式,配置文件直接在本地就可以直接运行了。

    ​ yarn-cluster在读取配置文件的时候让运维兄弟在yarn的nodeManager所有计算节的磁盘上挂载了一个hdfs共享盘(resourceManager节点上没挂),直接把配置文件和sql文件丢进去,直接cluster模式跑就和client,local模式一样。

    pom文件

    =======pom文件如下=

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>hx.examplegroupId>
        <artifactId>sparkDwdFilterartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <properties>
            <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
            <maven.compiler.encoding>UTF-8maven.compiler.encoding>
            <encoding>UTF-8encoding>
    
    
            <hadoop.version>3.0.0-cdh6.3.2hadoop.version>
    
            <hive.version>2.1.1-cdh6.3.2hive.version>
            <hbase.version>2.1.0-cdh6.3.2hbase.version>
            <scala.version>2.11.12scala.version>
            <spark.version>2.4.0-cdh6.3.2spark.version>
    
    
    
        properties>
    
        <dependencies>
    
            <dependency>
                <groupId>junitgroupId>
                <artifactId>junitartifactId>
                <version>4.11version>
                
            dependency>
            
            <dependency>
                <groupId>org.apache.hadoopgroupId>
                <artifactId>hadoop-clientartifactId>
                <version>${hadoop.version}version>
                
                <exclusions>
                    <exclusion>
                        <groupId>io.nettygroupId>
                        <artifactId>nettyartifactId>
                    exclusion>
                exclusions>
            dependency>
    
            
            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>4.1.18.Finalversion>
            dependency>
    
            <dependency>
                <groupId>org.apache.hadoopgroupId>
                <artifactId>hadoop-commonartifactId>
                <version>${hadoop.version}version>
            dependency>
    
            
            <dependency>
                <groupId>org.scala-langgroupId>
                <artifactId>scala-libraryartifactId>
                <version>${scala.version}version>
            dependency>
            
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-sql_2.11artifactId>
                <version>${spark.version}version>
                <exclusions>
                    <exclusion>
                        <groupId>com.google.guavagroupId>
                        <artifactId>guavaartifactId>
                    exclusion>
                exclusions>
            dependency>
            
            <dependency>
                <groupId>org.apache.sparkgroupId>
                <artifactId>spark-hive_2.11artifactId>
                <version>${spark.version}version>
            dependency>
    
            <dependency>
                <groupId>org.apache.hivegroupId>
                <artifactId>hive-hbase-handlerartifactId>
                <version>${hive.version}version>
            dependency>
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
            
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
            
    
            
    
    
    
    
    
            
    
    
    
    
    
            
    
    
    
    
    
    
            <dependency>
                <groupId>log4jgroupId>
                <artifactId>log4jartifactId>
                <version>1.2.15version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.jmsgroupId>
                        <artifactId>jmsartifactId>
                    exclusion>
                    <exclusion>
                        <groupId>com.sun.jdmkgroupId>
                        <artifactId>jmxtoolsartifactId>
                    exclusion>
                    <exclusion>
                        <groupId>com.sun.jmxgroupId>
                        <artifactId>jmxriartifactId>
                    exclusion>
                exclusions>
            dependency>
    
        dependencies>
    
        <repositories>
            <repository>
                <id>clouderaid>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/url>
                <releases>
                    <enabled>trueenabled>
                releases>
                <snapshots>
                    <enabled>falseenabled>
                snapshots>
            repository>
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
        repositories>
    
        <build>
            <sourceDirectory>src/main/javasourceDirectory>
    
            <plugins>
    
                <plugin>
                    <groupId>net.alchim31.mavengroupId>
                    <artifactId>scala-maven-pluginartifactId>
                    <version>3.2.2version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compilegoal>
                                <goal>testCompilegoal>
                            goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfilearg>
                                    <arg>${project.build.directory}/.scala_dependenciesarg>
                                args>
                            configuration>
                        execution>
                    executions>
                plugin>
                <plugin>
    
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-shade-pluginartifactId>
                    <version>3.1.0version>
                    <executions>
    
                        <execution>
    
                            <phase>packagephase>
                            <goals>
                                <goal>shadegoal>
                            goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                transformers>
                                <relocations>
                                    <relocation>
                                        <pattern>org.apache.httppattern>
                                        <shadedPattern>org.apache.myhttpshadedPattern>
                                    relocation>
                                relocations>
                                <filters>
                                    <filter>
                                        <artifact>*:*artifact>
                                        <excludes>
                                            <exclude>META-INF/maven/**exclude>
                                            <exclude>META-INF/*.SFexclude>
                                            <exclude>META-INF/*.DSAexclude>
                                            <exclude>META-INF/*.RSAexclude>
                                        excludes>
                                    filter>
                                filters>
                            configuration>
                        execution>
                    executions>
                plugin>
    
                <plugin>
                    <groupId>org.codehaus.mojogroupId>
                    <artifactId>exec-maven-pluginartifactId>
                    <version>1.2.1version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>execgoal>
                            goals>
                        execution>
                    executions>
                    <configuration>
                        <executable>javaexecutable>
                        <includeProjectDependencies>trueincludeProjectDependencies>
                        <includePluginDependencies>falseincludePluginDependencies>
                        <classpathScope>compileclasspathScope>
                        <mainClass>mainClass>
                    configuration>
                plugin>
    
                
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                    configuration>
                plugin>
            plugins>
        build>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341

    代码1ods层数据清洗落地到dwd层,工具类读取外部Hql

    =代码1如下======

    package hx.com
    
    import hx.com.constant.PropConstants
    import hx.com.util.PropertieUtil
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.sql.SparkSession
    
    import java.io.File
    import java.util.Properties
    import scala.io.{BufferedSource, Source}
    
    /**
     * ods层数据清洗落地到dwd层
     */
    object Ods2DwdFilterSql {
    
      def main(args: Array[String]): Unit = {
    
        val filePath: String = args(0)
    
    
        //读取集群配置文件
        val prop: Properties = PropertieUtil.load("config.properties")
    
        //本地测试读文件
    //    val prop: Properties = PropertieUtil.getProperties("/config.properties")
        System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH))
        System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME))
        System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME))
        UserGroupInformation.loginUserFromKeytab(
          prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH)
        )
    
        System.out.println(UserGroupInformation.getLoginUser)
    
    
        val session: SparkSession = SparkSession.builder()//.master("local[2]")
          .appName("SparkSeesionApp")
          .enableHiveSupport() //支持hive
          .getOrCreate()
    //    session.sparkContext.setLogLevel("WARN")
    
        val sql: String = doFile(filePath)
        val strings: Array[String] = sql.split(";")
        var i = 0;
        strings.foreach(sql=>{
          val startTime: Long = System.currentTimeMillis()
          println("==============第 "+(i+1)+" 次===sql开始=================")
          println(sql)
          session.sql(sql).show()
          val stopTime: Long = System.currentTimeMillis()
          val processTime: Long = (startTime - stopTime) / 1000
          println("===============第 "+(i+1)+" 次==sql结束====耗时=="+processTime+" 秒==========")
          i = i+1
        })
    
        session.stop()
      }
    
      //读取外部sql文件文件
      def doFile(fileName: String): String = {
        val file: File = new File(fileName)
        import java.io.FileInputStream
        val stream: FileInputStream = new FileInputStream(file)
        val buff: BufferedSource = Source.fromInputStream(stream,"UTF-8")
        //读取拼装SQL
        val sql: String = buff.getLines().mkString("\n")
        sql
      }
    
    
    }
    // ===================代码1结束===============
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    PropertieUtil

    =====代码2开始=

    package hx.com.util;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    public class PropertieUtil {
    
        public static Properties getProperties(String path){
            Properties prop = new Properties();
            InputStream inputStream = Object.class.getResourceAsStream(path);
            try {
                prop.load(inputStream);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return prop;
        }
    
        public static Properties load(String path){
            Properties prop = new Properties();
            try {
                prop.load(new FileInputStream(path));
            } catch (Exception e) {
                e.printStackTrace();
            }
            return prop;
        }
    }
    // ===================代码2结束===============
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    conf.proerties

    namespace=ods_membership_prd
    column_family=cf
    krb5_conf_path=/etc/krb5.conf
    #krb5_conf_path=D:/workspace/canal-kafka2hbase/src/main/resources/krb5.ini
    keytab_file_path=/opt/etl/config/etl_admin.keytab
    #keytab_file_path=hdfs://HDFS0525/user/etl_admin/etl_admin.keytab
    #keytab_file_path=D:/soft/kerbros/etl_admin.keytab
    hadoop_user_name=etl_admin
    user_name=etl_admin
    keytab_name=etl_admin@xxx.com
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    使用Spark-submit提交任务封装shell脚本

    =====集群上local 启动模式======
    #!/bin/bash
    
    if [ $# -eq 1 ];then
            spark-submit --master local[4]  --class hx.com.Ods2DwdFilterSql --files /home/etl_admin/spark/config.properties sparkDwdFilter-1.0-SNAPSHOT.jar $1
    else
      echo "Please input command. eg: ./$0 filename.sql(hql)"
    fi
    =====yarn-client 启动模式=======
    
    #!/bin/bash
    
    if [ $# -eq 1 ];then
            spark-submit \
            --master yarn \
            --deploy-mode client \
            --queue default \
            --driver-memory 2g \
            --num-executors 3 \
            --executor-memory 2g \
            --executor-cores 2 \
            --class hx.com.Ods2DwdFilterSql \
            --files /home/etl_admin/spark/config.properties \
            sparkDwdFilter-1.0-SNAPSHOT.jar /opt/etl/sqlFiles/$1 
    else
      echo "Please input command. eg: ./$0 filename.sql(hql)"
    fi
    
    =======yarn-cluster 启动模式=======
    
    #!/bin/bash
    
    if [ $# -eq 1 ];then
            spark-submit \
            --master yarn \
            --deploy-mode cluster \
            --queue default \
            --driver-memory 2g \
            --num-executors 3 \
            --executor-memory 2g \
            --executor-cores 2 \
            --class hx.com.Ods2DwdFilterSql \
            --files /home/etl_admin/spark/config.properties \
            sparkDwdFilter-1.0-SNAPSHOT.jar /opt/etl/sqlFiles/$1 
    else
      echo "Please input command. eg: ./$0 filename.sql(hql)"
    fi
    =========================================================
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 提交任务
    /opt/module/spark/bin/spark-submit --class org.example.SparkReadHqlTest  --master lo
    cal[2] /opt/jar/SparkReadHql_Test3-1.0-SNAPSHOT.jar  /opt/sql/test_03.sql 2022-03-17
    
    • 1
    • 2

    spark-submit传递参数以及任务如何解析参数

    1.传参

    spark-submit传递参数有两种方式:

    1. –conf k1=v1 --conf k2=v2
    2. cli args,在jar包后追加

    详见官方文档:

    在这里插入图片描述

    2.解析
    –conf方式解析:

    sparkContext.getConf.get("k1")
    
    • 1

    cli args方式解析:

      parse(args.toList)
    
      ... ...
    
      def parse(list: List[String]): Unit = list match {
        case "--input" :: value :: tail =>
          input = value
          parse(tail)
        case "--output" :: value :: tail =>
          output = value
          parse(tail)
        case "--tmpOutputDir" :: value :: tail =>
          tmpOutputDir = value
          parse(tail)
        case "--sql" :: value :: tail =>
          sql = URLDecoder.decode(value)
          parse(tail)
        case _ :: tail =>
          parse(tail)
        case Nil =>
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2. spark submit给main类传递参数

    如果想要给main类传递参数需要在submit脚本最后一行
    ${1}
    ${2}
    即可

    参考博客:
    https://blog.csdn.net/qq_34009542/article/details/118366474?spm=1001.2014.3001.5502
    https://blog.csdn.net/totally123/article/details/117224169

  • 相关阅读:
    Ubuntu20.04安装gRPC-go
    2022年最新阿里Java高级岗200+面试题,掌握80%进阿里没问题
    第一课:ASP.NET Core入门之简单快速搭建ASP.NET Core项目结构
    微信小程序自定义顶部导航栏
    C#Linq中的GroupBy
    spring boot 自定义 starter
    java 实现字典树(Trie)
    【计算机网络】数据链路层:使用点对点信道的数据链路层
    Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单
    数据库联查json解析后排序失败
  • 原文地址:https://blog.csdn.net/m0_46168848/article/details/126904684