黑哥-细说spark SQL

spark-SQL

作者:黑哥

1, 介绍

sparkSQL是spark上高级模块,sparkSQL是一个SQL解析引擎,将SQL解析成特殊的RDD(DataFrame),然后再Spark集群中运行。

sparkSQL是用来处理结构化数据的(先将非结构化的数据转换成结构化数据)

sparkSQL支持两种编程API

  • 1,SQL方式
  • 2,DataFrame的方式(DSL)

sparkSQL兼容hive(元数据库、SQL语法、UDF、系列化、反序列化机制)

sparkSQL支持统一数据源,课程读取多种类型的数据

sparkSQL提供了标准的连接(JDBC、ODBC)

sparkSQL 1.x和2.x的编程API有一些变化,企业都有使用

spark1.x的方式:

创建一个SQLContext

1,创建一个SparkContext,然后再创建SQLContext

2,创建RDD,对数据进行整理,然后关联case class,将非结构化数据转换成结构化数据

3,显示的调用toDF方法将RDD转换成DataFrame

4,注册临时表

5,执行SQL(Transformation,lazy)

6,执行Action

2、 AIP

Spark SQL API 由 SQLContext、 HiveContext 、 DataFrame 这三个关键抽象组成。 Spark SQL 应用使用这些抽象来处理数据。

2.1 SQLContext

SQLContext 是 Spark SQL 库的入口点。 它是 Spark SQL 库中定义的类。 Spark SQL 应 用必须创建一个 SQLContext 或 HiveContext 的类实例。

只有有了 SQLContext 类实例,才能创建 Spark SQL 库提供的其他类的类实例。 同样, 只有有了 SQLContext 类实例,才能执行 SQL 查询。

2.2 创建 SQLContext实例

//只有有了 SparkContext 类实例,才能创建 SQLContext 类实例。 //一个应用可以像下面展 示的这样创建-个 SQLContext 类实例。

//只有有了 SparkContext 类实例,才能创建 SQLContext 类实例。
//一个应用可以像下面展 示的这样创建-个 SQLContext 类实例。
import org.apache.spark._ 
import org.apache.spark.sql._ 
val config = new SparkConf(). setAppName(my Spark SQL app”) 
val sc = new SparkContext(config) 
val sqlContext = new SQLContext(sc)

注意:在 Spark shell 中我们不需要创建 SQLContext 类实例,原因会在介绍 HiveContext 时进行说明。 因此,如果你正在使用 Spark shell,就可以跳过创建 SparkConf、 SparkContext 和 SQLContext 这些类的类实例的步骤。

2.3 创建 HiveContext 类实例

import org.apache.spark._ 
import org. apache. spark. sql. _ 

