Loading a plain RDD as a DataFrame in Spark

1. Reflection

The object's fields are automatically mapped to DataFrame columns, and their types are mapped to the DataFrame's schema; here they correspond to id, name, age, and score.

    import org.apache.spark.sql.SparkSession

    //The case class must be defined outside the method that calls toDF()
    case class Person(id: Int, name: String, age: Int, score: Double)

    val spark = SparkSession.builder().master("local").appName("reflect").getOrCreate()
    import spark.implicits._

    //Since Spark 2.3 a text file can be read directly as a Dataset[String]
    val person = spark.read.textFile("./data/people.txt")
    //Alternatively, read the file as an RDD[String]:
    //val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")
    val personDS = person.map(one => {
      val arr = one.split(",")
      Person(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
    })
    val df = personDS.toDF()
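The example above relies on a `Person` case class whose field names become the column names. A plain-Scala sketch of the parsing step, runnable without Spark (the sample line and the `Person` definition here are illustrative, assuming people.txt is comma-separated as id,name,age,score):

```scala
// Hypothetical case class matching the reflection example; in Spark its
// field names (id, name, age, score) become the DataFrame column names.
case class Person(id: Int, name: String, age: Int, score: Double)

object ParseDemo {
  // Same parsing logic as the map() in the example, applied to one line.
  def parse(line: String): Person = {
    val arr = line.split(",")
    Person(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
  }

  def main(args: Array[String]): Unit = {
    println(parse("1,zhangsan,18,90.5"))
  }
}
```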

2. Dynamically creating a schema

Each value in a Row-typed RDD is automatically mapped to a column; the order of values in each dynamically created Row must match the order in which the schema fields are declared.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local").appName("schema").getOrCreate()

    val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")
    //Convert the RDD[String] into an RDD[Row]
    val rowRDD = rdd.map(one => {
      val arr = one.split(",")
      Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
    })
    //Build the schema: the StructFields must be in the same order as the Row values.
    //Each StructField takes a field name, a type, and whether it is nullable.
    val structType = StructType(List[StructField](
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("score", DoubleType, nullable = true) //parsed with toDouble, so DoubleType, not LongType
    ))
    val frame = spark.createDataFrame(rowRDD, structType)
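createDataFrame pairs each Row value with a StructField by position, not by name, so a mismatched order silently mislabels columns rather than raising an error. A plain-Scala sketch of that positional pairing (the field names and sample values are illustrative):

```scala
// Sketch: schema fields pair with row values strictly by position.
object SchemaOrderDemo {
  val fieldNames = Seq("id", "name", "age", "score")

  // Pair each value with the field name at the same index.
  def label(rowValues: Seq[Any]): Seq[(String, Any)] =
    fieldNames.zip(rowValues)

  def main(args: Array[String]): Unit = {
    // Correct order: every value lands under the intended column.
    println(label(Seq(1, "zhangsan", 18, 90.5)))
    // Swapped order: "zhangsan" would be labeled "id" and no error is raised.
    println(label(Seq("zhangsan", 1, 18, 90.5)))
  }
}
```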

Reprinted from blog.csdn.net/qq_36299025/article/details/97813965