一、Spark SQL原理
一、Spark SQL优化器
三、RDD、DataSet、DataFrame的概念
1、对于结构化的数据,推荐使用DataFrame
- DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。
- DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL 得以洞察更多的结构 信息,从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性 的优化,最终达到大幅提升运行时效率的目标。
2、DataSet 是什么
- DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是DataFrame 的一个扩展。DataFrame实际上就是DataSet的Row类型,type DataFrame = DataSet[Row]
- 它提供了RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及Spark SQL 优化执行引擎的优点。
- DataSet 也可以使用功能性的转换(操作 map,flatMap,filter 等等)。
➢DataSet 是 DataFrame API 的一个扩展,是SparkSQL 最新的数据抽象
➢用户友好的 API 风格,既具有类型安全检查也具有DataFrame 的查询优化特性;
➢用样例类来对DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称;
➢DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
➢DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将 DataFrame 转换为DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的 表结构信息都用 Row 来表示。获取数据时需要指定顺序
3、优缺点总结
4、转换时的注意点
- 在 IDEA 中开发程序时,如果需要RDD 与DF 或者DS 之间互相操作,那么需要引入 import spark.implicits._
- 这里的 spark 不是Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用var 声明,因为 Scala 只支持 val 修饰的对象的引入。 spark-shell 中无需导入,自动完成此操作。
四、DataSet操作
1、创建DataSet
- ①准备工作
➢ 首先创建SparkSession,默认变量名都写spark
➢ 然后通过session创建sparkContext,一般默认写sc
➢ 需要导入一个隐式转换
val spark = SparkSession.builder().master("local[4]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
- ②创建DataSet的几种方式
➢通过Sequence创建DataSet
➢通过List集合创建DataSet
➢通过RDD创建DataSet
➢开发常规用法,先将一个RDD和样例类关联,再将其转化为DataSet
val seqDs = spark.createDataset(1 to 10)
val listDs = spark.createDataset(List(("a",1),("a",1),("a",1)))
val rddDs = spark.createDataset(sc.parallelize(List(("a",1,2),("a",1,2),("a",1,2))))
case class Point(label:String,x:Double,y:Double)
val points = Seq(Point("hin",3.6,2.5),Point("bar",2.6,3.5),Point("foo",2.1,3.6)).toDS()
- ③在使用sql函数时之前要导包
import org.apache.spark.sql.functions._
2、具体DataSet项目操作练习如下
object Retail_dbTest{
case class Customers(id:String,fname:String,lname:String,email:String,password:String,street:String,city:String,state:String,zipcode:String)
case class Order_items(id:String,orderID:String,productID:String,quantity:Int,subtotal:Float,productPrice:Float)
case class Orders(id:String,date:String,customerID:String,status:String)
case class Products(id:String,categoryID:String,name:String,price:String,image:String)
case class OrdersDate(date:String)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
//textfile形式加载数据
val customerslines = sc.textFile("files/04retail_db/customers.csv")
val order_itemslines = sc.textFile("files/04retail_db/order_items.csv")
val orderslines = sc.textFile("files/04retail_db/orders.csv")
val productslines = sc.textFile("files/04retail_db/products.csv")
//转成RDD
val customersRDD = customerslines.map(x=>x.replace("\"","")).map(x => x.split(","))
val order_itemsRDD = order_itemslines.map(x=>x.replace("\"","")).map(x => x.split(","))
val ordersRDD = orderslines.map(x=>x.replace("\"","")).map(x => x.split(","))
//ordersDateRDD和ordersDateDs是针对第三题单独整了一个RDD和DataSet
import java.time.LocalDate
import java.time.format.DateTimeFormatter
val ordersDateRDD = ordersRDD.map(x=>(LocalDate.parse(x(1).toString,DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")).getDayOfWeek.toString))
val ordersDateDs = ordersDateRDD.map(x=>OrdersDate(x)).toDS()
val productsRDD = productslines.map(x=>x.replace("\"","")).map(x => x.split(","))
//和样例类关联,转成DataSet
val customersDs = customersRDD.map(x => Customers(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8))).toDS()
customersDs.show()
val order_itemsDs = order_itemsRDD.map(x => Order_items(x(0), x(1), x(2), x(3).toInt, x(4).toFloat, x(5).toFloat)).toDS()
order_itemsDs.show()
val ordersDs = ordersRDD.map(x => Orders(x(0), x(1), x(2), x(3))).toDS()
ordersDs.show()
val productsDs = productsRDD.map(x => Products(x(0), x(1), x(2), x(3), x(4))).toDS()
productsDs.show()
//谁的消费额最高?
import org.apache.spark.sql.functions._
ordersDs.join(order_itemsDs,order_itemsDs("orderID")===ordersDs("id"))
.join(customersDs.alias("cust"),customersDs("id")===ordersDs("customerID"))
.groupBy("cust.fname","cust.lname")
.agg(sum("subtotal").alias("total"))
.sort(desc("total"))
.limit(1).show()
//那个产品的销量最高
order_itemsDs.groupBy("productID")
.agg(sum("quantity").alias("quantity")).alias("oi")
.join(productsDs,col("oi.productID")===productsDs("id"))
.select("id","name","quantity")
.sort(desc("quantity")).limit(1)
.show()
//所有交易的周分布情况
//提示:JDK8提供了LocalDate日期处理类,可以通过getDayOfWeek()方法获取日期对应一周中哪一天
// ordersDs.groupBy("date").agg(count("date").alias("总数")).select("date","总数").show()
ordersDateDs.groupBy("date").agg(count("date").alias("count"))
.select("date","count").show()
//购买力最强的地区
order_itemsDs.alias("oi").join(ordersDs.alias("or"),order_itemsDs("orderID")===ordersDs("id"))
.join(customersDs.alias("cu"),customersDs("id")===ordersDs("customerID"))
.groupBy("cu.city")
.agg(round(sum("oi.subtotal"),1).alias("total2"))
.select("cu.city","total2")
.orderBy(desc("total2"))
.show()
}
}
五、DataFrame操作
1、DataFrame的创建方式
- ①通过SparkSession read直接读取对应的文件json/csv,
spark.read.json("")
- ②通过SparkSession read的format load方法,
spark.read.format("csv/json").load("")
val spark = SparkSession.builder().master("local[4]")
.appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val df = spark.read.json("files/02users/users.json")
val df = spark.read.format("csv").load("files/02users/users.csv")
- ③通过样例类,直接toDF()直接把RDD转DataFrame,和转DataSet的方法类似
val peopleDF = peopleRDD.map(_.split(",")).map(x=>People(x(0),x(1).toInt)).toDF()
- ④将RDD通过和Schema关联,获取DataFrame
➢通过StructType构建Schema信息:StructField代表一个字段,第一个参数是字段名称,第二个参数是参数类型,第三个参数是是否为空默认为true
➢将每行字符串切割,切割成Array,然后将其转化为RDD[Row]类型
➢将Row类型的RDD和Schema信息关联,创建一个DataFrame
➢通过.rdd方法将Dataframe 类型转化为RDD,格式为RDD[Row]
val peopleRDD = sc.textFile("files/02users/people.txt")
//通过StructType构建Schema信息:StructField代表一个字段,第一个参数是字段名称,第二个参数是参数类型,第三个参数是是否为空默认为true
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
//将每行字符串切割,切割成Array,然后将其转化为RDD[Row]类型
val peopleRowRDD = peopleRDD.map(_.split(",")).map(x=>Row(x(0),x(1).toInt))
//将Row类型的RDD和Schema信息关联,创建一个DataFrame
val df = spark.createDataFrame(peopleRowRDD,schema)
df.createOrReplaceTempView("people2")
spark.sql("select * from people2").show()
//通过.rdd方法将Dataframe 类型转化为RDD,格式为RDD[Row]
val dfToRdd = df.rdd
df.rdd.foreach(println(_))
2、DataFrame的常用方法
println("-----schema--------------")
df.printSchema()
println("-----提取字段--------------")
//提取字段
df.select("name").show()
println("-----字段进行简单运算--------------")
//字段进行简单运算
df.select(df("name"),df("age")+1).show()
println("-----过滤筛选,where--------------")
//过滤筛选,where
df.filter(df("age")>21).show()
println("-----分组聚合,求最大值最小值等--------------")
//分组聚合,求最大值最小值等
df.groupBy("age").count().show()
println("-----创建或 覆盖临时视图--------------")
//创建或 覆盖临时视图
df.createOrReplaceTempView("people")
println("-----普通查询语句--------------")
spark.sql("select * from people").show()
3、读写Parquet
- Parquet 格式是 Spark 默认的文件存储格式,基本操作流程如下
- 1、定义schema信息
- 2、准备Row类型的RDD
- 3、创建DataFrame
- 4、把parquet文件写入本地,但路径不能已经存在
- 5、Spark SQL读parquet文件
object RradWriteParquet{
case class A(name:String,favorite_color:String,favorite_numbers:Array[Int])
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
//1、定义schema信息
val schema = StructType(Array(
StructField("name", StringType),
StructField("favorite_color", StringType),
StructField("favorite_numbers", ArrayType(IntegerType))
))
//2、准备Row类型的RDD
val rdd = sc.parallelize(List(("aaa",null,Array(1,2,4)),("bbb","blue",Array(1,2,2)),("ccc","red",Array(1,6,4))))
val rowRDD = rdd.map(x=>Row(x._1,x._2,x._3))
//3、创建DataFrame
val df = spark.createDataFrame(rowRDD,schema)
//4、把parquet文件写入本地,但路径不能已经存在
df.write.parquet("D:\\JavaProjects\\ClassStudy\\Scala\\sparkdemo\\files\\output\\03")
//5、Spark SQL读parquet文件
val df2 = spark.read.parquet("D:\\JavaProjects\\ClassStudy\\Scala\\sparkdemo\\files\\output\\01")
df2.show()
df2.printSchema()
}
}
4、内置函数的使用
- 使用内置函数,首先要导包
import org.apache.spark.sql.functions._
//内置函数使用
object InnerFunc{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
val accessLog = Array(
"2016-12-27,001",
"2016-12-27,001",
"2016-12-27,002",
"2016-12-28,003",
"2016-12-28,004",
"2016-12-28,002",
"2016-12-28,002",
"2016-12-28,001"
)
val accessLogRDD = sc.makeRDD(accessLog).map(row => {
val splited = row.split(",")
Row(splited(0), splited(1).toInt)
})
val structType = StructType(Array(
StructField("day", StringType, true),
StructField("userId", IntegerType, true)
))
import spark.implicits._
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(accessLogRDD,structType)
//求pv
df.groupBy($"day")
.agg(count($"userId")
.alias("pv"))
.select($"day",$"pv")
.show()
//uv
df.groupBy($"day")
.agg(countDistinct($"userId").alias("pv"))
.select($"day",$"pv")
}
}
5、UDF自定义函数
- SparkSQL自定义函数无需打jar包,可以直接写直接调用,使用方法如下:
- 1、创建视图
- 2、注册自定义函数:spark.udf.register(FUNC_NAME , 自定义的匿名函数)
- 3、直接调用
案例如下:
object UDFtest_fname{
case class customers(fname:String,lname:String)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val df = sc.textFile("files/04retail_db/customers.csv")
.map(x => x.replace("\"", "")).map(x => {
val y = x.split(",")
(y(1),y(2))
}).map(x => customers(x._1, x._2)).toDF()
//1、创建视图
df.createOrReplaceTempView("nametable")
//2、注册自定义函数:spark.udf.register(FUNC_NAME , 函数)
spark.udf.register("namefull",(s1:String,s2:String)=>s1++"-"++s2)//s1.concat("-").concat(s2)
//3、函数使用
spark.sql("select fname,lname,namefull(fname,lname) as name from nametable ").show()
}
}