java lambda
-
Spark是用scala写的,所以提供的javaAPI都是有些是java专用的。
- SparkConf通用。sc和RDD,PariRDD需要使用java专用
-
java中没有tuple,所以要用专门的Tuple2类 + mapToPair来模拟
- tuple表示2个有联系的元素组成的整体,是种数据结构
- JavaPairRDD中的元素就是Tuple2,可以使用tuple的API,比如_1(_1和_1()都可以用)
- tuple还可以使用swap
-
java的lambda使用 -> 来表示
-
java中使用parallelize从内存中创建RDD入参可以用Stream.of.collect(Collectors.toList()) 或 Arrays.asList
public class SparkJavaAPI {
/**
* spark的javaAPI
* SparkConf通用
* sc和RDD,PariRDD需要使用java专用
* java中没有tuple,所以要用专门的Tuple2类 + mapToPair来模拟
* mapToPair返回的RDD泛型已经是Tuple2了
* tuple表示2个有联系的元素组成的整体,是种数据结构
* JavaPairRDD中的元素就是Tuple2,可以使用tuple的API,比如_1(_1和_1()都可以用)
* tuple还可以使用swap
* java的lambda使用 -> 来表示
* java中使用parallelize从内存中创建RDD入参可以用Stream.of.collect(Collectors.toList()) 或 Arrays.asList
* 排序时,PairRDD可以使用sortByKey
*/
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("SparkJavaAPI").setMaster("local[*]");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList(new String[]{"hdfs", "hive", "hbase", "hive", "spark", "spark", "spark"}));
JavaRDD<String> rdd = sparkContext.parallelize(Stream.of("hdfs", "hive", "hbase", "hive", "spark", "spark", "spark").collect(Collectors.toList()));
JavaPairRDD<String, Integer> wordAnd1 = rdd.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> reduced = wordAnd1.reduceByKey((num1, num2) -> num1 + num2);
JavaPairRDD<Integer, String> swaped = reduced.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
sorted.foreach(tp -> System.out.println(tp._1() + " | " + tp._2));
}
/**
* SparkSession的创建可以不使用config(sparkConf),直接在builder()后面整合sparkConf
* 使用ss.sql.toJavaRDD来创建基于sql的RDD
* 没有DataFrame类型,只有DataSet<Row>
*/
private void testSS() {
//SparkConf sparkConf = new SparkConf().setAppName("SparkJavaAPI").setMaster("local[*]");
SparkSession sparkSession = SparkSession.builder()/*.config(sparkConf)*/.appName("SparkJavaAPI").master("local[*]").enableHiveSupport().getOrCreate();
JavaRDD<Row> javaRDD = sparkSession.sql("").toJavaRDD();
/*
创建StructType schema,用于创建df和ds
创建1个List<StructField>,add之后,用DataTypes.createStructType(fields)转成StructType
*/
String schemaString = "name age";
List<StructField> fields = new ArrayList<StructField>(16);
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = javaRDD.map(record -> {
//此处record是Object类型,需要toString
String[] attributes = record.toString().split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
Dataset<Row> createDataFrame = sparkSession.createDataFrame(rowRDD, schema);
}
}
纯java
https://blog.csdn.net/xiefu5hh/article/details/51782319