• CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总


    需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。

    CDH版本为:6.3.2
    spark版本为:2.4
    python版本:2.7.5
    操作系统:CentOS Linux 7
    集群方式:yarn-cluster

    一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
    为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
    spark引用第三方包

    1.1 转csv的python代码(python脚本)

    #-*- coding:utf-8 -*-
    import pandas as pd
    import os, xlrd ,sys
    
    def xlsx_to_csv_pd(fn):
        path1="/home/lzl/datax/"+fn+".xlsx"
        path2="/home/lzl/datax/"+fn+".csv"
        data_xls = pd.read_excel(path1, index_col=0)
        data_xls.to_csv(path2, encoding='utf-8')
    
    if __name__ == '__main__':
        fn=sys.argv[1]
        print(fn)
        try:
    		xlsx_to_csv_pd(fn)
    		print("转成成功!")
        except Exception as e:
    		print("转成失败!")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    1.2 数据中台上的代码(shell脚本):

    #!/bin/bash
    #@description:这是一句描述
    #@author: admin(admin)
    #@email: 
    #@date: 2023-09-26 14:44:3
    
    # 文件名称
    fn="项目投运计划"
    
    # xlsx转换成csv格式
    ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" 
    
    # 将文件上传到hfds上
    ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
    echo "上传成功~!"
    
    # 删除csv文件
    ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
    echo "删除成功~!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    二、pyspark写入hive中
    2.1 写入过程中遇到的问题点
    2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。

    # 循环对每列去掉前后空格,以及删除换行符
    import pyspark.sql.functions as F
    from pyspark.sql.functions import col, regexp_replace
    
    for name in df.columns:
        df = df.withColumn(name, F.trim(df[name]))
        df = df.withColumn(name, regexp_replace(col(name), "\n", ""))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.1.2 个别字段存在科学计数法,需要用cast转换

    from pyspark.sql.types import *
    
    # 取消销售订单号的科学记数法
    col="销售订单号"
    df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))
    
    • 1
    • 2
    • 3
    • 4
    • 5

    去掉换行符另一种方法:换行符问题也可以参照这个

    2.2 数据中台代码(pyspark)

    # -*- coding:utf-8
    # coding=UTF-8
    
    # 引入sys,方便输出到控制台时不是乱码
    import  sys   
    reload(sys)
    sys.setdefaultencoding( "utf-8" )
    
    # 引入模块
    from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
    from pyspark.sql import SparkSession
    from pyspark import SparkContext, SparkConf, SQLContext 
    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql.functions import col, regexp_replace
    from pyspark.sql.types import *
    
    # 设定资源大小
    conf=SparkConf()\
        .set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\
        .set("spark.sql.shuffle.partitions", "4")\
        .set("spark.sql.execution.arrow.enabled", "true")\
        .set("spark.driver.maxResultSize","6G")\
        .set('spark.driver.memory','6G')\
        .set('spark.executor.memory','6G')
    
    # 建立SparkSession
    spark = SparkSession \
        .builder\
        .config(conf=conf)\
        .master("local[*]")\
        .appName("dataFrameApply") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # 读取cvs文件
    # 文件名称和文件位置
    fp= r"/origin_data/sgd/excel/项目投运计划.csv"
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiLine", "true") \
        .option("delimiter", ",") \
        .format("csv") \
        .load(fp)
    
    # 查看数据类型
    # df.printSchema()
    
    # 循环对每列去掉前后空格,以及删除换行符
    for name in df.columns:
        df = df.withColumn(name, F.trim(df[name]))
        df = df.withColumn(name, regexp_replace(col(name), "\n", ""))
    
    # 取消销售订单号的科学记数法
    col="销售订单号"
    df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))
    
    df.show(25,truncate = False) # 查看数据,允许输出25行
    
    # 设置日志级别 (这两个没用)
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    
    # 写入hive中
    spark.sql("use sgd_dev")  # 指定数据库
    
    # 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
    spark.sql("""
    CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (
        project_no                string         ,
        sale_order_no             string         ,
        customer_name             string         ,
        unoperating_amt           decimal(19,2)  , 
        expected_operating_time   string         ,
        operating_amt             decimal(19,2)  ,  
        operating_progress_track  string         ,
        is_Supplied               string         ,
        operating_submit_time     string         ,
        Signing_contract_time     string         ,
        remake                    string  
        )
     ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'    
    """)
    
    # 注册临时表
    df.createOrReplaceTempView("hdfs_df")
    # spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据
    
    # 将数据插入hive临时表中
    spark.sql("""
        insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
    """)
    
    # 将数据导入正式环境的hive中
    spark.sql("""
        insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
    """)
    
    # 查看导入后的数据
    spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)
    
    # 删除注册的临时表
    spark.sql("""
        drop table hdfs_df
    """)
    
    # 删除临时表
    spark.sql("""
        drop table ods_sgd_project_operating_plan_info_tmp
    """)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    关于spark的更多知识,可以参看Spark SQL总结

  • 相关阅读:
    金九银十复习这份美团架构师的Java核心面试宝典就够了
    【算法100天 | 19】链表拆分、深拷贝
    ABAP 报表中如何以二进制方式上传本地文件
    一种涉密场所物理信息泄漏风险评估办法
    spring boot 项目中的application不能执行是什么问题
    shell 命令
    升降 ubuntu 内核的好工具 mainline
    嵌入式学习-FreeRTOS-Day3
    全球AI人工智能领袖:Anthropic联合创始人丹妮拉·阿莫迪!
    单例模式:饿汉式
  • 原文地址:https://blog.csdn.net/m0_48830183/article/details/133317462