Defining and Using SparkSQL Queries


Approach 1

1. Create a SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SparkSqlPractice")
  .getOrCreate()
2. Read the data as a Dataset
import org.apache.spark.sql.Dataset

val ds: Dataset[String] = spark.read.textFile("spark-sql/data/access.log")
3. Convert to a DataFrame and name the columns
import org.apache.spark.sql.DataFrame

val output = "spark-sql/out/access"

// Needed for the toDF conversion below
import spark.implicits._

val df: DataFrame = ds.map(x => {
  // Fields in access.log are separated by "|"
  val splits = x.split("\\|")
  val ip = splits(0)
  val responsetime = splits(2)
  val httpcode = splits(5)
  val requestsize = splits(6).toInt
  val province = splits(16)
  val city = splits(17)
  val isp = splits(18)
  (ip, responsetime, httpcode, requestsize, province, city, isp)
}).toDF("ip", "responsetime", "httpcode", "requestsize", "province", "city", "isp")
4. Save to files
// Save in several formats; use a distinct path per format,
// otherwise each write overwrites the previous one at the same location
df.write.format("parquet").mode("overwrite").save(s"$output/parquet")
df.write.format("json").mode("overwrite").save(s"$output/json")
df.write.format("orc").mode("overwrite").save(s"$output/orc")
5. Create a temporary view and query it with SQL
// Register the DataFrame as a temporary view, then query it
df.createTempView("access")
spark.sql("select province, sum(requestsize) total from access group by province").show(10, false)
6. Query via the DataFrame API
// The same aggregation, expressed with the DataFrame API
// (desc("traffics") replaces the deprecated 'traffics.desc symbol syntax)
import org.apache.spark.sql.functions._

df.groupBy("province")
  .agg(sum("requestsize").as("traffics"))
  .sort(desc("traffics"))
  .show(false)
7. Grouped Top-N with SQL
// Top N cities by request count within each province (here N = 1, since r < 2;
// raise the bound for a larger N). Note: the "access" view was already registered
// in step 5, so use createOrReplaceTempView, which is safe to call repeatedly.
df.createOrReplaceTempView("access")

val topNSQL =
  """
    |select * from (
    |  select t.*, row_number() over (partition by province order by cnt desc) as r
    |  from (
    |    select province, city, count(1) cnt
    |    from access
    |    group by province, city
    |  ) t
    |) a
    |where a.r < 2 order by a.province, a.r asc
    |""".stripMargin

spark.sql(topNSQL).show()
8. Grouped Top-N with the DataFrame API
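A minimal sketch of one way to express the same grouped Top-N with window functions in the DataFrame API, assuming the df and imports from the steps above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank cities within each province by request count,
// then keep the top entry per province (N = 1, as in the SQL above)
val byProvince = Window.partitionBy("province").orderBy(desc("cnt"))

df.groupBy("province", "city")
  .agg(count(lit(1)).as("cnt"))
  .withColumn("r", row_number().over(byProvince))
  .where(col("r") < 2)
  .orderBy("province", "r")
  .show(false)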


Approach 2

1. Create a SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkSQLAPP")
  .master("local")
  .getOrCreate()
2. Read the file contents as an RDD[Row]
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// Each line of student.txt holds "sno,sname,sage"
val studentData: RDD[Row] = spark.sparkContext.textFile("spark-sql/data/student.txt").map(line => {
  val splits = line.split(",")
  Row(splits(0).toInt, splits(1), splits(2).toInt)
})
3. Define the schema
import org.apache.spark.sql.types._

val studentSchema = StructType(Seq(
  StructField("sno", IntegerType, nullable = false),
  StructField("sname", StringType, nullable = false),
  StructField("sage", IntegerType, nullable = false)
))
4. Create the DataFrame
val studentDF: DataFrame = spark.createDataFrame(studentData, studentSchema)
5. Create a temporary view
studentDF.createTempView("student")
6. Query the data
spark.sql("select sno, sname, sage from student").show()
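
For comparison, when the schema is known at compile time, the same DataFrame can be built with a case class and toDF, as in Approach 1. A minimal sketch; the Student case class is illustrative, not part of the original:

// Hypothetical alternative: let Spark derive the schema from a case class
case class Student(sno: Int, sname: String, sage: Int)

import spark.implicits._

val studentDF2 = spark.sparkContext
  .textFile("spark-sql/data/student.txt")
  .map { line =>
    val splits = line.split(",")
    Student(splits(0).toInt, splits(1), splits(2).toInt)
  }
  .toDF()

The explicit StructType route above remains the right choice when column names and types are only known at runtime.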


Reposted from blog.csdn.net/qq_43081842/article/details/105305820