- Creating a DataFrame
  - From a data source
```scala
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  /**
   * The most commonly used readers for building a DataFrame directly from a data source.
   */
  sparkSession.read.json("")
  sparkSession.read.table("")
  sparkSession.read.text("")
  sparkSession.read.jdbc() // jdbc actually requires a url, a table name, and connection properties
  sparkSession.read.load("")
  sparkSession.read.csv("")
  sparkSession.read.orc("")
  sparkSession.read.parquet("")
}
```
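The jdbc reader cannot really be called without arguments as in the listing above. A minimal sketch of calling it with concrete arguments, plus a csv read with options; the MySQL url, credentials, table name, and file path here are placeholders, not values from the original post:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession = SparkSession.builder()
  .appName("DataFrameTest").master("local[2]").getOrCreate()

// Hypothetical connection details for the jdbc reader
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "secret")
val jdbcDF: DataFrame = sparkSession.read
  .jdbc("jdbc:mysql://localhost:3306/test", "person", props)

// The csv reader accepts options, e.g. treating the first line as a header
val csvDF: DataFrame = sparkSession.read
  .option("header", "true")
  .csv("C:\\Users\\39402\\Desktop\\person.csv")
```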
  - From an RDD
    - If the type of the data being processed is known in advance, the RDD can be converted into a DataFrame via a case class; note that in Spark 2.1.0 this approach supports at most 22 fields.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Note: this case class must be defined outside the method scope, otherwise you get:
 * Error:(25, 103) value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 */
case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  import sparkSession.implicits._
  val personDF: DataFrame = personRDD
    .map(_.split(","))
    .map(word => Person(word(0), word(1).toInt))
    .toDF()
  personDF.show()
}
```
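Given the split(",") and toInt calls, person.txt is assumed to hold one comma-separated name,age record per line; hypothetical contents (not from the original post):

```
zhangsan,20
lisi,25
```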
    - If the data type is not known in advance and only becomes known at runtime, use a StructType to convert the RDD into a DataFrame.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

private def createDataFrameBySchema(sparkSession: SparkSession) = {
  // Build an RDD from the file
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  // Split each line of the file into an RDD of Row
  val rowRDD: RDD[Row] = personRDD.map(personLine => {
    val splitStr: Array[String] = personLine.split(",")
    Row(splitStr(0), Integer.valueOf(splitStr(1)))
  })
  // Define the schema
  val structType = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
  // Convert the RDD of Row into a DataFrame using the schema
  val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
  personDF.show()
}
```
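When even the column names only arrive at runtime, the StructType itself can be assembled dynamically. A minimal sketch, where the schemaString value is a placeholder standing in for whatever runtime input describes the columns:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// "name age" is a placeholder for column names discovered at runtime
val schemaString = "name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// schema can now be passed to sparkSession.createDataFrame(rowRDD, schema)
```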
    - Converting an RDD into a DataFrame via reflection.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Note: this case class must be defined outside the method scope, otherwise you get:
 * Error:(25, 103) value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 */
case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  // Build an RDD from the file
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  import sparkSession.implicits._ // bring in the implicit conversions
  // Use reflection to convert the RDD into a DataFrame
  personRDD.map(_.split(",")).map(line => Person(line(0), line(1).toInt)).toDF()
}
```
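If defining a case class is inconvenient, a related sketch maps the lines to tuples and supplies the column names to toDF directly; the helper name tupleToDF is hypothetical, and the same person.txt layout is assumed:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

def tupleToDF(sparkSession: SparkSession, personRDD: RDD[String]): DataFrame = {
  import sparkSession.implicits._
  personRDD
    .map(_.split(","))
    .map(arr => (arr(0), arr(1).toInt)) // a tuple instead of a case class
    .toDF("name", "age")                // column names supplied explicitly
}
```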
  - From a Dataset

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Relies on the Person case class defined earlier, outside the method scope
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  // Build an RDD from the file
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  // Split each line of the file into an RDD of Row
  val rowRDD: RDD[Row] = personRDD.map(personLine => {
    val splitStr: Array[String] = personLine.split(",")
    Row(splitStr(0), Integer.valueOf(splitStr(1)))
  })
  // Define the schema
  val structType = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
  // Convert the RDD of Row into a DataFrame using the schema
  val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
  // Convert the DataFrame into a Dataset
  import sparkSession.implicits._
  val personDS: Dataset[Person] = personDF.as[Person]
  // Convert the Dataset back into a DataFrame
  val df: DataFrame = personDS.toDF()
  df.show()
}
```
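One detail worth noting: as[Person] only succeeds when the DataFrame's column names and types line up with the case class fields. Once the Dataset exists, typed operations are available before converting back; a small sketch (the adults helper is hypothetical):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Typed filter on the Dataset, then back to an untyped DataFrame
def adults(personDS: Dataset[Person]): DataFrame =
  personDS.filter(_.age >= 18).toDF()
```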
- DataFrame query styles
  - DSL style

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Note: this case class must be defined outside the method scope, otherwise you get:
 * Error:(25, 103) value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 */
case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  // Build an RDD from the file
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  import sparkSession.implicits._ // bring in the implicit conversions
  // Use reflection to convert the RDD into a DataFrame
  val personDF: DataFrame = personRDD.map(_.split(",")).map(line => Person(line(0), line(1).toInt)).toDF()
  // DSL-style query
  personDF.select($"name", $"age" + 1).show()
}
```
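A few more DSL-style operations on the same personDF, as a sketch of what the column-based API offers beyond select:

```scala
// Continuing from personDF above (the $ syntax comes from sparkSession.implicits._)
personDF.filter($"age" > 18).show()       // keep rows where age > 18
personDF.groupBy($"age").count().show()   // number of rows per age
personDF.orderBy($"age".desc).show()      // sort by age, descending
```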
  - SQL style

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  val peopleDF: DataFrame = sparkSession.read.json("C:\\Users\\39402\\Desktop\\people.json")
  // The second approach: register the DataFrame as a temporary view, then query it with SQL
  peopleDF.createOrReplaceTempView("people")
  sparkSession.sql("select * from people").show()
}
```
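createOrReplaceTempView registers a view that only lives as long as the current session. Spark 2.1+ also offers global temp views, which are shared across sessions and queried through the global_temp database; a sketch on the same peopleDF, assuming people.json contains name and age fields:

```scala
// Session-scoped: any SQL works against the temp view
sparkSession.sql("select name from people where age > 18").show()

// Cross-session: global temp views live in the global_temp database
peopleDF.createGlobalTempView("people_global")
sparkSession.sql("select * from global_temp.people_global").show()
```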
- Creating a Dataset
  - From an RDD

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Relies on the Person case class defined earlier, outside the method scope
private def createDataSetByRdd(sparkSession: SparkSession, personRDD: RDD[String]) = {
  import sparkSession.implicits._
  val personDS: Dataset[Person] = personRDD
    .map(_.split(","))
    .map(splitStr => Person(splitStr(0), splitStr(1).toInt))
    .toDS()
  personDS.show()
}
```
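A Dataset does not have to start from a file; toDS also works on a local collection, which is handy for quick tests. A minimal sketch with made-up values:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val sparkSession: SparkSession = SparkSession.builder()
  .appName("DataFrameTest").master("local[2]").getOrCreate()
import sparkSession.implicits._

// Hypothetical in-memory records; no input file required
val personDS: Dataset[Person] = Seq(Person("zhangsan", 20), Person("lisi", 25)).toDS()
personDS.show()
```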
  - From a DataFrame

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * Note: this case class must be defined outside the method scope, otherwise you get:
 * Error:(25, 103) value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 */
case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("DataFrameTest")
    .master("local[2]")
    .getOrCreate()
  // Build an RDD from the file
  val personRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\39402\\Desktop\\person.txt")
  // Split each line of the file into an RDD of Row
  val rowRDD: RDD[Row] = personRDD.map(personLine => {
    val splitStr: Array[String] = personLine.split(",")
    Row(splitStr(0), Integer.valueOf(splitStr(1)))
  })
  // Define the schema
  val structType = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
  // Convert the RDD of Row into a DataFrame using the schema
  val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
  // Convert the DataFrame into a typed Dataset
  import sparkSession.implicits._
  val personDS: Dataset[Person] = personDF.as[Person]
}
```
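The payoff of the Dataset over the DataFrame is the typed API: fields are accessed as case class members and checked at compile time. A short sketch continuing from the personDS above (the implicits import from that block supplies the needed encoders):

```scala
import org.apache.spark.sql.Dataset

// Compile-time checked field access; a typo like _.nmae would not compile
val names: Dataset[String] = personDS.map(_.name)
names.show()
```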