SparkSQL---Spark计算引擎模块

一、Spark SQL简介


Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。

二、Spark SQL特征

1.易整合


将SQL查询与Spark程序无缝对接。

Spark SQL允许您使用SQL或熟悉的DataFrame API查询Spark程序内的结构化数据。 可用于Java,Scala,Python和R.

2.统一的数据访问方式


以同样的方式连接到任何数据源。

DataFrames和SQL提供了访问各种数据源的常用方式,包括Hive,Avro,Parquet,ORC,JSON和JDBC。 您甚至可以通过这些来源加入数据。

3.兼容Hive


在现有仓库上运行SQL或HiveQL查询。

Spark SQL支持HiveQL语法以及Hive SerDes和UDF,允许您访问现有的Hive仓库。

4.标准的数据连接


通过JDBC或ODBC连接。

服务器模式为商业智能工具提供行业标准的JDBC和ODBC连接。

扫描二维码关注公众号,回复: 10401958 查看本文章

三、RDD、DataFrame 和 DataSet


在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。
在后期的Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口。

1.RDD

RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
RDD的最大好处就是简单,API的人性化程度很高。
RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。

2.DataFrame

2.1 什么是DataFrames

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

2.2 创建DataFrames

在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark中已经内置了一个sqlContext。
(1)使用toDF函数创建DataFrame

/./ 需要导入隐式转换包 import SparkSession.implicits._
rdd.toDF(column...)

(2)使用createDataFrame函数创建DataFrame

//读取文件
    val empRDD: RDD[String] = sc.textFile("E:\\test\\scala\\emp.csv")
    val rddRow: RDD[Row] = empRDD.map(line => {
      //7369,SMITH,CLERK,7902,1980/12/17,800,,20
      val fields: Array[String] = line.split(",")
      val empno = fields(0).toLong
      val ename = fields(1)
      val job = fields(2)
      val mgr = fields(3).toLong
      val hiredate = fields(4)
      val sal = fields(5).toInt
      val comm = fields(6).toInt
      val dept = fields(7).toLong
      Row(empno, ename, job, mgr, hiredate, sal, comm, dept)
    })
    //定义schema
    val structType = StructType(
      List(
        StructField("empno", LongType, true),
        StructField("ename", StringType, true),
        StructField("job", StringType, true),
        StructField("mgr", LongType, true),
        StructField("hiredate", StringType, true),
        StructField("sal", DoubleType, true),
        StructField("comm", IntegerType, true),
        StructField("dept", LongType, true)
      )
    )
    val df2:DataFrame = spark.createDataFrame(rddRow, structType)

(3)通过读取文件直接创建DataFrame

// csv是读取文件类型,也可以是 json  text parquet  jdbc  load  orc
SparkSession.read.csv(“文件路径。。。”) 
2.3 DataFrame常用操作
2.3.1 DSL风格语法

(1)查看DataFrame中的内容

personDF.show

(2)看DataFrame部分列中的内容

personDF.select(personDF.col("name")).show
personDF.select(col("name"),col("age")).show
personDF.select("name").show

(3)打印DataFrame的Scheme信息

personDF.printSchema

(4)过滤age大于等于18的

personDF.filter(col("age") >= 18 ).show

(5)按年龄进行分组并统计相同年龄的人数

personDF.groupBy("age").count().show
2.3.2 SQL风格语法

(1)如果想使用SQL风格的语法,需要将DataFrame注册成表
【临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.people】

personDF.createOrReplaceTempView("t_person") //spark2.x以上写法
personDF.registerTempTable("t_person")   //spark1.6以下写法

(2)查询年龄最大的前两名

sqlContext.sql("select * from t_person order by age desc limit 2").show

(3)显示表的Schema信息

sqlContext.sql("desc t_person").show

3.DataSet

3.1 什么是DataSet

DataSet是从Spark 1.6开始引入的一个新的抽象。DataSet是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。每个DataSet都有一个称为DataFrame的非类型化的视图,这个视图是行的数据集。为了有效地支持特定域对象,DataSet引入了Encoder(编码器)。

3.2 创建DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。
(1)从DataFrame进行转换

SparkSession.read.josn("文件路径。。").as[Emp]

(2)通过读取文件直接创建DataSet

SparkSession.read.textFile("文件路径。。")

(3)通过toDS函数将RDD转换成DataSet

sc.textFile("文件路径。。").toDS()
3.3 DataSet使用示例

使用DataSet的进行词频统计

import org.apache.spark.sql.functions._
#创建DataSet
val ds = sqlContext.read.text("hdfs://node-1.itcast.cn:9000/wc").as[String]
val result = ds.flatMap(_.split(" "))
                      .filter(_ != "")
                      .toDF()
                      .groupBy($"value")
                      .agg(count("*") as "numOccurances")
                      .orderBy($"numOccurances" desc)
                      .show()

val wordCount = ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()).count().show()

4.三者的共性

(1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。
(3)三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
(4)三者都有partition的概念
(5)三者有许多共同的函数,如filter,排序等
(6)在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持
import spark.implicits._
(7)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

5.三者的区别

DataSet和RDD主要的区别是:
(1)DataSet是特定域的对象集合;然而RDD是任何对象的集合。
(2)DataSet的API总是强类型的;而且可以利用这些模式进行优化,然而RDD却不行。

DataFrame是特殊的DataSet,它在编译时不会对模式进行检测。

四、以编程方式执行Spark SQL查询

首先在maven项目的pom.xml中添加Spark SQL的依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
package com.sparksql.demo1
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * SparkSql简单使用
  */
object SparkSql01 {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksql")
      .master("local[*]")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取文件
    val df: DataFrame = spark.read.json("E:\\test\\scala\\people.json")
    //构建临时表
    df.createTempView("peo")//至此内存中多了一张表

    //查询表=> 表名叫做peo(对应RDD的filter(age>20))
    val res: DataFrame = spark.sql("select * from peo where age >20")
    //展示临时表
   res.show()
   res.foreach(x=>{
      //DataFrame的缺点,缺少类型检查,只能通过手动指定
      println(x.getAs[String]("name")+"\t"+x.getAs[Int]("age"))
    })

    //关闭SparkSession
    spark.stop()
  }
}
发布了13 篇原创文章 · 获赞 14 · 访问量 647

猜你喜欢

转载自blog.csdn.net/LCY_1013/article/details/105232046