val config = new SparkConf().setAppName{"My Spark SQL app”) 
val sc = new SparkContext(config) 
val hiveContext = new HiveCo川ext(sc) 
//用代码执行 HiveQL 查询 
val resultSet = hiveContext.sql(”SELECT count(l) FROM my_hive_table“) 

2.4 创建 Data Frame

介绍:

DataFrame 是 Spark SQL 的主要数据抽象。 它表示若干行的分布式数据,每一行有若 干个有名字的列。它是受 R 和 Python 的影响而产生的。 从概念上,它类似于关系数据库中 的表。

DataFrame 是 Spark SQL 库中定义的类。 它提供了各种方法用于处理、分析结构化数据。 举例来说,它提供的方法可以用于选择列,过滤行,聚合列.连接表.抽样数据,以 及其他一些常见的数据处理任务。

与 RDD 不同 , DataFrame 不需要预设模式。 RDD 就是一个隐晦数据分区的集合。 而 DataFrame 却知道数据集中每一列的名字和类型。 因此, DataFrame 类可以提供用于数据处 理的丰富的领域专属语言( DSL) 。

相比于 RDD API, DataFrame API 更为易懂易用。 然而,如果有需要,可以把 DataFrame 当成 RDD 来使用。 可以从一个 DataFrame 上创建一个 RDD。 而且, RDD 的所有接口都可 以用于处理用 DataFrame 表示的数据。

一个 DataFrame 可以被当成一个临时表注册到应用上,在上面可以使用 SQL 或 HiveQL 来进行查询。 只有注册到的应用正在运行,这个临时表才可以使用c

行是 Spark SQL 的一个抽象,用于表示一行数据。 从概念上看,它等价于一个关系元 组或表中的一行L Spark SQL 提供了创建 Row 对象的工厂方法。 下面是一个例子。

Spark SQL 提供了创建 Row 对象的工厂方法。 下面是一个例子。

import org. apache. spark. sql. _

val rowl = Row(”Barack Obama”,”President”,”United States”)

val row2 = Row("David Cameron", "Prime Minister",”United Kingdom”)

一个 Row 对象中某一列的值可以使用序号来获取。 下面是一个例子。

val presidentName = row1.get5tring(o)

val country = row1.getString(2)

2.5 创建 DataFrame

创建 DataFrame 有两种方式:1,从数据源创建;2,从RDD创建DataFrame

  • 从 RDD 创建 DataFrame

Spark SQL 提供了两个方法用于从 RDD 创建 DataFrame: toDF 和 createDataFrame.

toDF:Spark SQL 提供了一个名为 toDF 的隐式转换方法,用于从 RDD 中创建DataFrame,这 个 ROD 中的每一个对象由一个样本类表示。 当使用这个方法时, Spark SQL 将会自行推断 数据集的数据格式。 RDD 类中并没有定义 toDF 方法,但是可以通过隐式转换来使用它。 为了能使用 toDF 方法将 RDD 转换成 DataFrame ,需要导人 implicits 对象中定义的隐式方法, 如下所示。

RDD 类中并没有定义 toDF 方法,但是可以通过隐式转换来使用它。 为了能使用 toDF 方法将 RDD 转换成 DataFrame ,需要导人 implicits 对象中定义的隐式方法,

举例来说:

假设我们有一个 csv 文件,它存储公司员工的信息。 文件的每一行记 录一个员工的姓名、年龄和性别。 下面的代码展示怎么使用 toDF 方法来从 RDD 中创建DataFrame

import org.apache.spark._ 
import org. apache. spark. sql. _ 

val config = new SparkConf(). setAppName(”my Spark SQL app”) 
val sc = new SparkContext(config) 
val sqlContext = new SQLContext(sι) 

import sqlContext.implicits._ 
case class Employee(name: String, age: Int, gender: String) 
val rowsRDD = sc.textFile("path/to/employees.csv")
val employeesRDD = rowRDD.map(row => row.split(","))
.map(cols => Employee(cols(0),cols(1).trim.toInt,cols(2)))
val employeesDF = employeesRDD. toDF() 

createDataFrame

createDataframe 方法有两个参数: 一个由行构成的 RDD 和一个数据格式,它返回一个 Dataframe。

数据集的数据格式可以是一个 StructType 类实例。 其中, StructType 是一个样本类。一 个 StructType 对象包含了一个 Structfield 对象序列。 Structfield 也是样本类。 它用于指定 一列的名字和数据类型,并且可选择性地指定这一列是否包含空值及其元数据。 包 org.apache.spark.sql句pes 定义了 Spark SQL 的 StructType 和 StructField 支持的各种 数据类型。 在使用 createDataframe 方法时,需要导人这个包。 让我们来看看在上一节中用到的例子。假设我们有一个 csv 文件,它存储公司员工的 信息。 文件的每一行记录一个员工的姓名、年龄和性别。 这一次使用 createDataframe 方法 来从 RDD 中创建 Dataframe.

import org.apache.spark._ 
import org. apache. spark. sql. _ 
import org.apache.spark.sql.types._ 

val config = new SparkConf (). setAppName( "My Spark SQL app”) 
val sc = new SparkContext(config) 
val sqlContext = new SQLContext(sc) 

val linesRDD = sc.textFile(”path/to/employees.csv”) 
val rowsRDD = linesRDD.map{row => row.split(",")}
.map{cols => Row(cols(o), ιols(1).trim.t0Int, cols(2))} 
val schema = StructType(List(
    StructField(”name飞 StringType, false), 
    StructField(”age”, IntegerType, false), 
    StructField(”gender”, StringType, false) 
 ))
 val employeesDF = sqlContext.createDataFrame(rowsRDD,schema) 

ere at巳DataFrame 方法并不需要写死数据格式。可以在运行时创建各种不同的 StructType 对象。 举例来说,可以将配置文件中配置的数据格式传递给它。而且,同一个应用可 以处理多个不同的数据源,而不需要重新编译。

toDF 方法和 createDataFrame 方法的区别就在于前者自己推断数据格式,后者则需要 指定格式。而且, toDF 看上去更加易用,然而, createDataFrame 方法更灵活。

从数据源创建 DataFrame

Spark SQL 为多种数据源提供了统一的接口用于创建 DataFrame。举例来说,同一个 API 可以从 MySQL 、 PostgreSQL 、 Oracle 或 Cassandra 表中创建 DataFrame。类似地,同 一个 API 可以从 Parquet、 JSON 、 ORC 或 csv 文件中创建 DataFrame,这个文件可以位于 本地文件系统、 HDFS 、 S3 o Spark SQL 原生支持一些常用的数据源,包括 Parquet、 JSON 、 Hive、兼容 JDBC 的数 据库。 对于其他数据源还可以使用外部库。 Spark SQL 提供了一个名为 DataFrameReader 的类,在这个类中定义了从数据师、读取数 据的接口 。 它使得你可以为读取数据设置不同的选项。通过它的创建方法,可以指定数据 格式、分区以及其他数据糠的一些特定选项。 SQLContext 和 HiveContext 类都提供一个名 为 read 的工厂方法,这个方法返回 DataFrameReader 类的一个实例。

实例 从不同数据源创建 DataFrame

import org.apache.spark._ 
import org. apache. spark. sql. _ 

val config =new SparkConf().setAppName(”My Spark SQL app”) 
val sc = new SparkContext(config) 
val sqlContext = new org. apache. spark. sql. hive. HiveContext ( sc) 

// create a DataFrame from parquet files 
val parquetDF = sqlContext.read 
.format(”org.apache.spark.sql.parquet") 
.load(”path/to/Parquet-file-or-directory") 

// create a DataFrame from JSON files 
val jsonDF = sqlContext.read 
.format(”org.apache.spark.sql.json") 
. load( "pathlto/JSON-file-or-directory”) 

// create a DataFrame from a table in a Postgres database 
val jdbcDF = sqlContext.read 
.format(”org.apache.spark.sql.jdbc”) 
.options(Map( 
”url”-〉”jdbc:postgresql://host:port/database?user=<USER>&password=<PASS>”,
"dbtable" -> "schema-name.table-name"))
.load()

//II create a DataFrame from a Hive table 
val hiveDF = sqlContext.read 
.table(”hive-table-name”) 
与创建 RDD 的方法类似,创建 DataFrame 的方法是惰性的。
举例来说, load 方法是惰 性的。 当调用 load 方法时,数据并没有读入进来。 
只有操作才会触发从数据源读取数据。 除了上面展示的这些从各种数据源创建 DataFrame 的
通用方法以外, Spark SQL 还提 供从原生支持的数据源创建 DataFrame 的特殊方法。 
这些原生支持的数据源包括 Parquet、 ORC 、 JSON 、 Hive 和兼容 JDBC 的数据库.

JSON

DataFrameReader 类提供了一个名为 json 的方法,用于从 JSON 数据集中i卖取数据。 json 方法的参数是一个路径,它返回一个 DataFrame 实例。 这个路径可以是 JSON 文件名, 也可以是包含有多个 JSON 文件的目录。

val jsonDF = sqlContext.read.json(”pathltolJSON-file-or-directory”)

Spark SQL 可以从如下 JSON 数据集中创建 DataFrame : Hadoop 支持的存储系统、本 地文件系统上的 JSON 文件、 HDFS 和 S3 .

val jsonHdfsDF = sqlContext.read.json(”hdfs:/INAME_NODElpath/toldata.json") val json53DF = sqlContext.read.json("s3a:llBUCKET_NAMEIFOLDER_NAMEldata.json”)

ORC

DataFram巳Reader 类提供了 一个名为 ore 的方法用于从以 ORC 文件格式存储数据的数 据集中读取数据。 ore 方法的参数是一个路径,它返回一个 DataFrame 实例。

val orcDF : hiveContext.read.orc(”path/to/ore-file-or-directory")

类似于 JSON 数据集和 Parquet 数据集,以 ORC 格式存储的数据集可以位于以下存储 系统中: Hadoop 支持的存储系统、本地文件系统、 HDFS 和 S3 o

val orcHdfsDF : sqlContext.read.orc(”hdfs://NAME_NODE/path/to/data.orc") val orcS3DF : sqlContext.read.orc(”s3a://BUCKET_NAME/FOLDER_NAME/data.orc”)

Hive

有两种方式可以从 Hive 表中创建 DataFrame。 第一种,使用 DataFrameReader 类中定 义的 table 方法。 table 方法的参数是 Hive metastore 中的表名,它返回一个 Dataframe 实例

第一种:

val hiveDF: hiveContext.read.table(”hive-table-name”)

第二种:从 Hive 表中创建 DataFrame 的方法就是使用 HiveContext 类中的 sql方法。

val hiveDF : hiveContext.sql(”SELECT col_a, col_b, col_c from hive-table”)

猜你喜欢

转载自blog.csdn.net/realize_dream/article/details/88371889