一、两种方式
【参考官网:http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#programmatically-specifying-the-schema】
二、反射方式
1.代码
//方式一:反射方式
val spark=SparkSession.builder().appName("RDD2DataFrameSpark").master("local[2]").getOrCreate()
val rdd = spark.sparkContext.textFile("datas/info.txt")
//使用DataFrame API
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
infoDF.show()
//后续处理
infoDF.filter(infoDF.col("age") > 30).show()
//创建一个临时的表名称
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
spark.close()
2.重点
(1)使用隐式转换toDF()直接将RDD转换为DF,但是前提是需要引入:
import spark.implicits._
(2)后续操作
既可以通过DF的API,也可以通过createDataFrame创建临时表,然后使用sql语句来操作分析。
三、编程的方式
1.代码
val spark=SparkSession.builder().appName("RDD2DataFrameSpark").master("local[2]").getOrCreate()
val rdd = spark.sparkContext.textFile("datas/info.txt")
val infoRDD: RDD[Row] = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType=StructType(Array(StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)))
// spark.createDataFrame(infoRDD,structType)
val infoDF= spark.createDataFrame(infoRDD,structType)
infoDF.printSchema()
infoDF.show()
spark.close()