Spark Review 3: Spark Join Operations

1: Prepare customers.csv

scala> val dfCustomers=spark.read.format("csv").option("delimiter",",").option("quote","\"").option("escape","\"").load("file:///home/data/customers.csv")
dfCustomers: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 7 more fields]

scala> dfCustomers.show
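
Loaded this way, every column arrives as a string with generated names (_c0, _c1, ...). A quick check (not part of the original transcript) makes that visible and motivates the casts and renames in step 3:

scala> dfCustomers.printSchema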

2: Import packages:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._


3: Build typed DataFrames from the CSV layout

3.1 Convert customers.csv
scala> val dfCustomers=spark.read.format("csv").option("delimiter",",").option("quote","\"").option("escape","\"").load("file:///home/data/customers.csv").withColumn("ID",col("_c0").cast(IntegerType)).withColumnRenamed("_c1","FirstName").withColumnRenamed("_c2","LastName").withColumnRenamed("_c3","HomePhone").withColumnRenamed("_c4","WorkPhone").withColumnRenamed("_c5","Address").withColumnRenamed("_c6","City").withColumnRenamed("_c7","State").withColumnRenamed("_c8","Zipcode").drop("_c0")
dfCustomers: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 7 more fields]
scala> dfCustomers.show(5)
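
An alternative worth noting (only a sketch, assuming the same nine-column layout and the column names used above; dfCustomers2 and customerSchema are hypothetical names): supply an explicit schema to the reader instead of casting and renaming after the load. StructType, StructField, StringType and IntegerType all come from org.apache.spark.sql.types._, which was imported in step 2.

scala> val customerSchema=StructType(Seq(StructField("ID",IntegerType),StructField("FirstName",StringType),StructField("LastName",StringType),StructField("HomePhone",StringType),StructField("WorkPhone",StringType),StructField("Address",StringType),StructField("City",StringType),StructField("State",StringType),StructField("Zipcode",StringType)))

scala> val dfCustomers2=spark.read.format("csv").option("delimiter",",").option("quote","\"").option("escape","\"").schema(customerSchema).load("file:///home/data/customers.csv")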

3.2 Convert orders.csv
scala> val dfOrders=spark.read.format("csv").option("delimiter",",").option("quote","\"").option("escape","\"").load("file:///home/data/orders.csv").withColumn("OrderID",col("_c0").cast(IntegerType)).withColumnRenamed("_c1","OrderDate").withColumn("OrderCustomerID",col("_c2").cast(IntegerType)).withColumnRenamed("_c3","OrderStatus").drop("_c0").drop("_c2")
dfOrders: org.apache.spark.sql.DataFrame = [OrderDate: string, OrderStatus: string ... 2 more fields]

scala> dfOrders.show(false)

4: Register the DataFrames as temporary views:

scala> dfCustomers.createOrReplaceTempView("customers")

scala> dfOrders.createOrReplaceTempView("orders")

5: Join query (SQL):

scala> val dfSQLResult=spark.sql("select c.FirstName,c.LastName,o.OrderDate,o.OrderStatus from customers c inner join orders o on c.ID = o.OrderCustomerID")
dfSQLResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 2 more fields]
Query result:
scala> dfSQLResult.show(10,false)

6: Join query with the DataFrame API:

scala> val dfResult=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"inner").select(col("c.FirstName"),col("c.LastName"),col("o.OrderDate"),col("o.OrderStatus"))
dfResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 2 more fields]

scala> dfResult.show(false)
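
Two optional checks, not in the original transcript: dfResult.explain prints the physical plan and shows which join strategy Spark chose (broadcast hash join vs. sort-merge join), and if one side is known to be small, a broadcast hint can be applied (broadcast comes from org.apache.spark.sql.functions, already imported in step 2; dfBroadcastResult is a hypothetical name).

scala> dfResult.explain

scala> val dfBroadcastResult=dfCustomers.alias("c").join(broadcast(dfOrders.alias("o")),col("c.ID")===col("o.OrderCustomerID"),"inner").select(col("c.FirstName"),col("c.LastName"),col("o.OrderDate"),col("o.OrderStatus"))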

7: left_outer join:

scala> val dfleftResult=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"left_outer").select(col("c.FirstName"),col("c.LastName"),col("o.OrderDate"),col("o.OrderStatus"))
dfleftResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 2 more fields]

scala> dfleftResult.show(5,false)
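
A common follow-up (a sketch, not in the original transcript; dfNoOrders is a hypothetical name): a left_anti join keeps only the customers that have no matching order, i.e. the rows that would show null order columns in the left outer join above.

scala> val dfNoOrders=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"left_anti").select(col("c.FirstName"),col("c.LastName"))

scala> dfNoOrders.show(5,false)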

8: Prepare the order_items table for the three-table join:

scala> val dfItems=spark.read.format("csv").option("header","false").option("delimiter",",").option("quote","\"").option("escape","\"").load("file:///home/data/order_items.csv")
dfItems: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 4 more fields]

scala> dfItems.show(2)

Continue the conversion, casting and keeping only the needed columns:

scala> val dfItems=spark.read.format("csv").option("header","false").option("delimiter",",").option("quote","\"").option("escape","\"").load("file:///home/data/order_items.csv").withColumn("ItemOrderID",col("_c1").cast(IntegerType)).withColumn("Quantity",col("_c3").cast(IntegerType)).withColumn("SubTotal",col("_c4").cast(DoubleType)).withColumn("ProductPrice",col("_c5").cast(DoubleType)).select("ItemOrderID","Quantity","SubTotal","ProductPrice")
dfItems: org.apache.spark.sql.DataFrame = [ItemOrderID: int, Quantity: int ... 2 more fields]

scala> dfItems.show()
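
To confirm the casts took effect, the schema can be checked the same way the joined result is checked later on (this line is not in the original transcript):

scala> dfItems.printSchema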

Three-table join (SQL):

scala> dfItems.createOrReplaceTempView("items")

scala> val dfSQLResult=spark.sql("select * from customers c inner join orders o on c.ID=o.OrderCustomerID inner join items i on o.OrderID=i.ItemOrderID")
dfSQLResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 15 more fields]

scala> dfSQLResult.show

Three-table query for the top-spending customers:

scala> val dfSQLResult=spark.sql("select c.FirstName,c.LastName,sum(i.SubTotal) as TotalPurchase from customers c inner join orders o on c.ID=o.OrderCustomerID inner join items i on o.OrderID=i.ItemOrderID group by c.FirstName,c.LastName order by TotalPurchase desc")
dfSQLResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 1 more field]

scala> dfSQLResult.show

DataFrame API implementation:

scala> val dfResult=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"inner").join(dfItems.alias("i"),col("o.OrderID")===col("i.ItemOrderID"),"inner")
dfResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 15 more fields]

scala> dfResult.show

Next, narrow the result to just the columns needed:

scala> val dfResult=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"inner").join(dfItems.alias("i"),col("o.OrderID")===col("i.ItemOrderID"),"inner").select(col("c.FirstName"),col("c.LastName"),col("i.SubTotal"))
dfResult: org.apache.spark.sql.DataFrame = [FirstName: string, LastName: string ... 1 more field]

scala> dfResult.printSchema
root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- SubTotal: double (nullable = true)


Full implementation:
scala> val dfResult=dfCustomers.alias("c").join(dfOrders.alias("o"),col("c.ID")===col("o.OrderCustomerID"),"inner").join(dfItems.alias("i"),col("o.OrderID")===col("i.ItemOrderID"),"inner").select(col("c.FirstName"),col("c.LastName"),col("i.SubTotal")).groupBy(col("FirstName"),col("LastName")).agg(sum(col("SubTotal")).as("TotalPurchase")).sort(col("TotalPurchase").desc)
dfResult: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [FirstName: string, LastName: string ... 1 more field]

scala> dfResult.show
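
If only the single top spender is needed, one way (a sketch) is to keep just the first row of the already sorted result:

scala> dfResult.limit(1).show(false)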
