• Spark throws an exception when writing back to the same Hive table or the same HDFS path it is reading from


    1. Writing back to the same Hive table

    Code example

    import os
    import time
    import logging
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("mytest") \
        .enableHiveSupport() \
        .getOrCreate()

    df = spark.sql("select * from xxx.xxx")
    df.show()
    df.write.mode("overwrite").saveAsTable("xxx.xxx")

    Exception thrown

    ---------------------------------------------------------------------------
    AnalysisException                         Traceback (most recent call last)
    /tmp/ipykernel_36930/1113601968.py in <module>
         12 df = spark.sql("select * from dev_sztoc_crm.leon_test")
         13 print(df.show())
    ---> 14 df.write.mode("overwrite").saveAsTable("dev_sztoc_crm.leon_test")
    
    /usr/share/spark3/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
       1158         if format is not None:
       1159             self.format(format)
    -> 1160         self._jwrite.saveAsTable(name)
       1161 
       1162     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
    
    /usr/share/spark3/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1302 
       1303         answer = self.gateway_client.send_command(command)
    -> 1304         return_value = get_return_value(
       1305             answer, self.gateway_client, self.target_id, self.name)
       1306 
    
    /usr/share/spark3/python/pyspark/sql/utils.py in deco(*a, **kw)
        115                 # Hide where the exception came from that shows a non-Pythonic
        116                 # JVM exception message.
    --> 117                 raise converted from None
        118             else:
        119                 raise
    
    AnalysisException: Cannot overwrite table xxx.xxx that is also being read from
    

     Check the Hive table: the data is still there! Spark detects the read/write conflict at analysis time, before touching any data, so the overwrite never starts.

    Correct code example

    import os
    import time
    import logging
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("pyspark-test") \
        .enableHiveSupport() \
        .getOrCreate()

    df = spark.sql("select * from dev_sztoc_crm.leon_test")
    # Save the result to a temporary table first
    df.write.mode("overwrite").saveAsTable("dev_sztoc_crm.leon_test_tmp")
    # Overwrite the target table from the temporary table
    spark.table("dev_sztoc_crm.leon_test_tmp").write.mode("overwrite").saveAsTable("dev_sztoc_crm.leon_test")
    # Drop the temporary table
    spark.sql("DROP TABLE IF EXISTS dev_sztoc_crm.leon_test_tmp")

    2. Writing back to the same HDFS path

    import os
    import time
    import logging
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("mytest") \
        .getOrCreate()

    df = spark.read.csv("xxx")
    df.write.mode("overwrite").save("xxx")

    Exception thrown

    Caused by: java.io.FileNotFoundException: File does not exist: /user/leon/overwrite/xxx.csv
    

    ……

    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

     Check the original HDFS path: the files have been wiped out! In overwrite mode Spark deletes the target directory before the job runs, and because the read is lazy, the source files are already gone by the time Spark tries to scan them, hence the FileNotFoundException.

    Correct code example

    import os
    import time
    import logging
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("mytest") \
        .getOrCreate()

    # Get a Hadoop FileSystem handle through the JVM gateway
    jvm = spark._jvm
    jsc = spark._jsc
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(jsc.hadoopConfiguration())

    ori_path = "/user/leon/overwrite/"
    tmp_path = "/user/leon/overwrite_tmp/"

    # Write the result to a temporary path first
    df = spark.read.csv(ori_path)
    df.write.mode("overwrite").csv(tmp_path)
    # Read the temporary path back and overwrite the original path
    df = spark.read.csv(tmp_path)
    df.write.mode("overwrite").csv(ori_path)
    # Delete the temporary directory
    fs.delete(jvm.org.apache.hadoop.fs.Path(tmp_path), True)
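
    As an alternative to reading the temporary output back and writing it a second time, the temporary directory can be swapped into place with the Hadoop FileSystem API. The sketch below is not from the original post; it assumes ori_path and tmp_path live on the same filesystem, since FileSystem.rename only moves paths within one filesystem.

    # Alternative sketch: move the temp directory over the original instead of
    # rewriting the data. Assumes both paths live on the same filesystem.
    jvm = spark._jvm
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    Path = jvm.org.apache.hadoop.fs.Path

    ori_path = "/user/leon/overwrite/"
    tmp_path = "/user/leon/overwrite_tmp/"

    df = spark.read.csv(ori_path)
    df.write.mode("overwrite").csv(tmp_path)   # materialize the result elsewhere first
    fs.delete(Path(ori_path), True)            # remove the original directory
    fs.rename(Path(tmp_path), Path(ori_path))  # move the temp output into place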

    3. Conclusion

    Reading a Hive table and writing back to the same table, and reading an HDFS path and writing back to the same path, both make Spark throw an exception. In the Hive case the data is still intact; in the HDFS case the data is lost. To write back to the same Hive table, stage the result in a temporary table; to write back to the same HDFS path, stage it in a temporary directory.

  • Original article: https://blog.csdn.net/L13763338360/article/details/126480629