• SparkSQL入门


    概述

    两种模式

    Spark on Hive: 语法是Spark SQL语法,实际上是在IDEA上编写java叠加SQL的代码。

    Hive on Spark: 只是替换了Hadoop的MR,改为了Spark的计算引擎。

    发展历史

    RDD => DataFrame => DataSet:

    1. 都有惰性机制,遇到行动算子才会执行。
    2. 三者都会根据Spark的内存情况自动缓存运算
    3. 三者都有分区的概念

    特点

    1. 易整合:无缝的整合了SQL查询和Spark编程
    2. 统一的数据访问方式:使用相同的方式连接不同的数据源
    3. 兼容Hive:在已有的仓库上直接运行SQL或者HQL
    4. 标准的数据连接:通过JDBC或者ODBC来连接

    数据的加载和保存

    json文件:spark数据读取时,读取后会自动解析JSON,并且附加上列名和属性类型。并且兼容RDD的算子操作,

    public class SQL_Test {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]");
            SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
    
            DataFrameReader read = spark.read();
    
            //读取后会自动解析JSON,并且附加上列名和属性类型
            Dataset<Row> userJSON = read.json("input/user.json");
    
            //打印数据类型
            userJSON.printSchema();
    
            userJSON.show();//即收集又打印
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    自定义函数

    UDF操作单个数据,产生单个数据

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.api.java.UDF2;
    import org.apache.spark.sql.types.DataTypes;
    
    public class SQL_UDF {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]");
            SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
    
            DataFrameReader read = spark.read();
    
            //读取后会自动解析JSON,并且附加上列名和属性类型
            Dataset<Row> userJSON = read.json("input/user.json");
    
            userJSON.createOrReplaceTempView("t1");
    
            //注册函数
            spark.udf().register("myudf", new UDF2<String, Long, String>() {
                @Override
                public String call(String s, Long integer) throws Exception {
                    if(integer >= 18){
                        return s+"大侠";
                    }else{
                        return s+"小虾米";
                    }
                    //return null;
                }
            }, DataTypes.StringType);//
    
            spark.sql("select myudf(name,age) from t1").show();
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    UDAF操作多个数据,产生单个数据

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.api.java.UDF2;
    import org.apache.spark.sql.types.DataTypes;
    import static org.apache.spark.sql.functions.udaf;
    
    public class SQL_UDAF {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]");
            SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
    
            DataFrameReader read = spark.read();
    
            //读取后会自动解析JSON,并且附加上列名和属性类型
            Dataset<Row> userJSON = read.json("input/user.json");
    
            userJSON.createOrReplaceTempView("t1");
    
            //注册函数
            spark.udf().register("ageAVG", udaf(new AgeAvg(), Encoders.LONG()));//
    
            spark.sql("select name,ageAVG(age) from t1 group by name").show();
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    文件的读取和转换保存

    1. json格式
    2. csv格式
    3. parquet、orc格式

    Hive交互

    1. 开启Hive支持enableHiveSupport()
    2. 用户权限造假System.setProperty("HADOOP_USER_NAME","atguigu");
    3. 添加hive-site.xml到resource目录下
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;
    
    /**
     * title:
     *
     * @Author 浪拍岸
     * @Create 19/10/2023 下午3:35
     * @Version 1.0
     */
    public class HiveTest {
        public static void main(String[] args) {
            System.setProperty("HADOOP_USER_NAME","atguigu");
    
            SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]");
            SparkSession spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate();
    
    //        spark.sql("show tables").show();
    
            spark.sql("select * from stu where id = 1").createOrReplaceTempView("t1");
    
            spark.sql("select * from t1").show();
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  • 相关阅读:
    Linux进程相关管理(ps、top、kill)
    用已安装好的系统级别PsychoPy软件配置Python虚拟环境
    js中scrollIntoView第一次不生效,第二次生效
    2022 【SPDK原理最新视频讲解】
    集合&Set集合详解
    论文阅读笔记 | 三维目标检测——VeloFCN算法
    时间工具类-- LocalDateTimeUtil详解
    【新书推荐】大模型赛道如何实现华丽的弯道超车 —— 《分布式统一大数据虚拟文件系统 Alluxio原理、技术与实践》
    如何在 Flutter 中集成华为云函数服务
    【Spring Cloud Alibaba】seata分布式事务官方入门案例(实战版)
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/133921393