Spark SQL原理与应用
1、关系型数据库的运行架构:
SELECT a1,a2,a3 FROM tableA Where condition
(1)SQL语句的结构
该语句是由Projection(al,a2,a3)、Data Source(tableA)、Filter(condition)组成的,分别对应SQL查询过程中的Result、Data Source、Operation。也就是说,SQL 语句是按Result –> Data Source –> Operation的次序来描述的。但在实际的执行过程中,是按Operation –> Data Source –> Result的次序来进行的,和SQL语句的次序刚好相反。
(2)SQL语句的运行过程
- 一般的数据库系统先将读入的SQL语句(Query)进行解析(Parse),分辨出SQL语句中的关键词(如SELECT、FROM、WHERE)、表达式、Projection和Data Source等。从而判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind)。
- 过程绑定是将SQL语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的Projection、Data Source等都存在,就表示这个SQL语句是可以执行的。
- 在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行(Execute)该计划,并返回结果。
在执行过程中,有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。
2、Spark SQL的性能
2.1 内存列存储(In-Memory Columnar Storage)
Spark SQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储,如图所示。该存储方式无论在『空间占用量』和『读取吞吐率』上都占有很大优势。
(1)对于原生态的JVM对象存储方式:
- 空间占用量大:由于数据类型不同,内存中字节存放多有空余,每个对象通常要增加12~16字节的额外开销。
- 对象多,垃圾回收慢:使用这种方式,每个数据记录产生一个JVM对象,如果是大小为200B的数据记录,32GB的堆栈将产生1.6亿个对象。这么多的对象,对于GC来说;可能要消耗几分钟的时间来处理(JVM的垃圾收集时间与堆栈中的对象数量呈线性相关)。显然这种内存存储方式对于基于内存计算的Spark来说,很昂贵也负担不起。
(2)对于内存列存储来说:
- 数组存储:将所有原生数据类型的列采用原生数组存储,将Hive支持的复杂数据类型(如array、map等)先序化后并接成一个字节数组来存储。这样,每个列创建一个JVM对象,从而导致可以快速地GC和紧凑的数据存储。
- 高效压缩:还可以使用低廉CPU开销的高效压缩方法(如字典编码、行长度编码等压缩方法)来降低内存开销。
- 列内存连续:对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。
2.2 字节码生成技术(bytecode generation,即CG)
- 查询语句中的表达式操作多次涉及虚函数的调用,虚函数的调用会打断CPU的正常流水线处理,减缓执行速度。
- codegen模块使用动态字节码生成技术,使Spark SQL在执行物理计划时,会对匹配的表达式采用特定的代码动态编译,然后运行,简化程序运行过程。
3、Spark SQL的运行架构:
Spark SQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree。后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。
在整个SQL语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在Spark SQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。
4、Spark SQL的运行过程:
Spark SQL有两个分支:sqlContext和hiveContext:
- sqlContext现在只支持SQL语法解析器(SQL-92语法);
- hiveContext 现在支持SQL语法解析器和HiveSQL语法解析器,默认为Hive SQL语法解析器,用户可以通过配置切换成SQL语法解析器来运行HiveQL不支持的语法,如select 1。
4.1 sqlContext
(1)SQL 语句经过SqlParse解析成Unresolved LogicalPlan。
(2)使用analyzer 结合数据数据字典(catalog)进行绑定,生成resolved LogicalPlan。
(3)使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan。
(4)使用SparkPlan将LogicalPlan 转换成PhysicalPlan。
(5)使用prepareForExecution()将PhysicalPlan转换成可执行物理计划。
(6)使用execute)执行可执行物理计划。
(7)生成SchemaRDD。
4.2 hiveContext
(1)SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql 语句使用getAst()获取AST树,然后再进行解析。
(2)使用analyzer 结合数据Hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan。
(3)使用optimizer对resolved LogicalPlan 进行优化,生成optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PrelnsertionCasts(catalog.CreateTables(analyzed)))进行预处理。
(4)使用hivePlanner将LogicalPlan 转换成PhysicalPlan。
(5)使用prepareForExecution()将PhysicalPlan 转换成可执行物理计划。
(6)使用execute()执行可执行物理计划。
(7)执行后,使用map(_.copy)将结果导入SchemaRDD。
hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。
5、Spark SQL的应用
5.1 SchemaRDD(DataFrame)
Spark SQL引入了一种新的RDD——SchemaRDD(DataFrame),SchemaRDD由行对象(row)以及描述行对象中每列数据类型的schema组成。SchemaRDD很像传统数据库中的表。SchemaRDD可以通过RDD、Parquet 文件、JSON文件或者通过使用HiveQL 查询Hive数据来建立。
SchemaRDD除了可以和RDD一样操作外,还可以通过registerTempTable注册成临时表,然后通过SQL语句进行操作。
input = hiveCtx.jsonFile(inputFile)
// 注册输入的SchemaRDD
input.registerTempTable("tweets")
// 依据retweetCount( 转发计数)选出推文
topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets
ORDER BY retweetCount LIMIT 10""")
使用registerTempTable注册表是一个临时表,生命周期只在所定义的sqlContext或hiveContext实例之中。换言之,在一个sqlContext(或hiveContext)中registerTempTable的表不能在另一个sqlContext(或hiveContext)中使用。
Spark SQL1.1对数据的查询分成了2个分支:sqlContext和hiveContext。至于两者之间的关系,HiveSQL继承了sqlContext,所以拥有sqlontext的特性之外,还拥有自身的特性(最大的特性就是支持Hive)。
5.2 sqlContext的基础应用
首先创建sqlContext,并引入sqlContext.createSchemaRDD以完成RDD隐式转换成SchemaRDD:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
5.2.1 RDD
(1)case class方式
//RDD1演示
case class Person(name:String,age:Int)
val rddpeople = sc.textFile("/SparkSQL/people.txt").map(_.split(",")).map(p =>
Person(pc0),p(1).trim.toInt))
rddpeople.registerTempTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").
map(t =>"Name:"+t(0)).collect().foreach(println)
(2)applySchema方式
//RDD2演示
//导入spark SQL的数据类型和Row
import org.apache.spark.sql._
//创建于数据结构匹配的 schema
val schemaString = "name age"
val schema = StructType(schemaString.split("").map(fieldName =>
StructField(fieldName,StringType,true)))
//创建 rowRDD
val rowRDD = sc.textFile("/SparkSQL/people.txt").map(_.split(",")).map(p =>Row(p(0),p(1).trim))
//用applySchema将schema应用到rowRDD
val rddpeople2 = sqlContext.applySchema(rowRDD,schema)
rddpeople2.registerTempTable("rddTable2")
sqlContext.sql("SELECT name FROM rddTab1e2 WHERE age >= 13 AND age <= 19").map(t =>
"Name:"+t(0)).collect().foreach(println)
5.2.2 parquet文件
将建立的SchemaRDD:people保存成parquet文件:
rddpeople.saveAsParquetFile("/Spark sQL/people.parquet")
将people.parquet读入,注册成表parquetTable:
//parquet演示
val parquetpeople = sqlContext.parquetFile("/Spark SQL/people.parquet")
parquetpeople.registerTempTable("parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age> = 25").map(t =>
"Name:"+t(0)).collect().foreach(println)
5.2.3 JSON文件
//JSON演示
val jsonpeople = sqlContext.jsonFile("/Spark SQL/people.json")
jsonpeople.registerTempTable("jsonTable")
sqlcontext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t =>
"Name:"+t(0)).collect().foreach(println)
5.3 hiveContext的基础应用
构建hiveContext:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
Hive数据进行操作:
hiveContext.sql("use saledata")
hiveContext.sql("show tables").collect().foreach(println)
查询所有订单中每年的销售单数、销售总额:
//所有订单中每年的销售单数、销售总额
//三个表连接后以count(distinct a.ordernumber)计销售单数,sum(b.amount)计销售总额
hiveContext.sql("select c.theyear, count(distinct a.ordernumber),sum(b.amount)
from tblStock a join tblstockDetail b on a.ordernumber = b.ordernumber
join tbldate c on a.dateid = c.dateid group by c.theyear order by c.theyear").
collect().foreach(println)
5.4 混合使用
在sqlContext或hiveContext中,来源于不同数据源的表在各自生命周期中可以混用,但是不同实例之间的表不能混合使用。
在sqlContext中混合使用:
//在sqlContext中混合使用
//sqlContext中来自rdd的表rddgab1e和来自parquet文件的表parquetTable混合使用
sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b
on a.name = b.name").collect().foreach(println)
在hiveContext中混合使用:
//在hiveContext中混合使用
//创建一个hiveTable,并将数据加载,注意people.txt 第二列有空格,所以age取string类型
hiveContext.sql("CREATE TABLE hiveTable(name string,age string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY','
LINES TERMINATED BY'\n'")
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/Spark SQL/data/people.txt'
INTO TABLE hiveTable")
//创建一个源自parquet文件的表parquetTable2,然后和hiveTable混合使用
hiveContext.parquetFile("/Spark SQL/people.parquet").registerAsTable("parquetTable2")
hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b
on a.name = b.name").collect().foreach(println)
5.5 缓存之使用
Spark SQL的cache可以使用两种方法来实现:
- cacheTable()方法;
- CACHE TABLE命令。
千万不要先使用cache SchemaRDD,然后registerAsTable;使用RDD的cache()将使用原生态的cache,而不是针对SQL优化后的内存列存储。看看cacheTable的源代码:
在sqlContext里可以如下使用cache:
//sqlContext的 cache 使用
sqlContext.cacheTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t =>
"Name:"+t(0)).collect().foreach(println)
sqlContext.sql("CACHE TABLE parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age > = 13 AND age <= 19").map(t =>
"Name:"+t(0)).collect().foreach(println)
观察webUI,可以看到cache的信息(注意cache是“lazy”的,要有action才会实现;uncache是“eager”的,可以立即实现)。
使用如下命令可以取消cache:
sqlContext.uncacheTable("rddTable")
sqlContext.sql("UNCACHE TABLE parquetTable")
5.6 ThriftServer
通过ThriftServer连接JDBC访问Spark SQL:
package doc
import java.sql.DriverManager
object SQLJDBC{
def main(args: Array[ String]){
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000","hadoop","")
try{
val statement = conn.createStatement
val rs = statement.executeQuery("select ordernumber, amount from tblstockDetail where amount > 3000")
while(rs.next){
val ordernumber = rs.getstring("ordernumber")
val amount = rs.getString("amount")
print1n("ordernumber = $s, amount = %s".format(ordernumber, amount))
}
} catch {
case e: Exception => e.printStackTrace
}
conn.close
}
}
5.7 Spark SQL综合应用
店铺分类:根据销售额对店铺分类,使用Spark SQL和MLlib
使用Spark SQL+MLlib的聚类算法,采用IntelliJ IDEA调试代码,最后生成doc.jar,然后使用spark-submit提交给集群运行。
package doc
import org.apache.log4j.{Level,Logger}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object SQLMLlib{
def main(args:Array[String]){
//屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//设置运行环境
val sparkConf = new SparkConf().setAppName("SQLMLlib")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
//使用Spark SQL查出每个店的销售数量和金额
hiveContext.sql("use saledata")
hiveContext.sql("SET spark.sql.shuffle.partitions = 20")
val sqldata = hiveContext.sql("select a.locationid, sum(b.qty)totalqty, sum(b.amount)totalamount from tblStock a join tblstockdetail b on a.ordernumber
= b.ordernumber group by a.locationid")
//将查询数据转换成向量
val parsedData = sqldata.map{
case Row(_, totalgty, totalamount) =>
val features = Array[Double](totalqty.tostring.toDouble, totalamount.tostring.toDouble)
Vectors.dense(features)
}
//对数据集聚类,3个类,20次迭代,形成数据模型
//注意这里会使用设置的partition数20
val numClusters = 3
val numIterations = 20
val model = KMeans.train(parsedData, numclusters, numIterations)
//用模型对读入的数据进行分类,并输出
//由于partition没设置,输出为200个小文件,可以使用bin/hdfs dfs-getmerge合并下载到本地
val result2 = sqldata.map{
case Row(locationid, totalqty, totalamount) =>
val features = Array[Double](totalqty.tostring.toDouble, totalamount.toString.toDouble)
val linevectore = Vectors.dense(features)
val prediction = model.predict(linevectore)
locationid+" "+totalqty+" "+ totalamount +" "+prediction
}.saveAsTextFile(args(0))
sc.stop()
}
}