常用算子
1、MapPartition
遍历的单位是每一个partition。
遍历原理:将每一个partition的数据先加载到内存,然后再一条一条遍历。
rdd.mapPartitions((elems:Iterator[Int]) => {
println("创建连接")
while(elems.hasNext){
println("拼接SQL语句 " + elems.next)
}
println("提交")
elems
})
2、Map
遍历单位是每一条记录。
3、MapPartitionWithIndex
在遍历每一个partition的时候能够拿到每一个分区的ID号,这个算子一般用于测试环境。
rdd.mapPartitionsWithIndex((index,iterator) =>{
println("partitonId: " + index)
while(iterator.hasNext){
println(iterator.next)
}
}).count()
4、getNumPartitions
获取RDD的分区数
val partitionNum1 = rdd.
val partitionNum2 = rdd.partitions.length
5、coalesce
coalesce(…,true)若参数为true,说明分区的时候需要产生shuffle,若参数为false说明不需要产生shuffle。
增加RDD的分区数使用coalesce(…,true)或者repartition
val coalesceRDD1 = facePowerRDD.coalesce(6, true)
println("coalesceRDD1.getNumPartitions:" + coalesceRDD1.getNumPartitions)
coalesceRDD1.mapPartitionsWithIndex((index,iterator)=>{
println("partitionId" + index)
while(iterator.hasNext){
println(iterator.next)
}
iterator
}).count()
减少RDD的分区数,使用coalesce(…,false),也可以使用coalesce(…,true)但是效率会降低。
facePowerRDD
.coalesce(2, false)
.mapPartitionsWithIndex((index,iterator)=>{
println("partitionId" + index)
while(iterator.hasNext){
println(iterator.next)
}
iterator
}).count()
6、union
合并,他只是将rdd1与rdd2在逻辑上进行合并,并不会真正进行数据的合并以传输。
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = sc.makeRDD(11 to 20,3)
val unionRDD = rdd1.union(rdd2)
println(unionRDD.getNumPartitions)
7、zip
将两个RDD进行横向合并,但是zip是对应位置合并。
比如:非KV格式的RDD1、RDD2 zip KV格式的RDD
val zipRDD = rdd1.zip(rdd2)
zipRDD.foreach(println)
注意:
- 要进行zip的两个RDD的元素数必须一致。
- 要进行zip的两个RDD的分区数必须一致。
8、zipWitIndex
给RDD中的每一个元素加上一个唯一的索引号,非KV的RDD变成了KV格式的RDD。
val zipWithIndexRDD = rdd1.zipWithIndex()
zipWithIndexRDD.foreach(println)
zipWithIndexRDD.map(_.swap).lookup(2).foreach(println)
9、zipWithUniqueId
给RDD中的每一个元素加上一个唯一的索引号,非KV的RDD变成了KV格式的RDD。
每一个分区的第一个元素的索引号就是当前分区的分区号;
每一个分区的第二个元素的索引号就是第一个元素+分区数。
rdd1
.zipWithUniqueId()
.mapPartitions(iterator=>{
while(iterator.hasNext){
println(iterator.next)
}
iterator
}).count()
10、take(n)
取这个RDD中前n个元素,是action类算子。
11、first
取这个RDD中第一个元素,与task(1)一样,也是action类算子。
rdd1.take(5).foreach(println)
//first = take(1)
println(rdd1.first())
12、combineByKey
rdd.combineByKey(初始化函数,combiner聚合函数,reduce大聚合函数)
combineByKey作用步骤:
- 分组完成后,初始化函数会作用到每组数据的第一个元素上。
- combiner聚合函数作用到每组数据上,得到最终的combiner小聚合结果。
- 将reduce大聚合函数作用在每组数据上。
总结:
val conf = new SparkConf().setMaster(“local”)
- local:使用1个线程来模拟。
- local[10]:代码在本机使用10个线程来模拟spark的执行。
- local[*]:电脑还剩下几个core,那么就启动多少个线程来模拟。