1. Reflection
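With reflection, the fields of an object are mapped automatically to the columns of a DataFrame, and the field types are mapped to the column types in the DataFrame's schema. In this example the columns are id, name, age, and score, taken from the fields of Person.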
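import org.apache.spark.sql.SparkSession
// Person is assumed to be a case class like this, declared at the top level (outside any method)
case class Person(id: Int, name: String, age: Int, score: Double)
val spark = SparkSession.builder().master("local").appName("reflect").getOrCreate()
// Brings the encoders needed by map and toDF into scope
import spark.implicits._
// Spark 2.3: read the file directly as a Dataset[String]
val person = spark.read.textFile("./data/people.txt")
// Alternatively, read the file as an RDD
// val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")
val personDS = person.map(one => {
  // Each line is expected to look like: id,name,age,score
  val arr = one.split(",")
  Person(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
})
val df = personDS.toDF()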
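To see what the reflection-based mapping produces, the following is a minimal sketch that runs the same steps on two in-memory lines instead of ./data/people.txt. The sample data, the object name ReflectSchemaCheck, the helper parsePerson, and the view name "person" are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Assumed shape of Person; the field names become the column names.
case class Person(id: Int, name: String, age: Int, score: Double)

object ReflectSchemaCheck {
  // Hypothetical helper: parse one "id,name,age,score" line into a Person.
  def parsePerson(line: String): Person = {
    val arr = line.split(",")
    Person(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("reflectCheck").getOrCreate()
    import spark.implicits._

    // Made-up sample lines standing in for the contents of people.txt.
    val lines = Seq("1,zhangsan,18,90.5", "2,lisi,20,85.0").toDS()
    val df = lines.map(line => parsePerson(line)).toDF()

    // Print the schema that Spark derived from the case class.
    df.printSchema()
    // Columns follow the declaration order of the case class fields.
    assert(df.columns.sameElements(Array("id", "name", "age", "score")))

    // Register a temporary view so the data can be queried with SQL.
    df.createOrReplaceTempView("person")
    spark.sql("SELECT name, score FROM person WHERE age >= 20").show()

    spark.stop()
  }
}
```

Keeping the parsing in a small named function makes it easy to check separately from Spark, and declaring the case class at the top level lets Spark derive an encoder for it.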
2. Creating the schema dynamically
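Each Row in an RDD[Row] becomes one record of the DataFrame, and each value in the Row is mapped to one column. The order of the values in each dynamically created Row must match the order of the fields in the schema, and each value's type must match the declared type of its field.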
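import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
val spark = SparkSession.builder().master("local").appName("schema").getOrCreate()
val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")
// Convert the RDD[String] into an RDD[Row]
val rowRDD = rdd.map(one => {
  val arr = one.split(",")
  Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
})
// Build the schema as a StructType; the field order must match the order of the values in each Row.
// StructField arguments: field name, data type, nullable
val structType = StructType(List[StructField](
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  // score is parsed with toDouble above, so its type is DoubleType
  StructField("score", DoubleType, nullable = true)
))
val frame = spark.createDataFrame(rowRDD, structType)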
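Because the schema is an ordinary value, it can also be built at runtime, for example from a description string. The following is a minimal sketch under that assumption: the "name:type" spec format, the object name DynamicSchemaFromString, and the helpers toDataType, convert, and buildSchema are all hypothetical and introduced only for illustration, and the two sample lines are made up.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object DynamicSchemaFromString {
  // Hypothetical mapping from a type name in the spec to a Spark DataType.
  def toDataType(name: String): DataType = name match {
    case "int"    => IntegerType
    case "string" => StringType
    case "double" => DoubleType
    case other    => throw new IllegalArgumentException(s"unsupported type: $other")
  }

  // Convert a raw string to the JVM value that the given DataType expects.
  def convert(value: String, dataType: DataType): Any = dataType match {
    case IntegerType => value.toInt
    case DoubleType  => value.toDouble
    case _           => value
  }

  // Build a StructType from a spec such as "id:int,name:string".
  def buildSchema(spec: String): StructType =
    StructType(spec.split(",").map { field =>
      val Array(name, typeName) = field.split(":")
      StructField(name, toDataType(typeName), nullable = true)
    })

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("dynamicSchema").getOrCreate()

    val structType = buildSchema("id:int,name:string,age:int,score:double")
    val fields = structType.fields

    // Made-up sample lines standing in for the contents of people.txt.
    val rdd = spark.sparkContext.parallelize(Seq("1,zhangsan,18,90.5", "2,lisi,20,85.0"))

    // Values are converted in schema order, so Row order and schema order agree.
    val rowRDD = rdd.map { line =>
      val values = line.split(",")
      Row.fromSeq(values.zip(fields).map { case (v, f) => convert(v, f.dataType) })
    }

    val frame = spark.createDataFrame(rowRDD, structType)
    frame.printSchema()
    frame.show()

    spark.stop()
  }
}
```

Deriving both the schema and the value conversion from the same field list keeps the Row order and the value types consistent with the schema, which is the requirement stated above.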