import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class join {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("join").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> persons = jsc.textFile("spark/input3/person.txt");
JavaRDD<String> addresses = jsc.textFile("spark/input3/address.txt");
JavaPairRDD<String, String> personkv = persons.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] personsplit = str.split(" |\t");
if (personsplit.length == 3)
{
String code=personsplit[2];
String value = personsplit[0] +" "+ personsplit[1];
return new Tuple2<String, String>(code, value);
}
else {
return new Tuple2<String,String>(null,null);
}
}
});
JavaPairRDD<String, String> addresskv = addresses.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] addresssplit = str.split(" |\t");
String code=addresssplit[0];
String value = addresssplit[1];
return new Tuple2<String, String>(code, value);
}
});
JavaPairRDD<String, Tuple2<String, String>> joinres=personkv.join(addresskv);
joinres.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Tuple2<String, String>> t) throws Exception {
System.out.println(t._2()._1+" "+t._1+ " "+t._2()._2);
}
});
joinres.saveAsTextFile("./spark/output3/");
}
}
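/*
 * Sample input:
 *
 * person.txt (whitespace-separated; the third field is the join key):
 * 1 Aaron 210000
 * .....
 *
 * address.txt (whitespace-separated; the first field is the join key):
 * 210000 Nanjing
 * .........
 */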