1. Method
JavaRDD => JavaPairRDD: via the mapToPair function
JavaPairRDD => JavaRDD: via the map function
2. Code example
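The per-element logic behind these two conversions can be sketched without a Spark cluster. The following is a hypothetical, Spark-free illustration (class and method names are made up for this sketch) in which java.util.AbstractMap.SimpleEntry stands in for scala.Tuple2:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class PairLogicDemo {
    // Same idea as the PairFunction passed to mapToPair:
    // split "id subject" into a (id, subject) pair.
    static Map.Entry<String, String> toPair(String s) {
        String[] parts = s.split(" ");
        return new SimpleEntry<>(parts[0], parts[1]);
    }

    // Same idea as the Function passed to map:
    // join the (id, subject) pair back into "id subject".
    static String toLine(Map.Entry<String, String> t) {
        return t.getKey() + " " + t.getValue();
    }

    public static void main(String[] args) {
        Map.Entry<String, String> p = toPair("1 语文");
        System.out.println(p);          // prints 1=语文
        System.out.println(toLine(p));  // prints 1 语文
    }
}
```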
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Converting between JavaRDD and JavaPairRDD in the Java API.
 */
public class JavaRDDToJavaPairRDD {
    public static void main(String[] args) {
        // Create the SparkConf object
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> line = sc.parallelize(Arrays.asList("1 语文", "2 数学", "3 英语", "4 政治"));

        // Print every element
        line.foreach(new VoidFunction<String>() {
            public void call(String num) throws Exception {
                System.out.println("numbers:" + num);
            }
        });

        // Convert the JavaRDD to a JavaPairRDD
        JavaPairRDD<String, String> prdd = line.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) throws Exception {
                String[] parts = s.split(" ");
                return new Tuple2<String, String>(parts[0], parts[1]);
            }
        });
        System.out.println("JavaRDD to JavaPairRDD --- mapToPair");

        // Print the pairs
        prdd.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> t) throws Exception {
                System.out.println(t);
            }
        });
        System.out.println("============1============");

        // Convert the JavaPairRDD back to a JavaRDD
        JavaRDD<String> javaprdd = prdd.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> t) throws Exception {
                System.out.println(t);
                System.out.println("first element: " + t._1);
                System.out.println("second element: " + t._2);
                return t._1 + " " + t._2;
            }
        });
        System.out.println("===============2=========");

        // Print the converted elements
        javaprdd.foreach(new VoidFunction<String>() {
            public void call(String num) throws Exception {
                System.out.println("numbers:" + num);
            }
        });

        sc.close();
    }
}
The output is as follows:
numbers:1 语文
numbers:2 数学
numbers:3 英语
numbers:4 政治
19/01/22 11:54:24 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver
19/01/22 11:54:24 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 35 ms on localhost (1/1)
19/01/22 11:54:24 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/01/22 11:54:24 INFO DAGScheduler: Stage 0 (foreach at JavaRDDToJavaPairRDD.java:27) finished in 0.045 s
19/01/22 11:54:24 INFO DAGScheduler: Job 0 finished: foreach at JavaRDDToJavaPairRDD.java:27, took 0.205928 s
JavaRDD to JavaPairRDD --- mapToPair
(1,语文)
(2,数学)
(3,英语)
(4,政治)
19/01/22 11:54:24 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 10 ms on localhost (1/1)
19/01/22 11:54:24 INFO DAGScheduler: Stage 1 (foreach at JavaRDDToJavaPairRDD.java:45) finished in 0.011 s
19/01/22 11:54:24 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/01/22 11:54:24 INFO DAGScheduler: Job 1 finished: foreach at JavaRDDToJavaPairRDD.java:45, took 0.100059 s
============1============
===============2=========
(1,语文)
first element: 1
second element: 语文
numbers:1 语文
(2,数学)
first element: 2
second element: 数学
numbers:2 数学
(3,英语)
first element: 3
second element: 英语
numbers:3 英语
(4,政治)
first element: 4
second element: 政治
numbers:4 政治
Note the order of the output:
============1============
===============2=========
are both printed before anything produced inside the map function body. The reason is Spark's lazy evaluation: map is a transformation, so the function passed to it is not executed when map is called; it only runs when an action (here, the final foreach) forces the computation. The two separator lines are printed eagerly by the driver, so they appear first.
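The same lazy behavior can be reproduced with plain Java streams, whose intermediate operations also run only when a terminal operation is invoked. A minimal sketch (the class name and recorded event strings are made up for this illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyStreamDemo {
    // Records the order in which the pipeline steps actually execute.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Building the pipeline: the map function does NOT run yet.
        Stream<String> pipeline = Stream.of("a", "b")
                .map(x -> { events.add("map:" + x); return x; });
        // Analogous to the "============1============" println in the driver.
        events.add("separator");
        // The terminal operation finally triggers the map function.
        pipeline.forEach(x -> events.add("forEach:" + x));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints [separator, map:a, forEach:a, map:b, forEach:b]
    }
}
```

As with Spark, "separator" is recorded before any "map:" event even though map appears earlier in the source, because the intermediate operation is deferred until the terminal forEach.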