今天了解了 Spark SQL 的基本知识,并且用 Scala 语言实践了一下 RDD 和 DataFrame 之间的转换,代码如下:
package scala // NOTE(review): user code in the `scala` package shadows the standard-library root package — consider renaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

/**
 * Demonstrates converting an RDD read from a local text file into a
 * DataFrame by dynamically building a schema (StructType) and applying
 * it to each row, then querying the result with SQL.
 */
object RDD2Dataset {

  // Mirrors the columns of student.txt (id,name,age). Currently unused:
  // dynamicCreate builds its schema dynamically instead of via reflection
  // on this case class.
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    dynamicCreate()
  }

  /**
   * Reads student records from H:\student.txt, converts the RDD[String]
   * into a DataFrame using a dynamically constructed schema, registers it
   * as a temporary table, runs `select * from student`, and prints the result.
   */
  private def dynamicCreate(): Unit = {
    val conf = new SparkConf()
      .setMaster("local")            // run in local mode
      .setAppName("scalawordcount")
    val sc = new SparkContext(conf)

    // Read the raw lines of the local file as an RDD[String].
    val employeeRDD = sc.textFile("H:\\student.txt")

    // Build the schema dynamically. Every column is typed as String because
    // the values come straight from the split text lines without conversion.
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert each comma-separated line into a Row matching the schema.
    // NOTE(review): assumes every line has at least 3 fields; a short line
    // would throw ArrayIndexOutOfBoundsException at job execution time.
    val rowRDD = employeeRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))

    // `val`, not `var` — the context is never reassigned.
    val sqlContext = new SQLContext(sc)
    val stuDf = sqlContext.createDataFrame(rowRDD, schema) // RDD[Row] + schema -> DataFrame

    // Register the DataFrame as a temp table so it can be queried with SQL.
    // (registerTempTable returns Unit, so its result is not bound; it is also
    // deprecated since Spark 2.0 — prefer createOrReplaceTempView on 2.x+.)
    stuDf.registerTempTable("student")

    val nameDf = sqlContext.sql("select * from student")
    //nameDf.write.text("result") // optionally write the query result to a file
    nameDf.show()
  }
}