如果需要RDD与DS或者DF之间操作,那么都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的名称】
1、RDD 《-》 DataFrame
1、RDD -》 DataFrame (确定Schema)
a、直接手动确定:
peopleRDD.map{x =>
val para = x.split(",")
(para(0), para(1).trim.toInt)
}.toDF("name","age")
b、通过反射确定 (利用case class 的功能)
case class People(name:String, age:Int)
peopleRdd.map{ x =>
val para = x.split(",")
People(para(0),para(1).trim.toInt)
}.toDF
c、通过编程方式来确定
1、准备Scheam
val schema = StructType( StructField("name",StringType):: StructField("age",IntegerType)::Nil )
2、准备Data 【需要Row类型】
val data = peopleRdd.map{ x =>
val para = x.split(",")
Row(para(0),para(1).trim.toInt)
}
3、生成DataFrame
val dataFrame = spark.createDataFrame(data, schema)
2、DataFrame -》 RDD
dataFrame.rdd 即可, 返回的是 RDD[Row]
2、RDD 《-》 DataSet
1、RDD -》 DataSet (case class 确定schema)
case class People(name:String, age:Int)
peopleRDD.map{x =>
val para = x.split(",")
People(para(0), para(1).trim.toInt)
}.toDS
2、DataSet -》 RDD
dataSet.rdd 即可, 返回的是 RDD[People]
3、DataFrame 《-》 DataSet
1、DataSet -》 DataFrame
dataSet.toDF 即可,直接复用case class的名称
2、DataFrame -》 DataSet (Scheam需要借助case class) 【DF的列名要和 case class的列名一致。】
case class People(name:String, age:Int)
dataFrame.as[People] 即可。