DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图
DSL(练习1)
数据:user.csv
user_id,name,gender,birthyear
1,lisi,male,2000/12/21
2,wangwu,male,2000/12/22
3,zhangsan,female,2000/12/23
DSLTest.scala
package SparkTest.SparkSql
import org.apache.spark.sql.SparkSession
object DSLTest {
  /** Demonstrates basic Spark DataFrame DSL operations (select / filter / aggregate)
    * on a small user CSV file, without creating a temporary view. */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("DSLTest").master("local[*]").getOrCreate()
    import session.implicits._
    import org.apache.spark.sql.functions._
    // Read the CSV, treating the first line as the header row.
    val df = session.read.option("header", "TRUE").csv("file:///F:/JavaTest/SparkDemo/data/user.csv")
//    df.printSchema()
//    df.show()
    // Select specific columns -- four equivalent styles:
//    df.select("user_id", "name", "gender").show()
//    df.select(col("user_id"), col("name"), col("gender")).show()
//    df.select("*").show()
//    df.select($"user_id", $"name", $"gender").show()
    // Row filtering -- three equivalent styles:
//    df.where(expr("gender=='male'")).show()
//    df.where(col("gender") === "male").show()
//    df.filter($"gender".equalTo("male")).show()
    // Aggregations
    df.groupBy("gender").count().show() // row count per gender
    // Total row count together with the earliest birth year.
    // FIX: min("birthyear") was previously mislabeled "max_year".
    df.groupBy().agg(count("gender").as("counts"), min("birthyear").as("min_year")).show()
    // Total row count together with the latest birth year.
    df.groupBy().agg(count("gender").as("counts"), max("birthyear").as("max_year")).show()
    session.close()
  }
}
DSL2(练习2)
数据:login.txt
id,login_date
10001,2023-02-28
10001,2023-03-01
10001,2023-03-02
10001,2023-03-13
10001,2023-03-14
10001,2023-03-15
10001,2023-03-19
10002,2023-03-01
10002,2023-03-02
10002,2023-03-03
10002,2023-03-15
DSLTest2.scala
package SparkTest.SparkSql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
object DSLTest2 {
  /** Finds runs of >= 3 consecutive login days per user, using the classic
    * "date minus row_number" grouping trick, expressed with the DataFrame DSL. */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("DSLTest2").master("local[*]").getOrCreate()
    import session.implicits._
    import org.apache.spark.sql.functions._
    val df = session.read.option("header", "TRUE").csv("file:///F:/JavaTest/SparkDemo/data/login.txt")
//    df.show()
    /* Equivalent SQL formulation, kept for reference:
    df.createTempView("v_login")
    session.sql(
      """
        |with v1 as(
        |  select id, login_date,
        |         row_number()over(partition by id order by login_date) as rn
        |  from v_login),
        |v2 as(
        |  select id, login_date, date_sub(login_date,rn) dif
        |  from v1)
        |select id,
        |       count(1) as counts,
        |       min(login_date) as start_dt,
        |       max(login_date) as end_dt
        |from v2
        |group by id, dif
        |having counts >=3
        |""".stripMargin
    ).show()
    */
    // DSL version: rank each user's logins by date, then subtract the rank from
    // the date; consecutive days collapse onto the same "dif" value.
    val byUserByDate = Window.partitionBy("id").orderBy("login_date")
    val ranked = df.select(col("id"), col("login_date"), row_number().over(byUserByDate).as("rn"))
    val keyed = ranked.select(col("id"), col("login_date"), expr("date_sub(login_date,rn) as dif"))
    keyed
      .groupBy("id", "dif")
      .agg(
        count("*").as("counts"),
        max("login_date").as("end_date"),
        min("login_date").as("start_date"))
      .filter($"counts" >= 3) // keep only streaks of 3+ days
      .drop("dif")
      .show()
    session.close()
  }
}