spark job由action算子触发
![]()
![]()
输出

foreach和foreachPartition类似,只不过对一个分区使用一次f函数
输出

返回rdd元素的个数

输出:4
![]()
返回值为数组,将RDD所有分区中的数据通过网络传输的方式拉取到Driver端,在使用collect之前一定要确定该RDD中数据量的大小,避免造成Driver端内存的溢出。
避免内存溢出:在使用collect之前,使用filter将数据进行过滤。

输出: List(hello world, hello pang pang)
1)![]()
从RDD中拉取前num条记录到Driver端(可避免内存溢出的情况),返回值的类型为Array,如果RDD中的数据是有序的,那么take[N]代表topN。
2)![]()
返回RDD中的第一个元素,相当于take(1)

a)写到本地
![]()


b)写到hdfs上
- package tarns_result.others.classify
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.SparkContext
- import common.Tools.{getHDFSDirFiles, getSC, ifDirExistsDelete, jsonFormatREC}
- object NAVIFREQ_ALL_CLASSIFY_WORLDWEBJsonToREC {
- def main(args: Array[String]): Unit = {
- val sc: SparkContext =getSC("NAVIFREQ_ALL_CLASSIFY_WORLDWEBJsonToREC")
- val inputpath_sql="/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB/"
- //该路径下只有一个文件,但是该文件名不规则
- //6.将json串转化成REC格式
- val inpaths: Array[String] = getHDFSDirFiles(inputpath_sql)
- val json_file: RDD[String] = sc.textFile(inpaths.mkString(","))
- val REClines:RDD[String] = jsonFormatREC(json_file)
- val outpath_result="hdfs://node01:8020/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB_RESULT"
- ifDirExistsDelete(outpath_result)
- //7.将结果写入到hdfs
- REClines.repartition(1).saveAsTextFile(outpath_result)
- sc.stop()
- }
- }
可以将spark sql查出来的结果以json串的形式直接写到hdfs上
- package tarns_result.others.classify
-
- import common.Tools.{getSpark, ifDirExistsDelete}
- import org.apache.spark.sql.DataFrame
-
- object NAVIFREQ_ALL_CLASSIFY_WORLDWEB {
- //按照Indicator进行去重
- def main(args: Array[String]): Unit = {
- val spark = getSpark("NAVIFREQ_ALL_CLASSIFY_WORLDWEB")
- val inputpath_json="hdfs://node01:8020/tmp/InternationalData_Result/HDFSToJson"
- //3用sparkSQL加载这个json文件生成一个DataFrame
- val dataframe=spark.read.json(inputpath_json+"/part-00000")
- dataframe.createTempView("table")
- spark.sql("select * from table").show()
- //4.从这个DataFrame中选择想要的字段
- val spark_sql=
- """
- |select
- |Indicator,
- |IndicatorCategory,
- |Unit,
- |IndicatorLength,
- |WebCode,
- |WebName,
- |SourceCode,
- |SourceName
- |from
- |(
- |select
- |Indicator,
- |IndicatorCategory,
- |Unit,
- |IndicatorLength,
- |WebCode,
- |WebName,
- |SourceCode,
- |SourceName,
- |row_number() over(partition by Indicator order by Unit) num
- |from
- |table
- |)t
- |where num=1
- |""".stripMargin
- val result: DataFrame = spark.sql(spark_sql)
-
-
- //5.将数据以json串的形式写入到hdfs
- val outpath_sql="hdfs://node01:8020/tmp/InternationalData_Result/NAVIFREQ_ALL_CLASSIFY_WORLDWEB"
- ifDirExistsDelete(outpath_sql)
- result.repartition(1).write.json(outpath_sql)
- spark.stop()
- }
- }
也可以用write.其他格式,存成其他的格式

![]()
