Source code
/**
 * Applies `f` once per partition of this RDD and builds a new RDD from the
 * iterators it returns.
 *
 * Pass `preservesPartitioning = true` only for a pair RDD whose keys `f`
 * leaves untouched; it tells Spark the existing partitioner still applies.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  // Clean the closure so it can be serialized and shipped to executors.
  val cleanedFunc = sc.clean(f)
  // The task context and partition index are irrelevant to the user function,
  // so they are discarded in the adapter lambda.
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedFunc(iter),
    preservesPartitioning)
}
Demo
/**
 * Demonstrates the {@code mapPartitions} operator: looks up a score for every
 * name in the RDD, processing one whole partition per function call.
 */
public class MapPartitionsOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MapPartitionsOperator").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Guard with try/finally so the Spark context is always released,
        // even when a job throws.
        try {
            List<String> names = Arrays.asList("zhangsan", "lisi", "wangwu");
            JavaRDD<String> nameRDD = sc.parallelize(names);
            final Map<String, Integer> scoreMap = new HashMap<String, Integer>();
            scoreMap.put("zhangsan", 150);
            scoreMap.put("lisi", 100);
            scoreMap.put("wangwu", 90);
            /*
             * map processes one record of a partition per call, whereas
             * mapPartitions processes all records of a partition in a single call.
             *
             * Usage guidance:
             * - For moderately sized RDDs, mapPartitions can be faster than map.
             * - For very large RDDs it is risky: a whole partition is buffered in
             *   memory at once, which can cause an OutOfMemoryError.
             */
            JavaRDD<Integer> scoreRDD = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<Integer> call(Iterator<String> iter) throws Exception {
                    // Collect the score for every name in this partition.
                    List<Integer> scores = new ArrayList<Integer>();
                    while (iter.hasNext()) {
                        String name = iter.next();
                        scores.add(scoreMap.get(name));
                    }
                    return scores;
                }
            });
            // Print each score on the driver-local "local" master.
            scoreRDD.foreach(new VoidFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Integer score) throws Exception {
                    System.out.println(score);
                }
            });
        } finally {
            sc.close();
        }
    }
}
Output
150
100
90