Method calls (performing data analysis by calling DataFrame methods)
show: (displays the first N rows of the dataset as a table; N defaults to 20; see the sketch below)
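A minimal sketch: show also takes an optional row count and a truncation flag (df here is any of the DataFrames built below):
df.show() // first 20 rows, long cell values truncated
df.show(5) // first 5 rows
df.show(5, truncate = false) // first 5 rows, cell values printed in full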
select: (projection query; specifies the fields to return)
selectExpr: (projection query that supports expressions, e.g. basic arithmetic or aliases)
df
.selectExpr("id+10", "name as username")
withColumn: (adds an extra column)
withColumnRenamed: (renames a column; equivalent to an AS alias in SQL)
val rdd = spark.sparkContext.makeRDD(List((1, "zs", true, 2000D), (2, "ls", false, 3000D)))
// convert the RDD to a Dataset or DataFrame
import spark.implicits._
// Scala implicit conversions
val df = rdd.toDF("id", "name", "sex", "salary")
df
//.select("id", "name")
//.selectExpr("id+10", "name as username")
.withColumn("year_salary", $"salary" * 12)
.withColumnRenamed("name","username")
.show()
// equivalent SQL: select id, name, sex, salary, salary * 12 as year_salary from t_user
// output with withColumnRenamed commented out:
+---+----+-----+------+-----------+
| id|name|  sex|salary|year_salary|
+---+----+-----+------+-----------+
|  1|  zs| true|2000.0|    24000.0|
|  2|  ls|false|3000.0|    36000.0|
+---+----+-----+------+-----------+
// output with both withColumn and withColumnRenamed applied:
+---+--------+-----+------+-----------+
| id|username|  sex|salary|year_salary|
+---+--------+-----+------+-----------+
|  1|      zs| true|2000.0|    24000.0|
|  2|      ls|false|3000.0|    36000.0|
+---+--------+-----+------+-----------+
printSchema: (prints the schema of the table)
// the output looks like this
root
 |-- id: integer (nullable = false)
 |-- username: string (nullable = true)
 |-- sex: boolean (nullable = false)
 |-- salary: double (nullable = false)
 |-- year_salary: double (nullable = false)
drop: (removes specific columns; see the sketch below)
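A minimal sketch of drop, reusing the df from the example above:
df
.drop("sex") // remove the sex column from the result
.show()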
dropDuplicates: (keeps only one of any rows whose values in the given columns repeat, i.e. deduplicates the result)
import spark.implicits._
val df = List(
(1, "zs", false, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", true, 1, 19000),
(4, "zl", false, 1, 18000)
).toDF("id", "name", "sex", "dept", "salary")
df
.dropDuplicates("sex") // similar to DISTINCT in a database
.show()
+---+----+-----+----+------+
| id|name|  sex|dept|salary|
+---+----+-----+----+------+
|  3|  ww| true|   1| 19000|
|  1|  zs|false|   1| 15000|
+---+----+-----+----+------+
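Called with no arguments, dropDuplicates deduplicates on all columns at once (a sketch; every row of the df above is unique, so here nothing would be removed):
df.dropDuplicates().show()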
orderBy | sort: (sorts the result)
import spark.implicits._
val df = List(
(1, "zs", false, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", true, 1, 19000),
(4, "zl", false, 1, 18000)
).toDF("id", "name", "sex", "dept", "salary")
df
// sort descending by salary and then id (sorted in turn: if the first column's values tie, order by the second)
//.orderBy($"salary".desc, $"id".desc)
.sort($"salary".desc, $"id".desc)
.show()
+---+----+-----+----+------+
| id|name|  sex|dept|salary|
+---+----+-----+----+------+
|  3|  ww| true|   1| 19000|
|  4|  zl|false|   1| 18000|
|  2|  ls|false|   1| 18000|
|  1|  zs|false|   1| 15000|
+---+----+-----+----+------+
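The same ordering can also be written with the desc helper function instead of the Column method (a sketch over the same df):
import org.apache.spark.sql.functions.desc

df
.orderBy(desc("salary"), desc("id"))
.show()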
where: (filters by a condition; note: the DataFrame API has no having method, where serves both roles)
def m4(spark: SparkSession): Unit = {
import spark.implicits._
val df = List(
(1, "zs", false, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", true, 1, 19000),
(4, "zl", false, 1, 18000)
).toDF("id", "name", "sex", "dept", "salary")
df
//.where("name='zs' or salary > 18000")
// === is the Column equality operator (cf. === in JavaScript, which compares value and type)
.where($"name" === "zs")
.show()
}
+---+----+-----+----+------+
| id|name|  sex|dept|salary|
+---+----+-----+----+------+
|  1|  zs|false|   1| 15000|
+---+----+-----+----+------+
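Column predicates can be combined with && and || (a sketch over the same df):
df
.where($"salary" > 15000 && $"sex" === false) // salary above 15000 and sex equal to false
.show()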
groupBy: (grouping method; rows with the same value end up in one group)
def m5(spark: SparkSession): Unit = {
import spark.implicits._
val df = List(
(1, "zs", false, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", true, 2, 19000),
(4, "zl", false, 1, 18000)
).toDF("id", "name", "sex", "dept", "salary")
df
// highest salary per department: select dept, max(salary) from t_user group by dept
.groupBy($"dept")
.max("salary")
.where("dept = 2") // 等价于having
.show()
}
+----+-----------+
|dept|max(salary)|
+----+-----------+
|   2|      19000|
+----+-----------+
limit: (restricts the number of rows returned; call it last, as in the sketch below)
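A minimal sketch, assuming the df from the examples above; it returns the two highest-paid rows:
df
.orderBy($"salary".desc)
.limit(2)
.show()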
agg: (aggregation after grouping)
def m7(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.expressions.scalalang.typed
val df = List(
(1, "zs", false, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", true, 2, 19000),
(4, "zl", false, 1, 18000)
).toDF("id", "name", "sex", "dept", "salary")
df
.groupByKey(row => row.getInt(3)) // 3 is the index of the fourth column (dept)
// the typed aggregators available here are count, sum/sumLong, and avg
.agg(typed.sumLong(row => row.getInt(4)))
.show()
}
+-----+--------------------------------------+
|value|TypedSumLong(org.apache.spark.sql.Row)|
+-----+--------------------------------------+
|    1|                                 51000|
|    2|                                 19000|
+-----+--------------------------------------+
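The untyped alternative, more common in practice, combines groupBy with the aggregate functions from org.apache.spark.sql.functions (a sketch over the same df):
import org.apache.spark.sql.functions.{avg, max, sum}

df
.groupBy($"dept")
.agg(sum($"salary") as "total", max($"salary") as "highest", avg($"salary") as "average")
.show()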
case…when statement: (the row-to-column transformation; typically used on three fields)
def m8(spark: SparkSession): Unit = {
import spark.implicits._
val df = List(
(1, "math", 85),
(1, "chinese", 80),
(1, "english", 90),
(1, "english", 99),
(2, "math", 90),
(2, "chinese", 80)
).toDF("id", "course", "score")
// Approach 1: a case...when statement
df
// expression-based projection
.selectExpr(
"id",
"case course when 'math' then score else 0 end as math",
"case course when 'chinese' then score else 0 end as chinese",
"case course when 'english' then score else 0 end as english"
)
.groupBy($"id")
.max("math","chinese","english")
.show()
spark.stop()
}
+---+---------+------------+------------+
| id|max(math)|max(chinese)|max(english)|
+---+---------+------------+------------+
|  1|       85|          80|          99|
|  2|       90|          80|           0|
+---+---------+------------+------------+
pivot: (simplifies the row-to-column transformation; an upgraded case…when)
// Approach 2: the pivot method (simpler)
df
.groupBy("id")
.pivot("course") // turn the course values math, chinese, english into columns of the result
.max("score") // keep the highest score per course
.show()
+---+-------+-------+----+
| id|chinese|english|math|
+---+-------+-------+----+
|  1|     80|     99|  85|
|  2|     80|   null|  90|
+---+-------+-------+----+
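pivot also accepts an explicit list of values, which fixes the output columns and their order and spares Spark a pass over the data to discover the distinct course names (a sketch):
df
.groupBy("id")
.pivot("course", Seq("math", "chinese", "english"))
.max("score")
.show()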
na: (ways to handle null values in the result table)
- fill them with a default value
- drop any row that contains a null
def m9(spark: SparkSession): Unit = {
import spark.implicits._
val df = List(
(1, "math", 85),
(1, "chinese", 80),
(1, "english", 90),
(1, "english", 99),
(2, "math", 90),
(2, "chinese", 80)
).toDF("id", "course", "score")
df
.groupBy("id")
.pivot("course") // turn the course values math, chinese, english into columns of the result
.max("score") // keep the highest score per course
//.na.fill(-1, Array[String]("english")) // fill nulls with -1, but only in the english column
.na.drop(Array[String]("math")) // drop any row whose math column is null
.show()
spark.stop()
}
// output with .na.fill(-1, Array("english")) enabled:
+---+-------+-------+----+
| id|chinese|english|math|
+---+-------+-------+----+
|  1|     80|     99|  85|
|  2|     80|     -1|  90|
+---+-------+-------+----+
// output of .na.drop(Array("math")): math has no null values, so nothing changes
+---+-------+-------+----+
| id|chinese|english|math|
+---+-------+-------+----+
|  1|     80|     99|  85|
|  2|     80|   null|  90|
+---+-------+-------+----+
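fill also accepts a Map, so each column can get its own default (a sketch, reusing the pivoted result above):
df
.groupBy("id")
.pivot("course")
.max("score")
.na.fill(Map("english" -> -1, "math" -> 0)) // a different default per column
.show()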