为什么要有Spark SQL?
以往在使用Hadoop时,Hive作为一个数据仓库,但在使用中,我们更多感觉Hive是一个解析引擎,而Hive的底层走的也是MapReduce,而这个MapReduce是Hadoop的,在前面我们也解释了Hadoop的MapReduce的缺点,那么此时我们是使用了Spark实现的MapReduce计算模型,而相对应的Spark也根据这个计算模型也出了对应的SparkSQL。
什么是Spark SQL?
Spark SQL是Spark上的高级模块,是一个SQL解析引擎,将SQL解析成特殊的RDD(DataFrame),然后在Spark集群上运行。
Spark SQL用来处理结构化数据,所以要先将非结构化数据转化为结构化数据。
特点
1.可以查询结构化数据
2.多数据源访问,可以访问JSON、JDBC等……
3.兼容Hive,而且将Hive好的东西都拿过来了
4.兼容HiveSQL、UDFs、序列化机制等
5.可以使用JDBC或ODBC连接,可以把结果写到关系型数据库,其他的BI工具就可以使用了。
6.和Spark Core无缝集成,写RDD时,可配合Spark SQL实现逻辑
RDD、DataFrames、Dataset
在Spark SQL中,Spark提供了两个新的抽象,分别是DataFrame和Dataset。
RDD(Spark 1.0) à DataFrame(Spark 1.3)à Dataset(Spark 1.6)
备注:底层还是RDD,只是进一步封装,进一步抽象,降低使用门槛,所以都有Transformation和Action的算子区别。
RDD
1.RDD是一个懒执行的,不可变的,可以支持Lambda表达式的并行数据集合。
2.RDD的最大好处就是简单,API的人性化程度很高。
3.RDD的劣势是性能的限制,RDD是一个JVM驻内存对象,也就决定了存在GC的限制和数据增加时Java序列化成本的升高。
DataFrame
与RDD类似,DataFrame也是一个分布式数据容器,然而DataFrame更像传统的数据库二维表格,除数据意外,还记录数据的结构信息,即Schema,同时与Hive类似,DataFrame也支持嵌套数据类型(Struct、Array、Map……)。
从API简易性来看,DataFrame的API提供的是一套高层的关系操作,比如函数的RDD API更加友好,即在使用Spark时,不必知道要使用groupByKey还是reduceByKey好,所以DataFrame会自动帮助我们选择使用哪一个,不然开发者要对Spark RDD的API底层要非常熟悉,SparkSQL进而也降低门槛,只要会SQL,也可以开发Spark程序。
上图直观的体现了DataFrame和RDD的区别,左侧的RDD[Person],虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构,而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚的知道该数据集中包含了哪些列,每列的名称和类型是什么。
DataFrame多了数据的结构信息,即Schema。
DataFrame除了提供比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter、下推等……
DataFrame是为数据提供了Schema的视图,可以把此当做数据库中的一张表对待。
DataFrame也是懒加载的。
DataFrame底层还是RDD,只是对RDD进一步封装,算子还是分Transformation和Action。
性能上比RDD要高,主要是:
1.定制化内存管理,数据以二进制的方式存在于非堆内存,节省了大量的空间之外,还摆脱了GC的限制,DataFrame在堆外内存,申请和回收由Spark自身控制。
2.优化的执行计划,查询计划通过Spark catalyst optimister进行优化,在得到优化执行计划转化为物理执行计划的过程中,还可以根据具体数据源的特性,将过滤条件下推到数据源。
DataFrame劣势:编译器缺少类型安全检查,导致运行时出错。
Dataset
1.是DataFrame API的一个扩展,是Spark最新的数据抽象。
2.用户友好的API风格,既具有类型安全检查,也具有DataFrame的查询优化特性。
3.Dataset支持编解码器,当需要访问非堆上的数据时,可以避免反序列化整个对象,提高了效率。
4.样例类被用来在Dataset中定义数据的结构信息,样例类中的每个属性的名称直接映射到Dataset中的字段名称。
5.DataFrame是Dataset的特例,type DataFrame = Dataset[Row],所以可以通过as将DataFrame转换为Dataset,Row是一个类型,跟Car、Person这些类型一样,所以的表结构信息都用Row来表示。
6.Dataset是强类型的,比如可以有Dataset[Car],Dataset[Person]。
DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候,是没办法在编译时,检查是否类型失败的,比如你可以对一个String进行减法操作,但是执行时才报错。
而Dataset不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。
7.支持更多的智能数据源
8.如果读取HDFS的数据会感知最优位置
9.K-V类型shuffle也有分区器
三者关系
Dataset = DataFrame 智能化版本
DataFrame = RDD + Schema + 优化
RDD = 隐藏MapReduce细节
RDD让开发者决定怎么做,怎么吃饭(倒立吃饭)
DataFrame和Dataset让开发者决定做什么,怎么吃饭(吃饭,不关心细节)
三者共性
1.都是分布式数据集
2.都有partition的概念
3.有许多共同的函数
4.三者之间可以互相转换
5.有许多共同的函数,filter、排序等
6.在对DataFrame和Dataset进行操作都需要import sqlContext.implicits._的支持
备注:Spark SQL的1.x和2.x的API有一些变化。
IDEA编程
pom.xml
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <scala.version>2.11.8</scala.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.7.3</hadoop.version> <encoding>UTF-8</encoding> </properties> <dependencies> <!-- 导入scala的依赖 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 导入spark的依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 指定hadoop-client API的版本 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- 导入spark sql的依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- mysql的连接驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <!-- spark如果想整合Hive,必须加入hive的支持 --> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.2.0</version> </dependency> </dependencies> <build> <pluginManagement> <plugins> <!-- 编译scala的插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> </plugin> <!-- 编译java的插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <executions> <execution> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- 打jar插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> |
Spark 1.x 案例1
import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} /** * Spark Version 1.x Api */ object Demo1 { def main(args: Array[String]): Unit = { //创建配置 val conf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]") //创建程序入口 val sc = new SparkContext(conf) //读取数据 val lines: RDD[String] = sc.textFile("hdfs://hadoop100:8020/person") //组装数据,封装数据的类必须是 样例类 case class val boyRDD: RDD[Boy] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toInt Boy(id, name, age, fv) }) //SparkContext创建的RDD是Spark最原始的RDD //而SparkSQL这个模块对专门用SQL去处理数据的, //所以又对RDD做了一层封装,要使用这个封装的内容才能创建属于SparkSQL的RDD //但是程序的入口是SparkContext,所以要创建SparkContext并传入 val sqlContext: SQLContext = new SQLContext(sc) //boyRDD 装的是Boy类型的数据,有了Schema信息,但是还是一个RDD //将RDD转换成SparkSQL操作的那个RDD类型,即DataFrame //这个隐式转换在sqlContext类里面 import sqlContext.implicits._ //转换 val bdf: DataFrame = boyRDD.toDF //第一种API //变成了SparkSQL想要的RDD后,就可以使用两种API编程了 //把DataFrame先注册成为一张临时表 bdf.registerTempTable("t_boy") //在业务操作中,数据一般要先进行ETL,数据要先整理清洗 //书写SQL,(SQL的方法其实是Tranformation) //SparkSQL封装的RDD底层还是RDD,所以还是有区分Transformation和Action的RDD val result: DataFrame = sqlContext.sql("SELECT * FROM t_boy ORDER BY fv DESC, age ASC") //查看结果,触发Action result.show() //释放资源 sc.stop() //SparkSQL 底层会去选择要用什么RDD去操作,此时我们非常熟悉这些RDD哪个效率高,哪个好 } } case class Boy(id: Long, name: String, age: Int, fv: Double) |
Spark 1.x 案例2
import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} import org.apache.spark.{SparkConf, SparkContext} /** * Spark Version 1.x Api */ object OneDemo2 { def main(args: Array[String]): Unit = { //Spark的配置 val conf: SparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]") //Spark的入口 val sc = new SparkContext(conf) //读取数据 val lines: RDD[String] = sc.textFile("hdfs://hadoop100:8020/person") //创建SparkSQL的入口 val sqlContext: SQLContext = new SQLContext(sc) //导入隐式转换 import sqlContext.implicits._ //对数据进行组装 //Row是行的意思,每一行的数据 org.apache.spark.sql.Row val rowRDD: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toInt Row(id, name, age, fv) }) //获取到数据的每一行,那么则要定义每一个数据的表头,也就是描述DataFrame val structType:StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", IntegerType, true) )) //现在获取到了每一行的数据,也定义了每一列的数据的意义了 //接下来就需要使用SparkSQL去操作这个数据 //首先转换为SparkSQL的抽象DataFrame val bdf:DataFrame = sqlContext.createDataFrame(rowRDD,structType) //第一种API //将当前这个DataFrame注册成为一个临时表 bdf.registerTempTable("t_boy") //编写SQL val result:DataFrame = sqlContext.sql("SELECT * FROM t_boy ORDER BY fv DESC, age ASC") //查看结果 result.show() //释放资源 sc.stop() //SparkSQL的执行引擎是SparkCore,Hive的执行引擎是Yarn } } |
Spark 1.x 案例3
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} /** * Spark Version 1.x Api */ object OneDemo3 { def main(args: Array[String]): Unit = { //创建配置 val conf: SparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local[*]") //创建程序入口 val sc: SparkContext = new SparkContext(conf) //读取数据 val lines: RDD[String] = sc.textFile("hdfs://hadoop100:8020/person") //创建SparkSQL 容器(包装) val sqlContext: SQLContext = new SQLContext(sc) //导入隐式转换 import sqlContext.implicits._ //对数据进行组装 //Row是行的意思,每一行的数据 org.apache.spark.sql.Row val rowRDD: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toInt Row(id, name, age, fv) }) //获取到数据的每一行,那么则要定义每一个数据的表头,也就是描述DataFrame val structType:StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", IntegerType, true) )) //创建SparkSQL的操作对象 val bdf: DataFrame = sqlContext.createDataFrame(rowRDD,structType) //第二种API //不使用SQL的方式,就不用创建临时表 val select: DataFrame = bdf.select("name","age","fv") select.show() //排序 //desc、asc是隐式方法 val select2: Dataset[Row] = select.orderBy($"fv" desc,$"age" asc) select2.show() sc.stop() } } |
Spark 2.x 案例1
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * Spark Version 2.x Api */ object SparkSqlWordCount1 { def main(args: Array[String]): Unit = { //创建会话(三合一) val sparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //Dataset分布式数据集是一个比DataFrame更加聪明的数据集(做了优化) //DataFrame是RDD,Dataset是一个更聪明的DataFrame RDD,那么底层还是一个RDD val lines: Dataset[String] = sparkSession.read.textFile("hdfs://hadoop100:8020/wc") //查看下这个结果 lines.show() //整理数据,切分压平 import sparkSession.implicits._ val words: Dataset[String] = lines.flatMap(_.split(" ")) //注册视图 words.createTempView("v_wc") //执行SQL val result: DataFrame = sparkSession.sql("SELECT value,COUNT(*) counts FROM v_wc GROUP BY value ORDER BY counts DESC") //查看结果 result.show() //释放资源 sparkSession.stop() //感觉比DataFrame慢,因为在执行前,Dataset会制定计划,数据量少慢,数据量多快 } } |
Spark 2.x 案例2
import org.apache.spark.sql.{Dataset, Row, SparkSession} /** * Spark Version 2.x Api */ object SparkSqlWordCount2 { def main(args: Array[String]): Unit = { //创建会话 (三合一) val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate() //指定读取 val lines: Dataset[String] = sparkSession.read.textFile("hdfs://hadoop100:8020/wc") //查看下获取到的数据 lines.show() //整理数据,切分压平 import sparkSession.implicits._ val words: Dataset[String] = lines.flatMap(_.split(" ")) //查看数据 words.show() //使用Dataset的Api (DSL) //默认是ocunt名称 val result: Dataset[Row] = words.groupBy($"value" as "word").count().sort($"count" desc) //查看结果 result.show() //方式二 import org.apache.spark.sql.functions._ val result2: Dataset[Row] = words.groupBy($"value" as "word").agg((count("*") as "counts")).orderBy($"counts" desc) //查看结果 result2.show() //释放资源 sparkSession.stop() } } |
Spark 2.x 案例3
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, _} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** * Spark Version 2.x Api */ object TwoDemo1 { def main(args: Array[String]): Unit = { //直接使用SparkSession //三合一了,三步和一步,可以理解为每一个Driver都相当于一个会话。 //SparkSession相当于把前面的几个都合并在一起了 val sparkSession: SparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //读取数据 //此时会发现SparkSession内部似乎是已经实现了SparkContext,并且提供出对应的方法出来调用 val lines: RDD[String] = sparkSession.sparkContext.textFile("hdfs://hadoop100:8020/person") //整理数据 val rowRDD: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toInt Row(id, name, age, fv) }) //结构类型,就是表头,用于描述DataFrame val structType: StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", IntegerType, true) )) //创建DataFrame val bdf: DataFrame = sparkSession.createDataFrame(rowRDD,structType) bdf.show() //导入隐式转换 import sparkSession.implicits._ val bdf2: Dataset[Row] = bdf.where($"fv" > 2).orderBy($"fv" desc,$"age" asc) bdf2.show() //释放资源 sparkSession.stop() } } |
Spark 2.x Join案例
import org.apache import org.apache.spark import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object JoinDemo { def main(args: Array[String]): Unit = { //创建会话 val sparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //导入隐式转换 import sparkSession.implicits._ //数据 val lines: Dataset[String] = sparkSession.createDataset(List("1,A,China","2,B,USA")) //对数据进行整理 val tp3Ds : Dataset[(Long, String, String)] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val nation = fields(2) (id, name, nation) }) val df1 = tp3Ds.toDF("id","name","nation") //查看下数据 df1.show() //数据2 val nations:Dataset[String] = sparkSession.createDataset(List("China,中国","USA,美国")) //整理数据 val tp2Ds: Dataset[(String, String)] = nations.map(line => { val fields = line.split(",") val ename = fields(0) val cname = fields(1) (ename, cname) }) val df2 = tp2Ds.toDF("ename","cname") //第一种方式:创建视图 df1.createTempView("v_users") df2.createTempView("v_nations") //执行语句 val result: DataFrame = sparkSession.sql("SELECT name,cname FROM v_users JOIN v_nations ON nation = ename") //查看结果 result.show() //第二种方式:默认是inner join import org.apache.spark.sql.functions._ //两个输入框,左右表 val result2: DataFrame = df1.join(df2,$"nation" === $"ename") //查看结果 result2.show() //第二种方式:指定join的方式 //Type of join to perform. Default `inner`. Must be one of: // `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, // `right`, `right_outer`, `left_semi`, `left_anti`. val result3: DataFrame = df1.join(df2,$"nation" === $"ename","left") //查看结果 result3.show() //释放资源 sparkSession.stop() } } |
Spark 2.x 自定义UDAF案例
UDF 输入一行,返回一个结果,1对1
UDTF 输入一行,返回多行,1对多(flatMap)
import java.lang.Long import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{StructField, _} import org.apache.spark.sql._ /** * 自定义聚合函数 */ object UdafFunc { def main(args: Array[String]): Unit = { //创建会话 val sparkSession: SparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //数据 val range: Dataset[Long] = sparkSession.range(1,11) //查看数据 range.show() //自定义的聚合函数 val geoMean = new GeoMean //方式一: //注册函数 sparkSession.udf.register("gm",geoMean) //将range这个Dataset[Long]注册成视图 range.createTempView("v_range") //运算 val result: DataFrame = sparkSession.sql("SELECT gm(id) result FROM v_range") result.show() //方式二 import sparkSession.implicits._ val result2 = range.agg(geoMean($"id").as("geomean")) result2.show() //释放资源 sparkSession.stop() } } class GeoMean extends UserDefinedAggregateFunction { //输入数据的类型 override def inputSchema: StructType = StructType(List( StructField("value",DoubleType) )) //产生的中间结果的数据类型,多次聚合、shuffle override def bufferSchema: StructType = StructType(List( //相乘之后返回的积数 StructField("product",DoubleType), //参与运算数字的个数 StructField("counts",LongType) )) //最终返回的结果类型 override def dataType: DataType = DoubleType //确保一致性 override def deterministic: Boolean = true //指定初始值 override def initialize(buffer: MutableAggregationBuffer): Unit = { //相乘的初始值 buffer(0) = 1.0 //参与运算数字的个数的初始值 buffer(1) = 0L } //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算) override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { //每有一个数字参与运算就进行相乘(包含中间结果) buffer(0) = buffer.getDouble(0) * input.getDouble(0) //参与运算数据的个数也有更新 buffer(1) = buffer.getLong(1) + 1L } //全局聚合 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { //每个分区计算的结果都进行相乘 buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0) //每个分区参与运算的中间结果进行相加 buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } //计算最终结果 override def evaluate(buffer: Row): Any = { math.pow(buffer.getDouble(0),1.toDouble / buffer.getLong(1)) } } |
Spark SQL 2.x特性
1、更简单,支持标准SQL和简化的API,Spark 2.x依然拥有标准的SQL支持和统一的DataFrame/Dataset API,但扩展了Spark的SQL性能,引进了一个新的ANSI SQL解析器并支持子查询,Spark 2.x 可以运行所有的99 TPC-DS的查询,这需要很多的SQL。
a.在编程API方面,已经简化了API
b.统一Scala/Java下的DataFrames和Datasets
c.SparkSession
d.更简单,更高性能的Accumulator API
e.基于DataFrame的Machine Learning API 将成为主要的ML.API
f.Machine Learning管道持久性
g.R中的分布式算法
2、更快,Spark作为一个编译器
3、更智能,Dataset结构化数据流
a.通过在DataFrames智商构建持久化的应用程序不断简化数据流,允许统一数据流,支持交互式和批量查询
IDEA编程
Spark 2.x JDBC数据源
import java.util.Properties import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object JdbcDataSource { def main(args: Array[String]): Unit = { //创建会话 val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate() //导入隐式转换 import sparkSession.implicits._ //连接数据库,并灭有读取数据,只是获取到元数据信息(Schema信息) //sparkSession.read.jdbc() = sparkSession.read.format("jdbc") val logs: DataFrame = sparkSession.read.format("jdbc").options(Map( "url" -> "jdbc:mysql://127.0.0.1:3306/bigdata?characterEncoding=utf-8", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "access_log", "user" -> "root", "password" -> "root" )).load() //打印Schema信息 logs.printSchema() //打印结果信息 logs.show() //过滤数据 val filter: Dataset[Row] = logs.filter(r => { r.getAs[Int](0) < 3 }) //查看过滤后的结果 filter.show() //lambda表达式 val result: Dataset[Row] = logs.filter($"id" < 3) result.show() //查找 val result1: DataFrame = result.select($"id",$"name") result1.show() //写数据库 val props = new Properties() props.put("user","root") props.put("password","root") /** * Specifies the behavior when data or table already exists. Options include: * - `overwrite`: overwrite the existing data. * - `append`: append the data. * - `ignore`: ignore the operation (i.e. no-op). * - `error`: default option, throw an exception at runtime. */ //ignore 自动建表 /* 报错:Cannot execute statement: impossible to write to binary log since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-based logging. InnoDB is limited to row-logging when transaction isolation level is READ COMMITTED or READ UNCOMMITTED. 数据库执行: STOP SLAVE; SET GLOBAL binlog_format=ROW; START SLAVE; */ result.write.mode("ignore").jdbc("jdbc:mysql://127.0.0.1:3306/bigdata?characterEncoding=utf-8","t_test",props) //将结果保存为多种格式 //result.write.text("e:/spark/text") //只能单列保存 result.write.json("e:/spark/json") result.write.csv("e:/spark/csv") result.write.parquet("e:/spark/parquet") //释放资源 sparkSession.stop() } } |
Spark 2.x JSON数据源
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.SparkSession.Builder object JsonDataSource { def main(args: Array[String]): Unit = { //创建会话 val sparkSession: SparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //导入隐式转换 import sparkSession.implicits._ //指定读取JSON类型的数据,有表头(KV) val jsons: DataFrame = sparkSession.read.json("e:/spark/datasource/data.json") //过滤数据 val filtered: Dataset[Row] = jsons.where($"age" < 20) filtered.show() sparkSession.stop() } } |
Spark 2.x CSV数据源
import org.apache.spark.sql.{DataFrame, SparkSession} object CsvDataSource { def main(args: Array[String]): Unit = { //创建会话 val sparkSession: SparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //导入隐式转换 import sparkSession.implicits._ //读取数据 /* A,22 B,18 C,19 */ val csv: DataFrame = sparkSession.read.csv("e:/spark/datasource/data.csv") //指定读取CSV数据源的格式 val pdf: DataFrame = csv.toDF("name","age") pdf.show() //释放资源 sparkSession.stop() } } |
Spark 2.x Parquet数据源
import org.apache.spark.sql.{DataFrame, SparkSession} object ParquetDataSource { def main(args: Array[String]): Unit = { //创建会话 val sparkSession: SparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //导入隐式转换 import sparkSession.implicits._ //指定读取的数据 val paquetLine: DataFrame = sparkSession.read.parquet("E:/spark/parquet") //结构,因为paquet写出的文件,是会记录结构信息的 paquetLine.printSchema() //数据 paquetLine.show() //方式2 //val parquetLine2: DataFrame = sparkSession.read.format("parquet").load("E:/spark/parquet") //释放资源 sparkSession.stop() } } |
Spark 2.x 三种Join
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * 三种join方式: * Broadcase Join :把小表广播到大表所在的每一台机器上,本地join * 适合一张很小的表和一张大表Join * * Shuffle Hash Join:当一侧的表比较小时,选择将其广播出去以避免shuffle提高性能, * 但因为被广播的表,首先被collect到Driver端,然后冗余分发到每个executor中, * 对Driver和Executor端的压力较大 * 适合小表(比上面大)join大表,或两个小表之间join * * Sort Merge Join:在Join前将数据进行排序,这个排序是全局的,后面再切割分发,那么就可以快速配对了 * 适合两张大表之间进行Join * * Spark1.x还是2.x默认都是ShuffleHashJoin * * 在Spark2.x变成为2种了。 * Spark2.x:只有BroadcaseHashJoin和SortMergeJoin,即ShuffleHashJoin和BroadcaseHashJoin合并 */ object JoinThreeDemo { def main(args: Array[String]): Unit = { //创建会话 val sparkSession = SparkSession .builder() .appName("SparkSQL") .master("local[*]") .getOrCreate() //BroadcaseHashJoin,默认最多为10M,默认就是这个 //如果设置为-1就不会使用了,而是使用SortMergeJoin了 sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold",1024*1024*10) //导入隐式转换 import sparkSession.implicits._ //数据 val lines: Dataset[String] = sparkSession.createDataset(List("1,A,China","2,B,USA")) //对数据进行整理 val tp3Ds : Dataset[(Long, String, String)] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val nation = fields(2) (id, name, nation) }) val df1 = tp3Ds.toDF("id","name","nation") //查看下数据 df1.show() //数据2 val nations:Dataset[String] = sparkSession.createDataset(List("China,中国","USA,美国")) //整理数据 val tp2Ds: Dataset[(String, String)] = nations.map(line => { val fields = line.split(",") val ename = fields(0) val cname = fields(1) (ename, cname) }) val df2 = tp2Ds.toDF("ename","cname") //第一种方式:创建视图 df1.createTempView("v_users") df2.createTempView("v_nations") //执行语句 val result: DataFrame = sparkSession.sql("SELECT name,cname FROM v_users JOIN v_nations ON nation = ename") //查看结果 result.show() //第二种方式:默认是inner join //两个输入框,左右表 val result2: DataFrame = df1.join(df2,$"nation" === $"ename") //查看结果 result2.show() //第二种方式:指定join的方式 //Type of join to perform. Default `inner`. Must be one of: // `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, // `right`, `right_outer`, `left_semi`, `left_anti`. val result3: DataFrame = df1.join(df2,$"nation" === $"ename","left") //查看结果 result3.show() //释放资源 sparkSession.stop() } } |
Spark 2.x 整合Hive
将hive-site.xml放到resources文件夹下
import org.apache.spark.sql.{DataFrame, SparkSession} object SparkWithHive { def main(args: Array[String]): Unit = { //如果需要Hive运行在Spark上,需要开启Spark对Hive的支持 enableHiveSupport() val sparkSession: SparkSession = SparkSession .builder() .appName("SparkOnHive") .master("local[*]") .enableHiveSupport() .getOrCreate() //不需要视图 val result: DataFrame = sparkSession.sql("SHOW DATABASES") result.show() //释放资源 sparkSession.stop() } } |
RDD、DataFrame、Dataset互转
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object SparkTransform { def main(args: Array[String]): Unit = { /* RDD、DataFrame、DataSet 相同: 都是分布式数据集 都有partition的概念 三者之间可以互相转换 */ //Scala集合 val list:List[Int] = List(1,2,3,4,5,6,7,8,9,10) //Spark 配置 val conf = new SparkConf().setAppName("SparkTransform").setMaster("local[*]") //Spark程序入口 val sparkContext: SparkContext = new SparkContext(conf) //Scala集合 -> RDD val rdd = sparkContext.parallelize(list) //创建SparkSQL入口 val sqlContext: SQLContext = new SQLContext(sparkContext) //导入隐式转换 import sqlContext.implicits._ //rdd -> DataFrame val rddDF: DataFrame = rdd.toDF() //DataFrame -> RDD val rdd2 = rddDF.rdd //RDD -> Dataset val rddDS = rdd.toDS() //Dataset -> RDD val rdd3 = rddDS.rdd //DataFrame -> Dataset val rddDS1 = rddDF.as("") //Dataset -> DataFrame val rddDF1 = rddDS.toDF() } } |