函数签名 def reduce(f: (T, T) => T): T
代码:
- /**
- * reduce()聚合
- */
- object ActionDemo {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15))
- println(rdd.reduce(_+_))
- }
- }
函数签名 def collect(): Array[T]
代码:
- /**
- *count()返回RDD中元素个数
- */
- object ActionDemo2{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15))
- rdd.collect().foreach(println)
- }
- }
函数签名 def count(): Long
代码:
- /**
- *count()返回RDD中元素个数
- */
- object ActionDemo2{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15))
- val rdd2 =rdd.count()
- println(rdd2)
- }
- }
函数签名 def first(): T
返回 RDD 中的第一个元素
代码:
- /**
- *first()返回RDD中的第一个元素
- */
- object ActionDemo3{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15))
- val rdd2 =rdd.first()
- println(rdd2)
- }
- }
函数签名 def take(num: Int): Array[T]
代码:
- /**
- *take()返回由RDD前n个元素组成的数组
- */
- object ActionDemo4{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15))
- val rdd2 =rdd.take(2)
- rdd2.foreach(println)
- }
- }
函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
代码:
- /**
- *takeOrdered()返回该RDD排序后前n个元素组成的数组
- */
- object ActionDemo5{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
- val rdd2 =rdd.takeOrdered(2)
- rdd2.foreach(println)
- }
- }
函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
代码:
- /**
- * aggregate()案例
- */
- object ActionDemo6{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
- //22 23 25 9 19 110
- //空分区也有默认值
- val rdd2 =rdd.aggregate(10)(_+_,_+_)
- println(rdd2)//318
- }
- }
函数签名 def fold(zeroValue: T)(op: (T, T) => T): T
代码:
- /**
- * fold()案例
- */
- object ActionDemo7{
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
- //22 23 25 9 19 110
- //空分区也有默认值
- val rdd2 =rdd.fold(10)(_+_)
- println(rdd2)//318
- }
- }
函数签名 def countByKey(): Map[K, Long]
代码:
- /**
- * countByKey()统计每种key的个数
- */
- object ActionDemo8 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- val sc = new SparkContext(sparkConf)
- val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
- val rdd2 = rdd.countByKey()
- println(rdd2)
-
- }
- }
函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile( path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
将数据保存到不同格式的文件中
- // 保存成 Text 文件
- rdd.saveAsTextFile("output")
- // 序列化成对象保存到文件
- rdd.saveAsObjectFile("output1")
- // 保存成 Sequencefile 文件
- rdd.map((_,1)).saveAsSequenceFile("output2")
函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
分布式遍历 RDD 中的每一个元素,调用指定函数