* reduce
/**
* 行动算子: reduce
* 逻辑: 将RDD中的数据,按照指定的规则进行聚合运算
*/
@Test def reduceTest(): Unit = {
// 1. 通过集合,创建一个RDD
val rdd: RDD[Int] = sc.parallelize(1 to 100, 2)
// 2. 聚合运算
val res: Int = rdd.reduce(_ + _)
// 3. 输出结果
println(res)
}
* fold
/**
* 行动算子: fold
* 聚合运算,在分区内进行聚合的时候,以及分区之间进行聚合的时候都会带上zeroValue
*/
@Test def foldTest(): Unit = {
val rdd: RDD[Int] = sc.parallelize(1 to 100, 2)
val res: Int = rdd.fold(10)(_ + _)
println(res)
}
* aggregate
/**
* 行动算子: aggregate
*
* 定制分区内的计算逻辑和分区之间的计算逻辑
* 分区的计算逻辑,需要考虑到zeroValue
* 分区之间的计算逻辑,也需要考虑到zeroValue
*
*/
@Test def aggregateTest(): Unit = {
// 1. 准备一个集合
val rdd1: RDD[Int] = sc.parallelize(1 to 100, 2)
// 2. 聚合
val res: Int = rdd1.aggregate(80)(Math.max, _ + _)
println(res)
}
* collect
/**
* 行动算子: aggregate
*
* 定制分区内的计算逻辑和分区之间的计算逻辑
* 分区的计算逻辑,需要考虑到zeroValue
* 分区之间的计算逻辑,也需要考虑到zeroValue
*
*/
@Test def aggregateTest(): Unit = {
// 1. 准备一个集合
val rdd1: RDD[Int] = sc.parallelize(1 to 100, 2)
// 2. 聚合
val res: Int = rdd1.aggregate(80)(Math.max, _ + _)
println(res)
}
* collectAsMap
/**
* 行动算子: collectAsMap
* 注意事项: 作用在PairedRDD身上的
* 将PairedRDD的所有数据搜集起来,存入一个Map,返回到Driver端
*/
@Test def collectAsMap(): Unit = {
val rdd: RDD[(Int, String)] = sc.parallelize(Array("Lily", "Uncle Wang", "Polly")).keyBy(_.length)
val res: collection.Map[Int, String] = rdd.collectAsMap()
* count
/**
* 行动算子: count
* 统计RDD描述的数据中有多少个
*/
@Test def countTest(): Unit = {
val rdd: RDD[Int] = sc.parallelize(1 to 100)
val count: Long = rdd.count()
println(count)
}
* countByKey
/**
* 行动算子: countByKey
* 注意事项: 作用在PariedRDD
* 统计一个PairedRDD中,每一个键有多少个对应的值(键出现了多少次)
*/
@Test def countByKeyTest(): Unit = {
val rdd: RDD[(Int, String)] = sc.parallelize(Array("Jim", "Tom", "Lily", "Lucy", "Polly", "Snoppy")).keyBy(_.length)
val res: collection.Map[Int, Long] = rdd.countByKey()
println(res)
}
* take
/**
* 行动算子: take
* 逻辑: 获取RDD中前N个元素,获取由这些元素组成的集合
*/
@Test def takeTest(): Unit = {
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))
val res: Array[Int] = rdd.take(3)
res.foreach(println)
}
* takeSample
/**
* 行动算子: takeSample
*/
@Test def takeSample(): Unit = {
val rdd: RDD[Int] = sc.parallelize(1 to 100)
val res: Array[Int] = rdd.takeSample(withReplacement = true, 10)
res.foreach(println)
}
* takeOrdered
/**
* 行动算子: takeOrdered
* 逻辑: 从RDD中获取最小的几个元素
*/
@Test def takeOrdered(): Unit = {
val rdd: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
val res: Array[Int] = rdd.takeOrdered(5)
res.foreach(println)
}
* top
/**
* 行动算子: top
* 逻辑: 从RDD中获取最大的几个元素
*/
@Test def top(): Unit = {
val rdd: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
val res: Array[Int] = rdd.top(5)
res.foreach(println)
}
* first
/**
* 行动算子: first
* 逻辑: 获取RDD中第一个元素,相当于 take(1)
*/
@Test def first(): Unit = {
val res: Int = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2)).first()
println(res)
}
* foreach
@Test def top(): Unit = {
val rdd: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
//val res: Array[Int] = rdd.top(5)
rdd.foreach(println)
}
* saveAsTextFile
@Test def saveTest(): Unit = {
val rdd: RDD[Int] = sc.parallelize(1 to 100, 4)
rdd.saveAsTextFile("file/spark/out")
}