• Spark读取文件性能优化


    在使用spark读取json文件的时候,数据文件有12G,数据读取就耗费了80秒,如果数据量更大,花费时间则更长。后来发现,时间主要浪费在数据读取的时候,类型推断上。为了提高读取速度,可以提前读取小批量的数据获取数据类型模式,然后将类型模式设置给spark的数据读取器,这样在40秒内就读完了所有的数据。

    与此同时,可以在代码中写入两条数据,推断出数据类型模式。或者,手动实现数据类型模式。

    总之,spark读取数据的时候,不要让它自动推断数据类型,就可以加速数据读取。

    0、先看不同的读取文件花费时间对比

    在这里插入图片描述

    1、直接读取数据,spark自行推断数据类型

        session
          .read
          .json(jsonDataPath)
          .select($"msg.id.userId" as "userId")
          .distinct()
          .show(10)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、为spark的读取器设置数据类型模式

    2.1、读取一条数据来获取数据类型的模式

    取出一条样例数据,使用spark自动推断出数据类型模式,然后将数据类型设置给spark的读取器。

        val sampleData =
          """
            |{"version":1,"channel":"8xxxyyyy-uu-3A5","eventId":"FF","otherIds":[],"timestamp":1658257201,"node":"n11","nodeIp":"1.1.1.1","clientIp":"1.2.3.4","clientCountry":"","clientProvince":"","clientIsp":"","msg":{"id":{"wwId":"qwwwwwwwww","userId":"b3dhRQ==","serviceId":"111","stat":5,"credit":{"start":"1970-01-01T00:00:00Z"}},"wwId":"ww","type":3,"version":{"version":"3.11.6","channel":"827A3A5"},"eventNum":1,"events":[{"id":"re_condition1_N","time":"2022-07-19T19:00:00Z","parasNum":2,"paras":[{"key":"sid","value":"233432342"},{"key":"AB","value":"nil"}]}]}}
            |""".split('|').map(_.trim).filter(!_.equals(""))
    
        // 读取样例数据
        val tempData:RDD[String] = session.sparkContext.parallelize(sampleData)
    	// 获取数据类型模式
        val newSchema2 = session.read.json(tempData).schema
    
        session
          .read
          .schema(newSchema2)   // 将数据类型模式设置给DataFrameReader
          .json(jsonDataPath)
          .select($"msg.id.userId" as "userId")
          .distinct()
          .show(10)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.2、读取一个文件,来获取数据类型模式

        // 读取部分样例数据,获取自动推断出的数据类型模式
        // sampleDataPath 是一个原来数据文件的一部分
        val newSchema = session.read.json(sampleDataPath).schema
        session
          .read
          .schema(newSchema)  // 将数据类型模式设置给DataFrameReader
          .json(jsonDataPath)
          .select($"msg.id.userId" as "userId")
          .distinct()
          .show(true)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.3、自定义数据类型模式

    自定义数据模式后,然后设置给spark的读取器。其中涉及到基本数据类型,对象类型,数组类型,数组中对象。

    // 手动设置相应的类型模式 
    val newSchema3 = StructType(Array(
          StructField("version", LongType),
          StructField("timestamp", LongType),
          StructField("nodeIp", StringType),
          StructField("node", StringType),
          StructField("eventId", StringType),
          StructField("clientProvince", StringType),
          StructField("clientIsp", StringType),
          StructField("clientIp", StringType),
          StructField("clientCountry", StringType),
          StructField("channel", StringType),
          StructField("otherIds", ArrayType(StringType, containsNull = true)),
          StructField("msg", StructType(Array(
            StructField("eventNum", LongType),
            StructField("events", ArrayType(StructType(Array(
              StructField("id", StringType),
              StructField("paras", ArrayType(StructType(Array(
                StructField("key", StringType, nullable = true),
                StructField("value", StringType, nullable = true)
              )), containsNull = true)),
              StructField("parasNum", LongType, nullable = true),
              StructField("time", StringType, nullable = true)
            )),  containsNull = true), nullable = true),
            StructField("id", StructType(Array(
              StructField("credit", StructType(Array(
                StructField("start", StringType, nullable = true)
              )), nullable = true),
              StructField("serviceId", StringType, nullable = true),
              StructField("stat", LongType, nullable = true),
              StructField("wwId", StringType, nullable = true),
              StructField("userId", StringType, nullable = true)
            )), nullable = true),
            StructField("wwId", StringType, nullable = true),
            StructField("type", LongType, nullable = true),
            StructField("version", StructType(Array(
              StructField("channel", StringType, nullable = true),
              StructField("version", StringType, nullable = true)
            )), nullable = true)
          )), nullable = true)
        ))
    
        session
          .read
          .schema(newSchema3) // 将数据类型模式设置给DataFrameReader
          .json(jsonDataPath)
          .select($"msg.id.userId" as "userId")
          .distinct()
          .show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3、时间对比

    从时间对比可以看出,给数据读取器提前设置数据类型模式,可以将数据读取速度提高一倍。

    在这里插入图片描述

    4、注意

    • 对于给出部分数据,让spark提前推断数据类型模式,数据类型要包含全部需要处理的字段,否则会出现找不到字段的异常。
    • 可以通过适当调整spark-sumit的参数,增加资源利用率,提高spark执行效率。
    • 自定义数据类型模式的时候,需要注意使用的数据类型。
    • 这里只是给出了json的读取,csv类型格式也是类似处理方式。

    5、自定义数据类型模式

    根据官方提供的方法,有多种定义方式。下面的例子可以通过spark的源码中注释中看到。

    // 默认添加字段方式,默认字段可以为null
    val struct = (new StructType)
    .add("a", IntegerType)
    .add("b", LongType)
    .add("c", StringType)
    
    // 可以添加评论
    val struct = (new StructType)
    .add("a", IntegerType, true, "comment1")
    .add("b", LongType, false, "comment2")
    .add("c", StringType, true, "comment3")
    
    // 可以使用字符串类型
    val struct = (new StructType)
    .add("a", "int")
    .add("b", "long")
    .add("c", "string")
    
    // 通过StructType的构造方式创建
    val innerStruct =
      StructType(
        StructField("f1", IntegerType, true) ::
        StructField("f2", LongType, false) ::
        StructField("f3", BooleanType, false) :: Nil)
    
    val struct = StructType(
       StructField("a", innerStruct, true) :: Nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 相关阅读:
    会计基础工作规范E卷
    第3章 动态SQL
    进程概念(跑路人笔记)
    【1990年-2022年】地级市人均GDP数据集(excel+shp)
    跳跃游戏(贪心思想)
    Google Earth Engine APP ——Forest Health监测APP(可下载)
    搜索店铺列表API 返回值说明
    计算机毕业设计选题推荐-社区团购管理系统-Python项目实战
    姓氏情侣家庭亲子谐音顽梗头像分销流量主微信抖音小程序开发
    网络安全(黑客)自学
  • 原文地址:https://blog.csdn.net/u013869488/article/details/126890081