• Spark SQL 从入门到精通 - Spark SQL 行转列、列转行案例


    Spark SQL 行转列、列转行案例

    需求分析:

    ⾏列之间的互相转换是ETL中的常见需求,在Spark SQL中,⾏转列有内建的PIVOT函数可⽤,没什么特别之处。
    ⽽列转⾏要稍微⿇烦点。

    Execution Environment

    Spark 3.2.0

    以下将通过 Spark SQL DSLSpark SQL 两种方案进行 案例分享。

    1、行转列 -> PIOVT

    data-file : city.csv
    city,yearm,count
    北京,202001,1000
    北京,202004,1023
    北京,202007,1980
    北京,202010,1098
    北京,202101,988
    北京,202104,976
    北京,202107,1098
    北京,202110,1221
    上海,202001,1222
    上海,202004,800
    上海,202007,908
    上海,202010,1009
    上海,202101,709
    上海,202104,799
    上海,202107,980
    上海,202110,897
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
       // 创建执行环境
       val session: SparkSession = SparkSession.builder()
         .appName(this.getClass.getSimpleName)
         .master("local[2]").getOrCreate()
       // 加载数据文件,注册DF
       val piovtCsvDF = session.read.format("csv")
         .option("header", true)
         .load("/Users/zhoulei/Documents/workspaces/zholei-core/ToolsLibrary/src/main/resources/city.csv")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Spark SQL DSL

       // Spark DSL Method
       piovtCsvDF
         .groupBy("city")
         .pivot("yearm")
         .agg(sum("count"))
         .show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Spark SQL

       // 注册临时视图
       piovtCsvDF.createOrReplaceTempView("city_main")
       // Spark SQL Method
       session.sql(
         """
           |select * from city_main
           |pivot(
           |    SUM(count) for yearm in (
           |        '202001' as Q202001,
           |        '202004' as Q202004,
           |        '202007' as Q202007,
           |        '202010' as Q202010,
           |        '202101' as Q202101,
           |        '202104' as Q202104,
           |        '202107' as Q202107,
           |        '202110' as Q202110
           |        )
           |    )
           |""".stripMargin).show(false)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2、列转行 -> UNPIOVT

    studentscore.csv
    
    un,chinese,math,English
    张三,91,92,93
    李四,80,81,32
    王五,70,78,80
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
       val unpiovtCsvDF = session.read.format("csv")
         .option("header", true)
         .load(getClass.getResource("/studentscore.csv"))
    
    • 1
    • 2
    • 3

    Spark SQL DSL

    stack(n, expr1, …, exprk) - 会将expr1, …, exprk 分割为n⾏

       unpiovtCsvDF
         .selectExpr("un", "stack(3,'chinese',chinese,'math',math,'english',english) as (subject,score)")
         .show(false)
    
    • 1
    • 2
    • 3

    Spark SQL

       // 注册临时视图
       unpiovtCsvDF.createOrReplaceTempView("student_score_main")
    
    • 1
    • 2
       // Spark SQL Method (stack function)
       session.sql(
         """
           |select
           |un,stack(3,'chinese',chinese,'math',math,'english',english) as (subject,score)
           |from student_score_main
           |""".stripMargin).show(false)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    // Spark SQL Method (lateral view explode)
    session.sql(
      """
        |select
        |un,split(temp1,':')[0] as subject,split(temp1,':')[1] as score
        |from (
        |select un,concat('chinese:',chinese,',','math:',math,',','english:',english) temp
        |from student_score_main
        |) lateral view explode(split(temp,',')) sub as temp1
        |""".stripMargin).show(false)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    知识小课堂:

    • MySQL无unpiovt(列转行)、pivot(行转列)这两个函数,unpiovt 用 union all 实现,pivot用聚合函数+ case when/if 等判断语句 实现。
  • 相关阅读:
    服务器挂机
    个人和企业如何做跨境电商?用API电商接口教你选平台选品决胜跨境电商
    Oracle-通过(RECOVER STANDBY DATABASE FROM SERVICE)方式修复DataGuard
    【动态规划】爬楼梯爬的不仅仅是楼梯
    关于Transfomer的思考
    USB接口静电整改
    [附源码]计算机毕业设计JAVAjsp疫情防控期间人员档案追寻系统
    (下)苹果有开源,但又怎样呢?
    CMake Tutorial 巡礼(3)_添加库的使用需求
    tcpdump使用大全
  • 原文地址:https://blog.csdn.net/m0_49447718/article/details/125516720