DataFrame: a special RDD that carries a schema
Creation methods
- Define the table with Row and an explicit schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rdd1: RDD[String] = sparkSession.sparkContext.textFile("hdfs://bigdata11:9000/input/user.txt")
val rdd2: RDD[Array[String]] = rdd1.map(_.split("\t"))
val rdd3: RDD[Row] = rdd2.map(x => {
  // each input line looks like: 11	fazi
  val id = x(0).toInt
  val name = x(1)
  // wrap one row of data
  Row(id, name)
})
// create the schema (describes the DataFrame's columns, like a SQL table definition)
val schema: StructType = StructType(List(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true)
))
// create the DataFrame
val userDF: DataFrame = sparkSession.createDataFrame(rdd3, schema)
// register a temporary view
userDF.createOrReplaceTempView("table")
sparkSession.sql("select * from table").show()
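The per-line parsing that rdd2/rdd3 perform can be tried in plain Scala without a Spark session; the sample line below mirrors the `11	fazi` record format from the comment:

```scala
// a sample input line in the same format as user.txt: id<TAB>name
val line = "11\tfazi"
val fields = line.split("\t")
val id = fields(0).toInt // "11" -> 11
val name = fields(1)     // "fazi"
println(s"id=$id, name=$name") // prints: id=11, name=fazi
```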
- Define the table with a case class
// define the case class outside the method body, so that toDF() can resolve its encoder
case class TestDemo(id:Int, name:String)
val rdd1: RDD[String] = sparkSession.sparkContext.textFile("hdfs://bigdata11:9000/input/user.txt")
val rdd2: RDD[Array[String]] = rdd1.map(_.split("\t"))
val rdd3: RDD[TestDemo] = rdd2.map(x => TestDemo(x(0).toInt,x(1)))
//converting an RDD of case-class instances to a DataFrame requires the implicit conversions
import sparkSession.implicits._
val userDF: DataFrame = rdd3.toDF()
//register a temporary view
userDF.createOrReplaceTempView("table")
sparkSession.sql("select * from table").show()
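The mapping from split lines to case-class instances can likewise be sketched in plain Scala (the sample lines here are made up for illustration); `toDF()` then derives the column names `id` and `name` from the case-class fields:

```scala
case class TestDemo(id: Int, name: String)

// hypothetical sample lines in the user.txt format: id<TAB>name
val records = Seq("1\talice", "2\tbob").map { line =>
  val parts = line.split("\t")
  TestDemo(parts(0).toInt, parts(1))
}
println(records) // List(TestDemo(1,alice), TestDemo(2,bob))
```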
- Read a file in a supported format directly (parquet, json, jdbc, orc, libsvm, csv, text)
//load the data source
//method 1 (note: by default, read.json expects one JSON object per line)
val urlData: DataFrame = sparkSession.read.json("/Users/fazi/Desktop/url_data.json")
//method 2: equivalent, using the generic format/load API
val urlData2: DataFrame = sparkSession.read.format("json").load("/Users/fazi/Desktop/url_data.json")
urlData.createOrReplaceTempView("table")
sparkSession.sql("select * from table").show()
- Read from a MySQL database
//load the data source, method 1: via an options map
val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
"url" -> "jdbc:mysql://localhost:3306/FZTest",
"driver" -> "com.mysql.cj.jdbc.Driver",
"dbtable" -> "table",
"user" -> "root",
"password" -> "mysql"
)).load()
//load the data source, method 2: via java.util.Properties
//(the URL is passed to read.jdbc as a separate argument, so it does not need to be
//set in the Properties; user and password are enough)
import java.util.Properties
val sqlProps: Properties = new Properties()
sqlProps.setProperty("user","root")
sqlProps.setProperty("password","mysql")
val urlData2: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/FZTest","table",sqlProps)
urlData.createOrReplaceTempView("table")
val result: DataFrame = sparkSession.sql("select * from table where uid > 2")
Usage
The official documentation notes:
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
That is, DataFrames support the usual RDD operations, and the columns of a Row can be accessed either by field index or by field name.
import sparkSession.implicits._
val result: DataFrame = sparkSession.sql("select * from table where uid > 2")
result.filter(row => (row(1).toString.contains("123") || row(1).toString.contains("456"))).show()
result.map(row => "Name: " + row(1)).show()
You can apply further filtering and other transformations to the DataFrame returned by a SQL query.
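Access by field name, the second option the docs mention, is done with `row.getAs[T]("field")` on a Spark `Row`. The index-vs-name idea can be sketched with the stdlib alone, using a `Seq` plus a name-to-index map as a stand-in for the `Row` (the sample values are made up):

```scala
// stand-in for a Spark Row: positional values plus a field-name index
val values = Seq(3, "fazi123")
val fieldIndex = Map("uid" -> 0, "name" -> 1)

val byIndex = values(1)                  // like row(1)
val byName  = values(fieldIndex("name")) // like row.getAs[String]("name")
assert(byIndex == byName)                // both yield "fazi123"
```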