1.使用Structured Streaming读取Socket数据,把单词和单词的反转组成 json 格式写入到当前目录中的file文件夹中
package com.wzy.code.code01
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object StructuredStreaming {
  /**
   * Reads lines from a socket with Structured Streaming, splits them into
   * words, pairs each word with its reverse, and writes the result as JSON
   * files into the `file` folder.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Json").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node01")
      .option("port", 9999)
      .load()

    import spark.implicits._
    val dataDS: Dataset[String] = lines.as[String]
    val reverseDF = dataDS
      .flatMap(_.split(" "))
      .map(x => (x, x.reverse))
      .toDF("before", "reverse")

    // FIX: do NOT call show() on a streaming DataFrame -- it throws
    // AnalysisException ("Queries with streaming sources must be executed
    // with writeStream.start()"). Use a console sink for debugging instead.
    reverseDF.writeStream
      .format("json")
      .option("path", "E:\\SparkCode\\SparkSql\\Day0417Work\\file")
      .option("checkpointLocation", "./ck1") // a checkpoint dir is mandatory for file sinks
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
下面题目需要数据:
数据需要下载一下:链接:https://pan.baidu.com/s/1JqzjgnnQwZlYY2a4spIg6g
提取码:kecv
2.请使用Structured Streaming读取student_info文件夹写的csv文件
2.1统计出文件中的男女生各有多少人
2.2统计出姓“王”男生和女生的各有多少人
package com.wzy.code.code02
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object StructuredStreaming {
  /**
   * Streams student CSV files and answers two questions on the console:
   *   2.1 how many students of each gender there are
   *   2.2 how many male/female students have the family name "王"
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Student_Info").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // Columns: student id, name, gender, class id, enrollment date
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("sex", StringType)
      .add("class", StringType)
      .add("root", StringType)

    val csvDF: DataFrame = spark.readStream
      .schema(schema)
      .csv("file:///E:\\SparkCode\\SparkSql\\Day0417Work\\student_info")

    // 2.1 head-count per gender
    val countBySex: DataFrame = csvDF.groupBy("sex").count()
    // 2.2 head-count per gender among students surnamed "王"
    val wangBySex: DataFrame = csvDF.where("name like '王%'").groupBy("sex").count()

    // FIX: start BOTH queries -- the original computed 2.1 but only started
    // the 2.2 query, so requirement 2.1 was never written to any sink.
    countBySex.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
    wangBySex.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
    // Block until any query terminates (both run until the app is stopped).
    spark.streams.awaitAnyTermination()
  }
}
3.请使用Structured Streaming读取department_info文件夹写的csv文件
3.1统计出各个院系的分别多少条信息
package com.wzy.code.code03
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object work03 {
  /**
   * Streams department_info CSV files and prints, per department name,
   * how many records it has (question 3.1).
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dep").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // Columns: department id, department name
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)

    val csvDF: DataFrame = spark.readStream
      .schema(schema)
      .csv("department_info")

    // 3.1 record count per department.
    // FIX: the original used .as("个数"), which aliases the whole Dataset and
    // leaves the aggregate column still named "count"; rename the column instead.
    val count = csvDF.groupBy("name").count().withColumnRenamed("count", "个数")

    count.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
4.请使用spark sql读取student_score文件夹写的csv文件
分别使用SQL格式和DSL格式书写代码
4.1、统计出每个班级的最高分数
4.2、统计出男生最高分
4.3、统计出女生最高分
4.4、分别统计出男生和女生的分数前三名
4.5、统计出分数在500分以上的人数
4.6、统计出分数在300分以下的人中男女各占多少
package com.wzy.code.code04
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkSql {
  /**
   * Batch Spark SQL exercises over the student_score CSV files; each question
   * (4.1-4.6) is answered twice, once in SQL and once in the DSL.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    val csvDF: DataFrame = spark.read.csv("student_score")
    // Columns: student id, name, gender, class id, entrance score
    val studentDF: DataFrame = csvDF.toDF("id", "name", "sex", "classID", "score")
    studentDF.show()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    println("-------------------------SQL------------------------")
    studentDF.createOrReplaceTempView("scores")
    // 4.1 highest score per class
    spark.sql("select classID,max(score) from scores group by classID ").show()
    // 4.2 highest-scoring male student(s)
    spark.sql("select * from scores where sex='男' AND score=(select max(score) from scores where sex='男')").show()
    // 4.3 highest-scoring female student(s)
    spark.sql("select * from scores where sex='女' AND score=(select max(score) from scores where sex='女')").show()
    // 4.4 top three scores per gender.
    // FIX: the original filtered with HAVING on a window-function alias without
    // any GROUP BY, which Spark SQL rejects; rank in a subquery and filter in
    // the outer WHERE instead.
    spark.sql("select * from (select name,classID,score,sex,row_number() over(partition by sex order by score desc) rk from scores) t where rk<=3").show()
    // 4.5 students scoring above 500, with the total count alongside
    spark.sql("select name,classID,sex,score,count(name) over() count from scores where score>500").show()
    // 4.6 gender breakdown of students scoring below 300
    spark.sql("select sex,count(sex) from (select * from scores where score<300) group by sex ").show()

    println("-------------------------DSL------------------------")
    // 4.1 highest score per class
    studentDF.groupBy("classID").agg(max("score")).show()
    // 4.2 highest male score
    studentDF.where($"sex" === "男").agg(max("score")).show()
    // 4.3 highest female score
    studentDF.where($"sex" === "女").agg(max("score")).show()
    // 4.4 top three scores per gender via a window rank
    val window: WindowSpec = Window.partitionBy("sex").orderBy($"score".desc)
    studentDF.select($"name", $"sex", $"classID", $"score", row_number().over(window).as("rk"))
      .where("rk<=3")
      .show()
    // 4.5 students above 500 with the overall count
    studentDF.where($"score" > 500).select($"name", $"sex", $"classID", $"score", count($"name").over().as("count")).show()
    // 4.6 gender breakdown below 300
    studentDF.where($"score" < 300).groupBy("sex").count().show()
  }
}
5.请使用Spark sql读取class_info文件夹写的csv文件
5.1、统计出哪个院系的专业最多
5.2、统计出计算机学院中有多少专业
5.3、统计出经济管理学院的会计和工商管理的班级数
5.4、分别统计出每个学院的班级最多的专业
5.5、统计出班级编号以2开头的所有的专业名称
package com.wzy.code.code05
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
object SparkSQL {
  /**
   * Batch Spark SQL exercises over the class_info CSV files (questions 5.1-5.5),
   * answered first in SQL and then in the DSL. A class name such as "计算机一班"
   * maps to its major by dropping the trailing two characters ("一班"), which is
   * what the subString UDF does.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Sql").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    val csvDF = spark.read.csv("class_info")
    // Example row: 0101,计算机一班,20170901,计算机学院
    // Columns: class id, class name, enrollment date, department name
    val classz = csvDF.toDF("id", "className", "date", "college")
    classz.show()

    import spark.implicits._
    import org.apache.spark.sql.functions._
    classz.createOrReplaceTempView("student")

    // Strip the trailing two chars from a class name to get the major name.
    // FIX: guarded with math.max so names shorter than 2 chars no longer throw
    // StringIndexOutOfBoundsException (same result as before for longer names).
    spark.udf.register("subString", (str: String) => str.substring(0, math.max(0, str.length - 2)))

    // 5.1 department with the most distinct majors
    spark.sql("select college,subString(className) sub from student").createOrReplaceTempView("t1")
    spark.sql("select college,count(distinct(sub)) count from t1 group by college ").createOrReplaceTempView("t2")
    spark.sql("select * from t2 order by count desc limit 1 ").show()
    // 5.2 number of majors in 计算机学院
    spark.sql("select college,count(distinct(subString(className))) from student where college='计算机学院' group by college").show()
    // 5.3 class counts for 会计 and 工商管理 in 经济管理学院
    spark.sql("select college,count(className) from student where college='经济管理学院' AND className like '会计%' group by college").show()
    spark.sql("select college,count(className) from student where college='经济管理学院' AND className like '工商%' group by college").show()
    // 5.4 major with the most classes in each department
    spark.sql(
      """
        |select
        | id,
        | college,
        | subString(className) cname,
        | date
        |from
        | student
      """.stripMargin).createOrReplaceTempView("s1")
    spark.sql("select college,cname,count(id) count from s1 group by college,cname").createOrReplaceTempView("s2")
    // FIX: HAVING on a window-function alias without GROUP BY is invalid;
    // rank inside a subquery and filter with the outer WHERE.
    spark.sql("select * from (select *,row_number() over(partition by college order by count desc) rank from s2) t where rank<=1").show()
    // 5.5 distinct majors whose class id starts with '02'
    spark.sql("select distinct(subString(className)) from student where id like '02%'").show()

    println("------------------------------------------------")
    println("------------------------------------------------")
    println("------------------------------------------------")

    // 5.1 (DSL)
    classz.select($"college", callUDF("subString", $"className")).distinct().groupBy("college").count()
      .sort($"count".desc).limit(1).show()
    // 5.2 (DSL)
    classz.select($"college", callUDF("subString", $"className")).distinct()
      .where($"college" === "计算机学院").groupBy($"college").count().show()
    // 5.3 (DSL)
    classz.where("college='经济管理学院' AND className like '会计%'").groupBy("college").count().show()
    classz.where("college='经济管理学院' AND className like '工商%'").groupBy("college").count().show()
    // 5.4 (DSL) -- val instead of def so the WindowSpec is built once
    val dslFour = classz.select($"college", callUDF("subString", $"className").as("cname")).groupBy($"college", $"cname").count()
    val window: WindowSpec = Window.partitionBy($"college").orderBy($"count".desc)
    dslFour.select($"college", $"cname", $"count", row_number().over(window).as("rk")).where("rk<=1").show()
    // 5.5 (DSL)
    // FIX: the original projected only the major column and then filtered on
    // "id", which no longer exists after the select (AnalysisException);
    // filter BEFORE projecting.
    classz.where("id like '02%'").select(callUDF("subString", $"className")).distinct().show()
  }
}
SQL练习题(6-50)
表结构:
表数据:
TXT文件:
teacher.txt
804 李诚 男 1958-12-02 副教授 计算机系
856 张旭 男 1969-03-12 讲师 电子工程系
825 王萍 女 1972-05-05 助教 计算机系
831 刘冰 女 1977-08-14 助教 电子工程系
score.txt
103 3-245 86
105 3-245 75
109 3-245 68
103 3-105 92
105 3-105 88
109 3-105 76
101 3-105 64
107 3-105 91
108 3-105 78
101 6-166 85
107 6-166 79
108 6-166 81
course.txt
3-105 计算机导论 825
3-245 操作系统 804
6-166 数字电路 856
9-888 高等数学 831
student.txt
108 丘东 男 1977-09-01 95033
105 匡明 男 1975-10-02 95031
107 王丽 女 1976-01-23 95033
101 李军 男 1976-02-20 95033
109 王芳 女 1975-02-10 95031
103 陆君 男 1974-06-03 95031
准备工作
读取数据:
// Prep work: build the SparkSession, declare explicit schemas, load the four
// space-delimited text files, and register one temp view per table so the
// SQL answers below (questions 6-50) can query them.
// NOTE(review): this snippet relies on imports not shown here (Spark SQL types,
// Row, java.text.SimpleDateFormat) -- confirm against the full file.
val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
/**-------------- define the schemas -----------**/
val studentScheam = StructType(Seq(
StructField("Sno", StringType, true),// student id
StructField("Sname", StringType, true),// student name
StructField("Ssex", StringType, true),// student gender
StructField("Sbirthday", StringType, true),// date of birth
StructField("Class", StringType, true)// class the student belongs to
))
val courseScheam = StructType(Seq(
StructField("Cno", StringType, true),// course id
StructField("Cname", StringType, true),// course name
StructField("Tno", StringType, true)// teacher id
))
val scoreScheam = StructType(Seq(
StructField("Sno", StringType, true),// student id
StructField("Cno", StringType, true),// course id
StructField("Degree", IntegerType, true)// score
))
val teacherScheam = StructType(Seq(
StructField("Tno", StringType, true),// teacher id
StructField("Tname", StringType, true),// teacher name
StructField("Tsex", StringType, true),// teacher gender
StructField("Tbirthday", StringType, true),// teacher date of birth
StructField("Prof", StringType, true),// professional title
StructField("Depart", StringType, true)// teacher's department
))
import spark.implicits._
/**---------- read the raw text files --------------**/
val studentFile = sc.textFile("student/student.txt")
val courseFile = sc.textFile("student/course.txt")
val scoreFile = sc.textFile("student/score.txt")
val teacherFile = sc.textFile("student/teacher.txt")
/**------------- parse each line (space-separated) into a Row -----------------**/
val studentRow = studentFile.map(_.split(" ")).map(student => Row(student(0), student(1), student(2), student(3), student(4)))
val courseRow = courseFile.map(_.split(" ")).map(course => Row(course(0), course(1), course(2)))
val scoreRow = scoreFile.map(_.split(" ")).map(score => Row(score(0), score(1), score(2).toInt))
val teacherRow = teacherFile.map(_.split(" ")).map(teacher => Row(teacher(0), teacher(1), teacher(2), teacher(3), teacher(4),teacher(5)))
/**------------ register one temp view per table --------------**/
spark.createDataFrame(studentRow,studentScheam).createOrReplaceTempView("student")
spark.createDataFrame(courseRow,courseScheam).createOrReplaceTempView("course")
spark.createDataFrame(scoreRow,scoreScheam).createOrReplaceTempView("score")
spark.createDataFrame(teacherRow,teacherScheam).createOrReplaceTempView("teacher")
/**------ print every table for a quick sanity check ------**/
spark.sql("select * from student").show()
spark.sql("select * from course").show()
spark.sql("select * from score").show()
spark.sql("select * from teacher").show()
/**---------- helper: format the current time with the given pattern ---------**/
// e.g. getDate("yyyy") returns the current year as a string (used below to
// compute ages from Sbirthday).
def getDate(time: String) = {
val now: Long=System.currentTimeMillis()
var df: SimpleDateFormat = new SimpleDateFormat(time)
df.format(now)
}
6.查询Student表中“95031”班或性别为“女”的同学记录。
spark.sql("select * from student where Class='95031' or Ssex='女'").show()
7.以Class降序,升序查询Student表的所有记录。
spark.sql("select * from student order by Class desc").show()
8.以Cno升序、Degree降序查询Score表的所有记录。
spark.sql("select * from score order by Cno asc,Degree desc").show()
9.查询“95031”班的学生。
spark.sql("select * from student where Class='95031'").show()
10.查询Score表中的最高分的学生学号和课程号。(子查询或者排序)
spark.sql("select * from score where Degree=(select max(Degree) from score)").show()
11.查询每门课的平均成绩。
spark.sql("select c.Cname,avg(s.Degree) from score s join course c on s.Cno=c.Cno group by c.Cname ").show()
12.查询Score表中至少有5名学生选修的并以3开头的课程的平均分数。
spark.sql("select c.Cname,avg(s.Degree) from score s join course c on s.Cno=c.Cno and s.Cno like '3%' group by c.Cname having count(1)>=5 ").show()
13.查询分数大于70,小于90的Sno列。
spark.sql("select Sno from score where Degree BETWEEN 70 AND 90 ").show()
14.查询所有学生的Sname、Cno和Degree列。
spark.sql("select st.Sname,s.Cno,s.Degree from student st join score s on s.Sno=st.Sno").show()
15.查询所有学生的Sno、Cname和Degree列。
spark.sql("select st.Sno,c.Cname,s.Degree from student st join score s join course c on s.Sno=st.Sno and s.Cno=c.Cno").show()
16.查询所有学生的Sname、Cname和Degree列。
spark.sql("select st.Sname,c.Cname,s.Degree from student st join score s join course c on s.Sno=st.Sno and s.Cno=c.Cno").show()
17.查询“95033”班学生的平均分。
spark.sql("select st.Class,avg(s.Degree) from score s join student st on s.Sno=st.Sno group by st.Class having st.Class='95033'").show()
18.查询所有选修“计算机导论”课程的“女”同学的成绩表。
spark.sql("select * from student st join score s join course c on s.Sno=st.Sno AND st.Ssex='女' AND c.Cno=s.Cno And c.Cname='计算机导论' ").show()
19.查询选修“3-105”课程的成绩高于“109”号同学成绩的所有同学的记录。
spark.sql("select * from student st join score s on s.Sno=st.Sno AND s.Cno='3-105' where s.Degree>(select s.Degree from student st join score s on s.Sno=st.Sno AND st.Sno='109' AND s.Cno='3-105')").show()
20.查询score中选学多门课程的同学中分数为非最高分成绩的记录。
spark.sql("select * from score where Degree!=(select MAX(Degree) from score)").show()
21.查询成绩高于学号为“109”、课程号为“3-105”的成绩的所有记录。
spark.sql("select * from score where Degree >(select max(Degree) from score where Sno='109' AND Cno='3-105')").show()
22.查询和学号为105的同学同年出生的所有学生的Sno、Sname和Sbirthday列。
//UDF函数
spark.udf.register("subBirthday",(bir:String)=>(
bir.split("-")(0)
))
spark.sql("select * from student where subBirthday(Sbirthday)=(select subBirthday(Sbirthday) from student where Sno='105')").show()
// Same answer repeated (garbled backticks removed):
spark.udf.register("subBirthday",(bir:String)=>(
bir.split("-")(0)
))
spark.sql("select * from student where subBirthday(Sbirthday)=(select subBirthday(Sbirthday) from student where Sno='105')").show()
23.查询“张旭“教师任课的学生成绩
spark.sql("select * from teacher t join course c on c.Tno=t.Tno join score s on s.Cno=c.Cno where t.Tname='张旭'").show()
24.查询选修某课程的同学人数多于4人的教师姓名。
spark.sql("select t.Tname from teacher t join course c on c.Tno=t.Tno join score s on s.Cno=c.Cno group by t.Tname having count(1)>4").show()
25.查询95033班和95031班全体学生的记录。
spark.sql("select * from student where Class='95033' or Class='95031'").show()
26.查询存在有85分以上成绩的课程Cno.
spark.sql("select * from course c join score s on s.Cno=c.Cno where s.Degree>85").show()
27.查询出“计算机系“教师所教课程的成绩表。
spark.sql("select * from teacher t join course c on t.Tno=c.Tno join score s on s.Cno=c.Cno where t.Depart='计算机系'").show()
28.查询“计算机系”与“电子工程系“不同职称的教师的Tname和Prof。
spark.sql("select Tname,Prof from Teacher a where Prof not in(select Prof from Teacher b where a.Depart!=b.Depart)").show()
29.查询选修编号为“3-105“课程且成绩至少高于选修编号为“3-245”的同学的Cno、Sno和Degree,并按Degree从高到低次序排序。
spark.sql("select * from score where Cno='3-105' AND Degree>(select MIN(Degree) from score where Cno='3-245') order by Degree desc ").show()
30.查询选修编号为“3-105”且成绩高于选修编号为“3-245”课程的同学的Cno、Sno和Degree
spark.sql("select * from score where Cno='3-105' AND Degree>(select MAX(Degree) from score where Cno='3-245') ").show()
31.查询所有教师和同学的name、sex和birthday.
spark.sql(
"""
|select Sname,Ssex,Sbirthday from student
|union
|select Tname,Tsex,Tbirthday from teacher
|""".stripMargin).show()
32.查询所有“女”教师和“女”同学的name、sex和birthday.
spark.sql(
"""
|select Sname,Ssex,Sbirthday from student where Ssex='女'
|union
|select Tname,Tsex,Tbirthday from teacher where Tsex='女'
|""".stripMargin).show()
33.查询成绩比该课程平均成绩低的同学的成绩表。
spark.sql("select Sno,Cno,Degree from Score a where a.Degree<(select AVG(Degree) from Score b where a.Cno=b.Cno)").show()
34.查询所有任课教师的Tname和Depart.
spark.sql("select * from teacher where Tname in(select Tname from teacher t join course c on t.Tno=c.Tno join score s on s.Cno=c.Cno)").show()
35.查询所有未讲课的教师的Tname和Depart.
spark.sql("select * from teacher where Tname not in(select Tname from teacher t join course c on t.Tno=c.Tno join score s on s.Cno=c.Cno)").show()
36.查询至少有2名男生的班号。
spark.sql("select Class from student where Ssex='男' group by Class having count(1)>=2 ").show()
37.查询Student表中不姓“王”的同学记录。
spark.sql("select * from student where Sname not like '王%'").show()
38.查询Student表中每个学生的姓名和年龄。将函数运用到spark sql中去计算,可以直接拿String的类型计算不需要再转换成数值型 默认是会转换成Double类型计算浮点型转整型
spark.sql("SELECT Sname, ("+ getDate("yyyy") +" - substring(sbirthday, 0, 4)) AS age FROM student t").show()
spark.sql("SELECT Sname, ("+ getDate("yyyy") +" - substring(sbirthday, 0, 4)) AS age FROM student t").show()
39.查询Student表中最大和最小的Sbirthday日期值。 时间格式最大值,最小值
spark.sql("SELECT MAX(t.sbirthday) AS maximum FROM student t").show()
spark.sql("SELECT MIN(t.sbirthday) AS minimum FROM student t").show()
40.以班号和年龄从大到小的顺序查询Student表中的全部记录。 查询结果排序
spark.sql("SELECT * " +
"FROM student " +
"ORDER BY Class DESC, CAST("+ getDate("yyyy") +" AS INT) - CAST(substring(Sbirthday, 0, 4) AS INT) DESC").show()
41.查询“男”教师及其所上的课程。
spark.sql("select * from teacher t join course c on t.Tno=c.Tno where Tsex='男'").show()
42.查询最高分同学的Sno、Cno和Degree列。
spark.sql("select * from score where Degree=(select MAX(Degree) from score) ").show()
43.查询和“李军”同性别的所有同学的Sname.
spark.sql("select * from student where Ssex=(select Ssex from student where Sname='李军') AND Sname!='李军'").show()
44.查询和“李军”同性别并同班的同学Sname.
spark.sql("select * from student where Ssex=(select Ssex from student where Sname='李军') AND Class=(select Class from student where Sname='李军') AND Sname !='李军'").show()
45.查询所有选修“计算机导论”课程的“男”同学的成绩表。
spark.sql("select * from student st join score s on s.Sno=st.Sno join course c on s.Cno=c.Cno where st.Ssex='男' AND c.cname='计算机导论' ").show()
46.查询Student表中的所有记录的Sname、Ssex和Class列。
spark.sql("select Sname,Ssex,Class from student").show()
47.查询教师所有的单位即不重复的Depart列。
spark.sql("select distinct Depart from teacher").show()
48.查询Student表的所有记录
spark.sql("select * from student").show()
49.查询Score表中成绩在60到80之间的所有记录。
spark.sql("select * from score where Degree BETWEEN 60 AND 80").show()
50.查询Score表中成绩为85,86或88的记录。
spark.sql("select * from score where Degree in(85,86,88)").show()