Spark RDD Operators: Partitioning -- HashPartitioner, RangePartitioner, and Custom Partitioners

1. HashPartitioner

Calling pairRdd.partitionBy(new spark.HashPartitioner(n)) redistributes the RDD into n partitions, placing each record according to the hash of its key.
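
Under the hood, the placement rule is key.hashCode modulo the partition count, adjusted to be non-negative, with null keys sent to partition 0. A minimal sketch of that rule (hashPartition is an illustrative name, not a Spark API):

    // A sketch of HashPartitioner's placement rule (it mirrors the logic in
    // org.apache.spark.HashPartitioner but is not the actual Spark source)
    def hashPartition(key: Any, numPartitions: Int): Int = key match {
      case null => 0
      case _ =>
        val rawMod = key.hashCode % numPartitions
        // Java's % can return a negative value; shift it into 0..numPartitions-1
        rawMod + (if (rawMod < 0) numPartitions else 0)
    }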

A Scala helper for inspecting RDD partitions

    import org.apache.spark.rdd.RDD

    // Tag every element with the index of the partition it lives in
    def mapPartIndexFunc(partIndex: Int, iter: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var res = List[(Int, (Int, Int))]()
      while (iter.hasNext) {
        val next = iter.next()
        res = (partIndex, next) :: res
      }
      res.iterator
    }

    // Print every element as (partitionIndex, (key, value))
    def printRDDPart(rdd: RDD[(Int, Int)]): Unit = {
      val mapPartIndexRDDs = rdd.mapPartitionsWithIndex(mapPartIndexFunc)
      mapPartIndexRDDs.foreach(println)
    }
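
As a quicker alternative (not in the original post), the standard glom() method achieves the same inspection, given a pair RDD like the ones defined below:

    // glom() gathers each partition into an Array, so the whole layout can be
    // collected to the driver and printed in one pass
    pairRdd.glom().collect().zipWithIndex.foreach { case (part, idx) =>
      println(s"partition $idx: ${part.mkString(", ")}")
    }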

A Java helper for inspecting RDD partitions

    // Tag every element with its partition index, then print it as
    // (partitionIndex, (key, value))
    private static void printPartRDD(JavaPairRDD<Integer, Integer> pairRDD) {
        JavaRDD<Tuple2<Integer, Tuple2<Integer, Integer>>> mapPartitionIndexRDD = pairRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>>>() {
            @Override
            public Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>> call(Integer partIndex, Iterator<Tuple2<Integer, Integer>> tuple2Iterator) {
                ArrayList<Tuple2<Integer, Tuple2<Integer, Integer>>> tuple2s = new ArrayList<>();
                while (tuple2Iterator.hasNext()) {
                    Tuple2<Integer, Integer> next = tuple2Iterator.next();
                    tuple2s.add(new Tuple2<Integer, Tuple2<Integer, Integer>>(partIndex, next));
                }
                return tuple2s.iterator();
            }
        }, false);

        mapPartitionIndexRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Integer, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2);
            }
        });
    }


Scala version

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val conf = new SparkConf().setMaster("local[*]").setAppName("HashPartitionerScala")
    val sc = new SparkContext(conf)
    val pairRdd = sc.parallelize(List((1,1), (1,2), (2,3), (2,4), (3,5), (3,6), (4,7), (4,8), (5,9), (5,10)))
    // layout before explicit partitioning
    printRDDPart(pairRdd)
    println("=========================")
    val partitioned = pairRdd.partitionBy(new HashPartitioner(3))
    // layout after hash partitioning
    printRDDPart(partitioned)

Java version

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HashPartitioner Java");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tuple2<Integer, Integer>> tupRdd = sc.parallelize(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), new Tuple2<Integer, Integer>(1, 2)
                , new Tuple2<Integer, Integer>(2, 3), new Tuple2<Integer, Integer>(2, 4)
                , new Tuple2<Integer, Integer>(3, 5), new Tuple2<Integer, Integer>(3, 6)
                , new Tuple2<Integer, Integer>(4, 7), new Tuple2<Integer, Integer>(4, 8)
                , new Tuple2<Integer, Integer>(5, 9), new Tuple2<Integer, Integer>(5, 10)
        ));
        JavaPairRDD<Integer, Integer> pairRDD = JavaPairRDD.fromJavaRDD(tupRdd);

        JavaPairRDD<Integer, Integer> partitioned = pairRDD.partitionBy(new HashPartitioner(3));
        System.out.println("============HashPartitioner==================");
        printPartRDD(partitioned);
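
Since an Int's hashCode is the value itself, the layout here is deterministic: key 3 lands in partition 0 (3 % 3 == 0), keys 1 and 4 in partition 1, and keys 2 and 5 in partition 2.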

2. RangePartitioner

RangePartitioner splits the key space into contiguous ranges and assigns each key to the partition whose range contains it. It requires the keys to have a natural ordering, and it samples the RDD up front to pick range boundaries that keep the partitions roughly balanced.
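
Conceptually, placement works like the sketch below (simplified: the real RangePartitioner derives its bounds by sampling and uses binary search once there are many partitions; rangePartition and rangeBounds are illustrative names):

    // Simplified placement: scan the sorted upper bounds until the key fits.
    // rangeBounds has numPartitions - 1 entries.
    def rangePartition(key: Int, rangeBounds: Array[Int]): Int = {
      var p = 0
      while (p < rangeBounds.length && key > rangeBounds(p)) p += 1
      p
    }

    // With rangeBounds = Array(2, 4): keys <= 2 go to partition 0,
    // keys 3..4 to partition 1, and keys >= 5 to partition 2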

Scala version

    val conf = new SparkConf().setMaster("local[*]").setAppName("RangePartitionerScala")
    val sc = new SparkContext(conf)
    val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6), (4,7), (4,8), (2,3), (1,2)))
    printRDDPart(pairRdd)
    println("=========================")
    val partitioned = pairRdd.partitionBy(new RangePartitioner(3, pairRdd))
    printRDDPart(partitioned)

------------------- output ------------------
(0,(1,2))
(0,(2,3))
(0,(4,8))
(0,(4,7))
(0,(3,6))
(0,(3,5))
(0,(2,4))
(0,(5,9))
(0,(5,10))
(0,(1,1))
=========================
(0,(1,2))
(0,(2,3))
(0,(2,4))
(0,(1,1))
(1,(4,8))
(1,(4,7))
(1,(3,6))
(1,(3,5))
(2,(5,9))
(2,(5,10))

The RDD above was created with its keys out of order, but RangePartitioner split it into three ranges: keys 1 and 2 went to the first partition, keys 3 and 4 to the second, and key 5 to the third.

3. Custom partitioner

To implement your own partitioner, extend the org.apache.spark.Partitioner class and implement the following three methods:

  • numPartitions: Int: returns the number of partitions created
  • getPartition(key: Any): Int: returns the partition ID (0 to numPartitions - 1) for a given key
  • equals(): the standard Java equality method, so Spark can test whether two partitioners are the same (letting it recognize co-partitioned RDDs and avoid redundant shuffles); a sketch with this override appears after the Scala class below

Below I define a custom partitioner that sends keys >= 4 to the first partition, keys >= 2 and < 4 to the second, and everything else to the third.

Scala version

The custom partitioner

    import org.apache.spark.Partitioner

    class CustomPartitioner(numParts: Int) extends Partitioner {
      // Number of partitions this partitioner creates
      override def numPartitions: Int = numParts

      // Route keys >= 4 to partition 0, keys in [2, 4) to partition 1,
      // everything else to partition 2
      override def getPartition(key: Any): Int = {
        val k = key.toString.toInt
        if (k >= 4) 0
        else if (k >= 2) 1
        else 2
      }
    }
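
As noted above, it is good practice to also override equals() (and hashCode) so that two partitioner instances compare by value; Spark can then tell that two RDDs were partitioned the same way and skip an unnecessary shuffle in operations such as joins. A sketch of the same class with that addition (CustomPartitionerWithEquals is my illustrative name, not from the original post):

    import org.apache.spark.Partitioner

    class CustomPartitionerWithEquals(numParts: Int) extends Partitioner {
      override def numPartitions: Int = numParts

      override def getPartition(key: Any): Int = {
        val k = key.toString.toInt
        if (k >= 4) 0 else if (k >= 2) 1 else 2
      }

      // Two instances with the same partition count route keys identically,
      // so treat them as equal
      override def equals(other: Any): Boolean = other match {
        case p: CustomPartitionerWithEquals => p.numPartitions == numPartitions
        case _ => false
      }

      override def hashCode: Int = numPartitions
    }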

Invoking the partitioner

    val conf = new SparkConf().setMaster("local[*]").setAppName("PartitionerScala")
    val sc = new SparkContext(conf)
    val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6), (4,7), (4,8), (2,3), (1,2)))
    val partitionedRdd = pairRdd.partitionBy(new CustomPartitioner(3))
    printRDDPart(partitionedRdd)

---------- output -----------------
(0,(4,8))
(0,(4,7))
(0,(5,9))
(0,(5,10))
(1,(2,3))
(1,(3,6))
(1,(3,5))
(1,(2,4))
(2,(1,2))
(2,(1,1))

Java version
The custom partitioner
Keys >= 4 land in the first partition, keys in [2, 4) in the second, and the rest in the third

    import org.apache.spark.Partitioner;

    public class JavaCustomPart extends Partitioner {
        // Partition count, defaulting to 1
        private int i = 1;

        public JavaCustomPart(int i) {
            this.i = i;
        }

        public JavaCustomPart() {
        }

        @Override
        public int numPartitions() {
            return i;
        }

        // Route keys >= 4 to partition 0, keys in [2, 4) to partition 1,
        // everything else to partition 2
        @Override
        public int getPartition(Object key) {
            int keyCode = Integer.parseInt(key.toString());
            if (keyCode >= 4) {
                return 0;
            } else if (keyCode >= 2) {
                return 1;
            } else {
                return 2;
            }
        }
    }

Invoking the partitioner

        JavaPairRDD<Integer, Integer> customPart = pairRDD.partitionBy(new JavaCustomPart(3));
        printPartRDD(customPart);

-------------- output ---------------
============CustomPartition==================
(0,(5,10))
(0,(4,8))
(0,(4,7))
(0,(5,9))
(1,(2,4))
(1,(3,5))
(1,(3,6))
(1,(2,3))
(2,(1,2))
(2,(1,1))

Reposted from blog.csdn.net/qq_42578036/article/details/109667713