• Spark TPCDS Data Gen


    Start Spark-Shell

    $SPARK_HOME/bin/spark-shell --master local[10] --jars {PATH}/spark-sql-perf-1.2/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
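
    If the jar was picked up, importing the generator class in the shell should succeed (a quick sanity check, using the same class as the scripts below):

    import com.databricks.spark.sql.perf.tpcds.TPCDSTables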

    Gen Data

    Gen TPCDS Parquet
    val tools_path = "/opt/Beaver/tpcds-kit/tools"
    val data_path = "hdfs://{IP}:9000/tpcds_parquet_tpcds_kit_1_0/1"
    val database_name = "tpcds_parquet_tpcds_kit_1_0_scale_1_db"
    val scale = "1"
    val p = scale.toInt / 2048.0
    val catalog_returns_p = (263 * p + 1).toInt
    val catalog_sales_p = (2285 * p * 0.5 * 0.5 + 1).toInt
    val store_returns_p = (429 * p + 1).toInt
    val store_sales_p = (3164 * p * 0.5 * 0.5 + 1).toInt
    val web_returns_p = (198 * p + 1).toInt
    val web_sales_p = (1207 * p * 0.5 * 0.5 + 1).toInt
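    // The six values above are heuristics for how many dsdgen tasks to run per
    // fact table: they scale linearly with the scale factor (apparently against
    // a ~2048 SF baseline), and the extra 0.5 * 0.5 factor shrinks the task
    // count for the wide *_sales tables. Computed examples (not from the
    // original post):
    //   scale = 1    => p ≈ 0.000488, every fact table gets 1 task
    //   scale = 100  => catalog_sales_p = 28,  store_sales_p = 39
    //   scale = 2048 => catalog_sales_p = 572, store_sales_p = 792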
    val format = "parquet"
    val codec = "snappy"
    
    val useDoubleForDecimal = false
    
    val partitionTables = false
    val clusterByPartitionColumns = partitionTables
    
    import com.databricks.spark.sql.perf.tpcds.TPCDSTables
    spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec)
    
    val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
    
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_page", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer", 6)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_address", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_demographics", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "date_dim", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "household_demographics", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "income_band", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "inventory", 6)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "item", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "promotion", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "reason", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "ship_mode", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "time_dim", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "warehouse", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_page", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_site", 1)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_sales", catalog_sales_p)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_returns", catalog_returns_p)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_sales", store_sales_p)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_returns", store_returns_p)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_sales", web_sales_p)
    tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_returns", web_returns_p)
    
    tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
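
    Once the external tables are registered, a quick row-count check confirms the metadata points at the generated files (a minimal sketch; names as defined above):

    spark.sql(s"use $database_name")
    spark.sql("show tables").show(24, false)
    spark.sql("select count(*) from store_sales").show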
    
    
    Gen TPCH ORC
    import com.databricks.spark.sql.perf.tpch._
    
    val tools_path = "/opt/Beaver/tpch-dbgen"
    val format = "orc"
    val useDoubleForDecimal = false
    val partitionTables = false
    val scaleFactor = "1"
    
    val data_path = s"hdfs://{IP}:9000/tpch_${format}_${scaleFactor}"
    
    val numPartitions = 1
    val databaseName = s"tpch_${format}_${scaleFactor}_db"
    
    
    val clusterByPartitionColumns = partitionTables
    
    val tables = new TPCHTables(spark, spark.sqlContext,
        dbgenDir = tools_path,
        scaleFactor = scaleFactor,
        useDoubleForDecimal = useDoubleForDecimal,
        useStringForDate = false)
    
    spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "200000000") // cap rows per output file so very large tables split across multiple files
    
    tables.genData(
        location = data_path,
        format = format,
        overwrite = true, // overwrite the data that is already there
        partitionTables, // whether to create the partitioned fact tables
        clusterByPartitionColumns, // shuffle to get partitions coalesced into single files
        filterOutNullPartitionValues = false, // true filters out partitions with a NULL key value
        tableFilter = "", // "" means generate all tables
        numPartitions = numPartitions) // how many dbgen partitions to run - number of input tasks
    
    // Create the specified database
    sql(s"drop database if exists $databaseName CASCADE")
    sql(s"create database $databaseName")
    // Create metastore tables in a specified database for your data.
    // Once tables are created, the current database will be switched to the specified database.
    tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = false)
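
    As with TPC-DS, a quick check that the TPC-H tables were registered and are readable (a minimal sketch; createExternalTables already switched to the new database):

    spark.sql("show tables").show
    spark.sql("select count(*) from lineitem").show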
    
    
    

    Create Metadata

    Parquet create database/tables
    val tools_path = "/opt/Beaver/tpcds-kit/tools"
    val data_path = "hdfs://10.1.2.206:9000/user/sparkuser/part_tpcds_decimal_1000/"
    val database_name = "sr242_parquet_part_tpcds_decimal_1000"
    val scale = "1000"
    val useDoubleForDecimal = false
    val format = "parquet"
    val partitionTables = true
    import com.databricks.spark.sql.perf.tpcds.TPCDSTables
    
    val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
    tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
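
    Since partitionTables is true here, discoverPartitions = true makes createExternalTables recover the partition metadata. One hedged way to verify (assuming the standard layout where store_sales is partitioned by ss_sold_date_sk):

    spark.sql(s"use $database_name")
    spark.sql("show partitions store_sales").show(5, false)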
    
    Arrow create database/tables
    val data_path = "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
    val databaseName = "arrow_part_tpcds_decimal_1000"
    val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
    val partitionTables = true
    spark.sql(s"DROP database if exists $databaseName CASCADE")
    if (spark.catalog.databaseExists(s"$databaseName")) {
        println(s"$databaseName already exists!")
    } else {
        spark.sql(s"create database if not exists $databaseName").show
        spark.sql(s"use $databaseName").show
        for (table <- tables) {
            if (spark.catalog.tableExists(s"$table")) {
                println(s"$table already exists!")
            } else {
                spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
            }
        }
        if (partitionTables) {
            for (table <- tables) {
                try {
                    spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
                } catch {
                    case e: Exception => println(e)
                }
            }
        }
    }
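
    A quick check that all 24 tables were registered with the arrow data source (a minimal sketch):

    spark.catalog.listTables(databaseName).show(24, false)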
    
    
    
    
    Use ALTER to update table metadata
    val data_path = "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
    val databaseName = "parquet_part_tpcds_decimal_1000"
    val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")

    spark.sql(s"use $databaseName").show
    for (table <- tables) {
        try {
            spark.sql(s"ALTER TABLE $table SET LOCATION '$data_path/$table'").show
        } catch {
            case e: Exception => println(e)
        }
    }
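
    To confirm the new locations took effect, DESCRIBE FORMATTED exposes a Location row per table (a minimal check on one table):

    spark.sql("describe formatted store_sales").filter("col_name = 'Location'").show(false)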
    
    
    
  • Original article: https://blog.csdn.net/zhixingheyi_tian/article/details/125616430