• Pyspark读写csv,txt,json,xlsx,xml,avro等文件


    1. Spark读写txt文件

    读:

    1. df = spark.read.text("/home/test/testTxt.txt").show()
    2. +-------------+
    3. | value|
    4. +-------------+
    5. | a,b,c,d|
    6. |123,345,789,5|
    7. |34,45,90,9878|
    8. +-------------+

    2. Spark读写csv文件

    读:

    1. # 文件在hdfs上的位置
    2. file_path = r"/user/lanyue/data.csv"
    3. # 方法一
    4. # 推荐这种,指定什么文件格式都可以,只需要修改参数format即可
    5. # 不同的格式其load函数会有不同,用的时候请自行搜索。
    6. df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',')
    7. # sep=',',表示指定分隔符为逗号,同参数delimiter。
    8. # header=TRUE,表示数据的第一行为列名
    9. # inferSchema,表示是否对字段类型进行推测。=False,默认读取后都按照文本字符处理。=True表示自动推断schema。
    10. # 或者下面这种形式。这两种形式都可以
    11. df = spark.read.format("csv").option("encoding","utf-8").option("header",True).load(file_path, schema=schema) # 使用指定的schema
    12. # 方法二
    13. df = spark.read.csv(file_path, encoding='utf-8', header=True, inferSchema=True)
    14. df = spark.read.csv(file_path, encoding='utf-8', header=True, schema=schema)
    15. # 如果想指定文件格式是json,那就是spark.read.json,其他类似

    写:

    1. # 保存在【hdfs上】,以csv文件的格式。指定什么文件格式都可以,只需要修改参数format即可
    2. df.repartition(1).write.mode('append').format("csv").option("encoding","utf-8").option("header",True).save("/lanyue/data.csv")
    3. # mode,保存模式:ovewriter重写、append文件末尾追加、error如果文件存在抛出异常、ignore如果文件存在忽略不更新
    4. # repartition, 在yarn模式下,Spark会根据hdfs文件的块数据大小来划分默认的分区数目,但是我们也可以自己设置分区数目,使用参数repartition。=1表示只保存成一个数据块
    5. # 或者
    6. df.write.csv("/lanyue/data.csv", sep="\t", encoding="utf-8", mode='overwrite')
    7. # 如果想指定文件格式是json,那就是df.write.json,其他类似
    8. # 通过指定参数sep,来指定分隔符,可以是",", "\t","\x01"等。同参数delimiter。

    3. Spark读写parquet文件

    读:

    1. file = "/user/muzili/data.parquet"
    2. spark_df=spark.read.parquet(file)
    3. df.show()

    写:

    spark_df.write.parquet(path=file,mode='overwrite')

    4. Spark读写json文件

    读:

    1. file = "/user/muzili/data.json"
    2. df = spark.read.json(file)
    3. df.show()

    写:

    df.repartition(1).write.mode('append').format("json").option("encoding","utf-8").option("header",True).save("/user/muzili/data.json")

    5. Spark读写excel文件

    读:

    写:

    6. Spark读写xml文件

    读:

    写:

    7. Spark读写orc文件

    读:

    写:

    8. Spark读写avro文件

    读:

    写:

    9. Spark读写mysql中的表

    读:

    1. url="jdbc:mysql://host:port/database"
    2. table="table_name"
    3. driver="com.mysql.jdbc.Driver"
    4. user="XXX"
    5. password="XXX"
    6. df = spark.read.format("jdbc")
    7. .option("url",url) # database地址,格式为jdbc:mysql://主机:端口/数据库
    8. .option("dbtable",table) # 表名
    9. .option("user",user)
    10. .option("password",password)
    11. .option("driver",driver)
    12. .load()
    13. # 或者以下形式
    14. df = spark.read.format('jdbc').options(url="jdbc:mysql://host:port/database", # database地址
    15. driver="com.mysql.jdbc.Driver",
    16. dbtable="table_name",
    17. user="XXX",
    18. password="XXX").load()
    19. # 或者以下形式
    20. # mysql的相关配置
    21. prop = {'user': 'xxx',
    22. 'password': 'xxx',
    23. 'driver': 'com.mysql.jdbc.Driver'}
    24. url = 'jdbc:mysql://host:port/database' # database地址
    25. df = spark.read.jdbc(url=url, table='mysql_table_name', properties=prop)

    写:

    1. # 会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行
    2. prop = {'user': 'xxx',
    3. 'password': 'xxx',
    4. 'driver': 'com.mysql.jdbc.Driver'}
    5. url = 'jdbc:mysql://host:port/database' # database地址
    6. df.write.jdbc(url=url, table='table_name', mode='append', properties=prop)
    7. # append 追加方式
    8. # 或者以下形式
    9. df.write.format("jdbc")
    10. .option("url","jdbc:mysql://host:port/database") # database地址
    11. .option("dbtable","table_name")
    12. .option("user",user)
    13. .option("password",password)
    14. .option("driver",driver)
    15. .option("batchsize","1000").mode("overwrite") # overwrite 清空表再导入
    16. .save()

  • 相关阅读:
    java计算机毕业设计智慧物业管理系统源码+数据库+系统+部署+lw文档
    远程导入MySQL数据量大速度慢问题
    设计模式-结构型-06-桥接模式
    十大排序算法C++实现
    C#项目实战|人脸识别考勤
    RV1126编译ROS Kinetic环境
    H. Huge Boxes of Animal Toys
    开源SPL助力JAVA处理公共数据文件(txtcsvjsonxmlxsl)
    前端通过Blob或File文件获取二进制数据
    java计算机毕业设计面试刷题系统源码+系统+mysql数据库+lw文档
  • 原文地址:https://blog.csdn.net/qq_56870570/article/details/133356197