• [spark]action算子


    spark job由action算子触发

    1.foreach、foreachPartition

    1)foreach一般用于打印

     输出

    2)foreachPartition

     

    foreach和foreachPartition类似,只不过对一个分区使用一次f函数

     

     输出

    2.count

      返回rdd元素的个数

     

    输出:4

    3.collect

    返回值为数组,将RDD所有分区中的数据通过网络传输的方式拉取到Driver端,在使用collect之前一定要确定该RDD中数据量的大小,避免造成Driver端内存的溢出。

    避免内存溢出:在使用collect之前,使用filter将数据进行过滤。

    输出: List(hello world, hello pang pang)

    4.take、first

    1)

     从RDD中拉取前num条记录到Driver端(可避免内存溢出的情况),返回值的类型为Array,如果RDD中的数据是有序的,那么take[N]代表topN。

    2)

     返回RDD中的第一个元素,相当于take(1)

    5.saveAsTextFile、write.json、saveAsHadoopFile

    1)saveAsTextFile 可以写到本地、也可以写到hdfs上

    a)写到本地

     

     

     

    b)写到hdfs上

    1. package tarns_result.others.classify
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.SparkContext
    4. import common.Tools.{getHDFSDirFiles, getSC, ifDirExistsDelete, jsonFormatREC}
    5. object NAVIFREQ_ALL_CLASSIFY_WORLDWEBJsonToREC {
    6. def main(args: Array[String]): Unit = {
    7. val sc: SparkContext =getSC("NAVIFREQ_ALL_CLASSIFY_WORLDWEBJsonToREC")
    8. val inputpath_sql="/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB/"
    9. //该路径下只有一个文件,但是该文件名不规则
    10. //6.将json串转化成REC格式
    11. val inpaths: Array[String] = getHDFSDirFiles(inputpath_sql)
    12. val json_file: RDD[String] = sc.textFile(inpaths.mkString(","))
    13. val REClines:RDD[String] = jsonFormatREC(json_file)
    14. val outpath_result="hdfs://node01:8020/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB_RESULT"
    15. ifDirExistsDelete(outpath_result)
    16. //7.将结果写入到hdfs
    17. REClines.repartition(1).saveAsTextFile(outpath_result)
    18. sc.stop()
    19. }
    20. }

    2)write.json(path)

    可以将spark sql查出来的结果以json串的形式直接写到hdfs上

    1. package tarns_result.others.classify
    2. import common.Tools.{getSpark, ifDirExistsDelete}
    3. import org.apache.spark.sql.DataFrame
    4. object NAVIFREQ_ALL_CLASSIFY_WORLDWEB {
    5. //按照Indicator进行去重
    6. def main(args: Array[String]): Unit = {
    7. val spark = getSpark("NAVIFREQ_ALL_CLASSIFY_WORLDWEB")
    8. val inputpath_json="hdfs://node01:8020/tmp/InternationalData_Result/HDFSToJson"
    9. //3用sparkSQL加载这个json文件生成一个DataFrame
    10. val dataframe=spark.read.json(inputpath_json+"/part-00000")
    11. dataframe.createTempView("table")
    12. spark.sql("select * from table").show()
    13. //4.从这个DataFrame中选择想要的字段
    14. val spark_sql=
    15. """
    16. |select
    17. |Indicator,
    18. |IndicatorCategory,
    19. |Unit,
    20. |IndicatorLength,
    21. |WebCode,
    22. |WebName,
    23. |SourceCode,
    24. |SourceName
    25. |from
    26. |(
    27. |select
    28. |Indicator,
    29. |IndicatorCategory,
    30. |Unit,
    31. |IndicatorLength,
    32. |WebCode,
    33. |WebName,
    34. |SourceCode,
    35. |SourceName,
    36. |row_number() over(partition by Indicator order by Unit) num
    37. |from
    38. |table
    39. |)t
    40. |where num=1
    41. |""".stripMargin
    42. val result: DataFrame = spark.sql(spark_sql)
    43. //5.将数据以json串的形式写入到hdfs
    44. val outpath_sql="hdfs://node01:8020/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB"
    45. ifDirExistsDelete(outpath_sql)
    46. result.repartition(1).write.json(outpath_sql)
    47. spark.stop()
    48. }
    49. }

    也可以用write.其他格式,存成其他的格式

    3)saveAsHadoopFile(没用过)

     

     

  • 相关阅读:
    【面试】IO多路复用
    服务器免密登录设置
    http https socket rpc grpc有啥区别联系
    【软件安装】docker 安装 elasticsearch 和 kibana
    【R语言数据科学】:变量选择(二)Lasso回归
    go 流程控制之switch 语句介绍
    DataGrip数据仓库工具
    学习 Python 编程的 11 个受用终身的技巧
    vue+springboot+websocket实时聊天通讯功能
    C高级day5
  • 原文地址:https://blog.csdn.net/qq_35896718/article/details/127547301