1. HashPartitioner
Calling pairRdd.partitionBy(new spark.HashPartitioner(n)) redistributes a pair RDD into n partitions, placing each record by the hash of its key.
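HashPartitioner's placement rule is simply the key's hashCode modulo the partition count, with negative results mapped back into range. The following is a plain-Java sketch of that rule, not Spark's actual class, applied to keys like the ones used below:

```java
public class HashPartitionSketch {
    // Sketch of HashPartitioner's rule: nonNegativeMod(key.hashCode(), n).
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) return 0;  // Spark sends null keys to partition 0
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;  // fold negative remainders into [0, n)
    }

    public static void main(String[] args) {
        // Integer.hashCode is the int value itself, so key 3 with n = 3 lands in partition 0
        for (int k = 1; k <= 5; k++) {
            System.out.println("key " + k + " -> partition " + partitionFor(k, 3));
        }
    }
}
```

Because all records with equal keys produce the same hash, they are guaranteed to land in the same partition, which is what makes key-based operations like joins cheap afterwards.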
A Scala helper for inspecting which partition each element lives in:
def mapPartIndexFunc(i1: Int, iter: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
  var res = List[(Int, (Int, Int))]()
  while (iter.hasNext) {
    val next = iter.next()
    res = (i1, next) :: res  // tag each element with its partition index
  }
  res.iterator
}

def printRDDPart(rdd: RDD[(Int, Int)]): Unit = {
  val mapPartIndexRDDs = rdd.mapPartitionsWithIndex(mapPartIndexFunc)
  mapPartIndexRDDs.foreach(println(_))
}
The equivalent Java helper:
private static void printPartRDD(JavaPairRDD<Integer, Integer> pairRDD) {
    JavaRDD<Tuple2<Integer, Tuple2<Integer, Integer>>> mapPartitionIndexRDD = pairRDD.mapPartitionsWithIndex(
            new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>>>() {
                @Override
                public Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>> call(Integer partIndex, Iterator<Tuple2<Integer, Integer>> tuple2Iterator) {
                    // tag each element with its partition index
                    ArrayList<Tuple2<Integer, Tuple2<Integer, Integer>>> tuple2s = new ArrayList<>();
                    while (tuple2Iterator.hasNext()) {
                        Tuple2<Integer, Integer> next = tuple2Iterator.next();
                        tuple2s.add(new Tuple2<>(partIndex, next));
                    }
                    return tuple2s.iterator();
                }
            }, false);
    mapPartitionIndexRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, Integer>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<Integer, Integer>> indexedTuple) throws Exception {
            System.out.println(indexedTuple);
        }
    });
}
Scala version
val conf = new SparkConf().setMaster("local[*]").setAppName("HashPartitionerScala")
val sc = new SparkContext(conf)
val pairRdd = sc.parallelize(List((1,1), (1,2), (2,3), (2,4), (3,5), (3,6), (4,7), (4,8), (5,9), (5,10)))
// output before partitioning
printRDDPart(pairRdd)
println("=========================")
val partitioned = pairRdd.partitionBy(new spark.HashPartitioner(3))
// output after partitioning
printRDDPart(partitioned)
Java version
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HashPartitioner Java");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tuple2<Integer, Integer>> tupRdd = sc.parallelize(Arrays.asList(
        new Tuple2<>(1, 1), new Tuple2<>(1, 2),
        new Tuple2<>(2, 3), new Tuple2<>(2, 4),
        new Tuple2<>(3, 5), new Tuple2<>(3, 6),
        new Tuple2<>(4, 7), new Tuple2<>(4, 8),
        new Tuple2<>(5, 9), new Tuple2<>(5, 10)));
JavaPairRDD<Integer, Integer> pairRDD = JavaPairRDD.fromJavaRDD(tupRdd);
JavaPairRDD<Integer, Integer> partitioned = pairRDD.partitionBy(new HashPartitioner(3));
System.out.println("============HashPartitioner==================");
printPartRDD(partitioned);
2. RangePartitioner
Assigns keys to partitions by range: Spark samples the RDD to choose range boundaries, and each partition then holds a contiguous range of keys. This approach requires the keys to have a natural ordering.
Scala version
val conf = new SparkConf().setMaster("local[*]").setAppName("RangePartitionerScala")
val sc = new SparkContext(conf)
val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6), (4,7), (4,8), (2,3), (1,2)))
printRDDPart(pairRdd)
println("=========================")
val partitioned = pairRdd.partitionBy(new RangePartitioner(3,pairRdd))
printRDDPart(partitioned)
------------------- Output ------------------
(0,(1,2))
(0,(2,3))
(0,(4,8))
(0,(4,7))
(0,(3,6))
(0,(3,5))
(0,(2,4))
(0,(5,9))
(0,(5,10))
(0,(1,1))
=========================
(0,(1,2))
(0,(2,3))
(0,(2,4))
(0,(1,1))
(1,(4,8))
(1,(4,7))
(1,(3,6))
(1,(3,5))
(2,(5,9))
(2,(5,10))
The RDD above was created with its elements in shuffled order, but once it is split into three ranges, keys 1 and 2 land in the first partition, keys 3 and 4 in the second, and key 5 in the third.
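Once the boundaries are fixed, placing a key is just a search over the sorted upper bounds. A plain-Java sketch of that lookup, using the hypothetical boundaries {2, 4} that would reproduce the partitioning above (Spark derives the real boundaries by sampling, and binary-searches when there are many):

```java
public class RangePartitionSketch {
    // upperBounds is sorted; keys <= upperBounds[0] go to partition 0,
    // keys <= upperBounds[1] to partition 1, and so on; the rest to the last partition.
    static int partitionFor(int key, int[] upperBounds) {
        int p = 0;
        while (p < upperBounds.length && key > upperBounds[p]) {
            p++;  // linear scan for clarity; Spark uses binary search for many partitions
        }
        return p;
    }

    public static void main(String[] args) {
        int[] bounds = {2, 4};  // hypothetical boundaries matching the output above
        for (int k = 1; k <= 5; k++) {
            System.out.println("key " + k + " -> partition " + partitionFor(k, bounds));
        }
    }
}
```

Note that unlike hashing, this keeps the partitions globally ordered: every key in partition 0 is smaller than every key in partition 1, which is why RangePartitioner underlies sortByKey.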
3. Custom partitioner
To implement your own partitioner, extend org.apache.spark.Partitioner and implement these three methods:
- numPartitions: Int — returns the number of partitions to create
- getPartition(key: Any): Int — returns the partition number (0 to numPartitions - 1) for a given key
- equals(other: Any): Boolean — standard equality, which Spark uses to check whether two RDDs are partitioned the same way
Below is a custom partitioner that places keys >= 4 in the first partition, keys in [2, 4) in the second, and the rest in the third.
Scala version
The custom partitioner:
class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    if (k >= 4) 0          // keys >= 4 -> first partition
    else if (k >= 2) 1     // keys in [2, 4) -> second partition
    else 2                 // everything else -> third partition
  }
}
Using the partitioner:
val conf = new SparkConf().setMaster("local[*]").setAppName("PartitionerScala")
val sc = new SparkContext(conf)
val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6), (4,7), (4,8), (2,3), (1,2)))
val partitionedRdd = pairRdd.partitionBy(new CustomPartitioner(3))
printRDDPart(partitionedRdd)
---------- Output -----------------
(0,(4,8))
(0,(4,7))
(0,(5,9))
(0,(5,10))
(1,(2,3))
(1,(3,6))
(1,(3,5))
(1,(2,4))
(2,(1,2))
(2,(1,1))
Java version
The custom partitioner: keys >= 4 land in the first partition, keys in [2, 4) in the second, and the rest in the third.
public class JavaCustomPart extends Partitioner {
    int i = 1;

    public JavaCustomPart(int i) {
        this.i = i;
    }

    public JavaCustomPart() {
    }

    @Override
    public int numPartitions() {
        return i;
    }

    @Override
    public int getPartition(Object key) {
        int keyCode = Integer.parseInt(key.toString());
        if (keyCode >= 4) {
            return 0;
        } else if (keyCode >= 2 && keyCode < 4) {
            return 1;
        } else {
            return 2;
        }
    }
}
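Because getPartition is ordinary branching logic, it can be checked without starting Spark at all. A standalone sketch replicating the same branches (no Spark dependency; the class and method names here are illustrative, not part of any API):

```java
public class CustomPartitionLogic {
    // Same branching as JavaCustomPart.getPartition, reproduced without Spark.
    static int getPartition(Object key) {
        int keyCode = Integer.parseInt(key.toString());
        if (keyCode >= 4) {
            return 0;        // keys >= 4 -> first partition
        } else if (keyCode >= 2) {
            return 1;        // keys in [2, 4) -> second partition
        } else {
            return 2;        // everything else -> third partition
        }
    }

    public static void main(String[] args) {
        for (int k = 1; k <= 5; k++) {
            System.out.println("key " + k + " -> partition " + getPartition(k));
        }
    }
}
```

Checking the branches this way before wiring the class into partitionBy makes it easy to catch off-by-one mistakes in the range conditions.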
Using the partitioner:
JavaPairRDD<Integer, Integer> customPart = pairRDD.partitionBy(new JavaCustomPart(3));
printPartRDD(customPart);
-------------- Output ---------------
============CustomPartition==================
(0,(5,10))
(0,(4,8))
(0,(4,7))
(0,(5,9))
(1,(2,4))
(1,(3,5))
(1,(3,6))
(1,(2,3))
(2,(1,2))
(2,(1,1))