Spark RDD--数据类型转换

版权声明:未经同意,严禁抄袭。 https://blog.csdn.net/qq_36235275/article/details/82502352

将RDD,DataFrame,DataSet之间进行互相转换

RDD -》 DataFrame

  • 直接手动转换

scala> val people = spark.read.json("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val people1 = sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
people1: org.apache.spark.rdd.RDD[String] = /opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24
scala> val peopleSplit = people1.map{x => val strs = x.split(",");(strs(0),strs(1).trim.toInt)}
peopleSplit: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[19] at map at <console>:26
scala> peopleSplit.collect
res6: Array[(String, Int)] = Array((Michael,29), (Andy,30), (Justin,19))        
scala> peopleSplit.to
toDF   toDS   toDebugString   toJavaRDD   toLocalIterator   toString   top
scala> peopleSplit.toDF
res7: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> peopleSplit.toDF("name","age")
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> res8.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
  • 通过Scala编程实现
## 创建 schema 
scala> val schema = StructType(StructField("name",StringType)::StructField("age",IntegerType)::Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
## 加载RDD数据
scala> val rdd = sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
rdd: org.apache.spark.rdd.RDD[String] = /opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:30
## 创建Row对象
scala> val data = rdd.map{x => val strs = x.split(",");Row(strs(0),strs(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map at <console>:32
## 生成DF
scala> spark.createDataFrame(data,schema)
18/09/06 09:45:00 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
18/09/06 09:45:00 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
18/09/06 09:45:02 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
res0: org.apache.spark.sql.DataFrame = [name: string, age: int]
  • 反射
scala> case class People(name:String,age:Int)
defined class People
scala> rdd.map{x => val strs=x.split(",");People(strs(0),strs(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]

DataFrame -》 RDD

scala> res8.rdd
res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[26] at rdd at <console>:31

RDD -》 DataSet

scala> peopleSplit.toDS
res11: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
scala> case class People(name:String,age:Int)
defined class People
scala> val peopleDSSplit = people1.map{x => val strs = x.split(","); People(strs(0),strs(1).trim.toInt)}
peopleDSSplit: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[27] at map at <console>:28
scala> peopleDSSplit.toDS
res12: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
scala> res12.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

DataSet -》 RDD

scala> res12.rdd
res14: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[32] at rdd at <console>:33
scala> res14.map(_.name).collect
res15: Array[String] = Array(Michael, Andy, Justin)

DataSet -》 DataFrame

scala> res12.toDF
res16: org.apache.spark.sql.DataFrame = [name: string, age: int]

DataFrame -》 Datset

scala> res16.as[People]
res17: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

猜你喜欢

转载自blog.csdn.net/qq_36235275/article/details/82502352