spark_入门03学习笔记
1、目标
-
1、掌握sparksql底层原理
-
2、掌握DataFrame和DataSet数据结构和使用方式
-
3、掌握通过sparksql来进行代码开发
2、sparksql概述
2.1 sparksql前世今生
- shark它是专门为spark设计的大规模数据仓库系统
- shark依赖于hive的代码,同时也依赖spark版本
- 后期发现hive的mapreduce设计框架限制了shark的性能
- 慢慢的把shark这个框架废弃,把重点转移到了sparksql
2.2 sparksql是什么
- Spark SQL is Apache Spark’s module for working with structured data.
- sparksql是apache spark框架处理结构化数据的一个模块。
- 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
- 后期可以通过sql语句、DataFrame api、DataSet api去操作sparksql
2.3 sparksql特性
-
1、易整合
- 可以将sparksql与spark应用程序进行混合使用,同时可以通过java、scala、python、R不同语言进行代码开发
-
2、统一的数据源访问
-
sparksql可以通过一种相同的方式来对接任意的外部数据源
-
SparkSession.read.文件格式(该格式文件的路径)
-
-
3、兼容hive
- sparksql可以支持hivesql语句
-
4、支持标准的数据库连接
- 可以通过sparksql来使用标准的数据库连接来操作数据库
3、DataFrame
3.1 DataFrame是什么
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
3.2 rdd与dataFrame的区别
RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解 Person类的内部结构。
而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame多了数据的结构信息,即schema。
DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where …)
此外DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)
RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。
3.3 rdd与dataFrame优缺点
- rdd
- 优点
- 1、编译时类型安全
- 编译时会进行类型的检查,看一下数据类型是否符合要求
- 2、具有面向对象编程风格
- 后期可以通过操作rdd使用面向对象这种风格
- 1、编译时类型安全
- 缺点
- 1、数据序列化和反序列化性能开销很大。
- rdd是一个分布式数据集,后期在进行分布式计算的时候,需要进行大量的网络传输,在传输的时候需要把数据本身和数据结构先进行序列化,然后在通过反序列化获取当前该对象。
- 2、频繁的对象创建会带来大量的GC(垃圾清理–清理的时候所有任务停止)
- 很多对象的创建都需要占用jvm堆中的内存,由于堆中的内存它是有限,这个时候就需要进行大量的GC,来腾出更多的内存空间来存储对象,只要程序进行GC,所有的任务都是暂停。GC结束之后,任务才继续运行。
- 1、数据序列化和反序列化性能开销很大。
- 优点
- DataFrame
- dataFrame中引入了schema元信息和off-heap(不在jvm堆中的内存,直接使用操作系统层面上的内存)
- 它分别解决了rdd的缺点。
- 优点
- 1、由于dataFrame引入了schema
- 后期数据在进行网络传输的时候,只需要把数据本身进行序列化,然后在通过反序列化获取该数据就可以了,也就是说它结构信息这一块省略掉了。
- 解决了rdd中数据序列化和反序列化性能开销很大这个缺点。
- 2、由于dataFrame引入了off-heap(heap堆)
- rdd中大量对象的创建只能够在jvm堆中,内存不够,肯定要进行大量的GC,影响程序的性能
- dataFrame中的大量的对象创建就不在jvm堆以内,直接使用操作系统层面上的内存,这样一来,jvm堆中的内存使用率就比较小。大大减少程序GC.
- 解决了rdd中频繁的对象创建会带来大量的GC这个缺点。
- 1、由于dataFrame引入了schema
- 缺点
- 它解决了rdd的缺点,同时它也丢失了rdd的优点
- 1、编译时类型不安全
- 就是在编译时不在进行类型检查
- 2、不具备面向对象编程风格
- 1、编译时类型不安全
- 它解决了rdd的缺点,同时它也丢失了rdd的优点
4、读取数据源创建DataFrame
下列文件都在spark的安装包下
/export/servers/spark/examples/src/main/resources
将里面的上传到hdfs即可
4.1 读取文本文件创建DataFrame
val df=spark.read.text("/person.txt")
//打印schema
df.printSchema
//展示数据
df.show
4.2 读取json文件创建DataFrame
val df=spark.read.json("/people.json")
//打印schema
df.printSchema
//展示数据
df.show
4.3 读取parquet列存储文件创建DataFrame
val df=spark.read.parquet("/users.parquet")
//打印schema
df.printSchema
//展示数据
df.show
5、DataFrame常用的操作
5.1 DSL风格语法
-
dataFrame自身提供了一套属于自己的语法api
val rdd1=sc.textFile("/person.txt") val rdd2=rdd1.map(_.split(" ")) case class Person(id:Int,name:String,age:Int) val rdd3=rdd2.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) val personDF=rdd3.toDF personDF.printSchema personDF.show //查询name字段对应结果 personDF.select("name").show personDF.select(col("name")).show personDF.select($"name").show //实现每一个用户年龄加1 personDF.select($"name",$"age",$"age"+1).show //过滤出年龄大于30的用户 personDF.filter($"age" >30).show personDF.filter($"age" >30).count //统计不同的年龄用的出现的次数 personDF.groupBy("age").count.show
5.2 SQL风格语法
- 1、先把dataFrame注册成一张表
- personDF.registerTempTable(“person”)
- 2、通过sparkSession.sql(sql语句) 去操作dataFrame
spark.sql("select * from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select * from person where name='lisi' ").show
6、DataSet
6.1 dataSet是什么
它也是一个分布式的数据集合,支持强类型,就是在rdd的每一行数据加了一个类型约束,它是在spark1.6之后添加的,dataFrame只在spark1.3之后添加,这里的dataset比dataFrame出来的晚点。
6.2 dataFrame和dataSet互相转换
- 1、dataFrame转换成dataSet
val ds=df.as[强类型]
- 2、dataSet转换成dataFrame
val df=ds.toDF
- 补充
val rdd1=df.rdd
val rdd2=ds.rdd
6.3 如何创建DataSet
- 1、通过一个已经存在scala集合去构建
val ds1=spark.createDataset(List(1,2,3,4,5))
val ds2=List(1,2,3,4,5).toDS
- 2、通过一个已经存在的rdd去构建
val ds=spark.createDataset(sc.textFile("/person.txt"))
- 3、dataFrame转换成dataSet
val ds=df.as[强类型]
- 4、通过dataSet中的方法生成一个新的dataSet
7、通过IDEA开发代码实现把RDD转换成dataFrame
-
1、引入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.3</version> </dependency>
7.1 利用反射机制(定义样例类)
反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema(这里指的是源信息),case class的参数名称会被利用反射机制作为列名。这种RDD可以高效的转换为DataFrame并注册为表。
package cn.itcast.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
//todo:将rdd转换成dataFrame--------利用反射机制(定义样例类)
case class Person(id:Int,name:String,age:Int)
object CaseClassSchema {
def main(args: Array[String]): Unit = {
//1、创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("CaseClassSchema")
.master("local[2]")
.getOrCreate()
//2、创建SparkContext
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("warn")
//3、读取数据
val data: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
//4、将rdd与样例类进行关联
val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
//5、把personRDD转换成dataFrame
//需要手动导入隐式转换
import spark.implicits._
val personDF: DataFrame = personRDD.toDF()
//6、操作dataFrame
//----------------------DSL风格语法---------------start
//打印schema
personDF.printSchema()
//展示数据
personDF.show() //zhangsanxxxxxxxxxxxxxxxxxxxxxxxxx
personDF.show(1)
//打印第一条
println(personDF.head())
//获取前N条数据
personDF.head(2).foreach(println)
// 查询name字段对应的结果
personDF.select("name").show()
personDF.select($"name",$"age").show()
//实现把age+1
personDF.select($"age",$"age"+1).show()
//过滤出年龄大于30的用户
personDF.filter($"age" >30).show()
println(personDF.filter($"age" >30).count())
//按照年龄进行分组统计
personDF.groupBy("age").count().show()
//----------------------DSL风格语法---------------end
//----------------------SQL风格语法---------------start
personDF.createTempView("person")
spark.sql("select * from person").show()
spark.sql("select * from person where id =1").show()
spark.sql("select * from person where age >30 ").show()
spark.sql("select count(*) from person").show()
//----------------------SQL风格语法---------------end
//7、关闭
sc.stop()
spark.stop()
}
}
7.2 通过StructType指定schema
当case class不能提前定义好时,可以通过以下三步创建DataFrame
(1)将RDD转为包含Row对象的RDD
(2)基于StructType类型创建schema,与第一步创建的RDD相匹配
(3)通过sparkSession的createDataFrame方法对第一步的RDD应用schema创建DataFrame
package cn.itcast.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
//todo:将rdd转换成dataFrame---------通过StructType指定schema
object SparkSqlSchema {
def main(args: Array[String]): Unit = {
//1、创建SparkSession
val spark: SparkSession = SparkSession.builder()
.appName("SparkSqlSchema")
.master("local[2]")
.getOrCreate()
//2、创建SparkContext
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("warn")
//3、读取数据
val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
//4、把rdd1与Row进行关联
val rowRDD: RDD[Row] = rdd1.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//5、指定dataFrame的schema
val schema =
StructType(
StructField("id", IntegerType, true) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)
val df: DataFrame = spark.createDataFrame(rowRDD,schema)
df.printSchema()
df.show()
df.createTempView("person")
spark.sql("select * from person").show()
sc.stop()
spark.stop()
}
}
8、sparksql操作hivesql
HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类,在Spark2.0之后,HiveContext和SqlContext在SparkSession进行了统一,可以通过操作SparkSession来操作HiveContext和SqlContext。
-
1、引入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.3</version> </dependency>
-
2、代码开发
package cn.itcast.sparksql import org.apache.spark.sql.SparkSession //todo:利用sparksql操作hivesql object HiveSupport { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("SparkSqlSchema") .master("local[2]") .enableHiveSupport() //开启对hivesql支持 .getOrCreate() //2、通过sparkSession操作hivesql //2.1 创建hive表 spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','") //2.2 加载数据到hive表中 spark.sql("load data local inpath './data/user.txt' into table user") //2.3 查询 spark.sql("select * from user").show() spark.stop() } }
9、jdbc数据源
9.1 通过sparksql从mysql表中加载数据
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
-
1、通过idea进行开发的代码
package cn.itcast.sparksql import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql读取mysql表中的数据 object DataFromMysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("DataFromMysql") .master("local[2]") .getOrCreate() //2、通过sparkSession操作mysql表中数据 //定义url val url="jdbc:mysql://192.168.200.100:3306/spark" //定义表名 val table="iplocation" //定义配置 val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties) mysqlDF.printSchema() mysqlDF.show() spark.stop() } }
-
2、通过spark-shell运行
- (1)、启动spark-shell(必须指定mysql的连接驱动包)
spark-shell \ --master spark://node-1:7077 \ --executor-memory 512M \ --total-executor-cores 1 \ --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \ --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar
需要将mysql的驱动包引入 , 否则无法执行
- (2)、从mysql中加载数据
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.140.46:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123")).load()
- (3)、执行查询
9.2 把结果数据写入到mysql表中
-
1、代码开发(本地运行)
package cn.itcast.sparksql import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} //todo:sparksql把最后的结果数据写回到mysql表中进行存储 object Data2Mysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("Data2Mysql") .master("local[2]") .getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取数据 val data: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" ")) //4、将rdd与样例类进行关联 val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5、把personRDD转换成dataFrame //需要手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF() //打印结果 personDF.show() personDF.createTempView("person") val result: DataFrame = spark.sql("select * from person where age >30") //需要把结果数据写入到mysql表中 //定义url val url="jdbc:mysql://192.168.200.100:3306/spark" //定义表名 val table="person2" //定义配置 val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") //mode方法可以指定数据插入模式 //方法中需要的参数: //overwrite:覆盖,如果事先不存在表,它会事先帮你创建 //append: 追加,如果事先不存在表,它会事先帮你创建 //ignore:忽略,如果这个表事先存在,在插入数据的时候就直接忽略掉,也就是不进行任何操作 //error :报错,默认选项,如果表事先存在就报错 //将查询结果写入到mysql result.write.mode("ignore").jdbc(url,table,properties) //关闭 sc.stop() spark.stop() } }
-
2、代码开发(集群运行)
package cn.itcast.sparksql import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} //todo:sparksql把最后的结果数据写回到mysql表中进行存储 object Data2Mysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("Data2Mysql") .getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取数据 val data: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" ")) //4、将rdd与样例类进行关联 val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5、把personRDD转换成dataFrame //需要手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF() //打印结果 personDF.show() personDF.createTempView("person") val result: DataFrame = spark.sql("select * from person where age >30") //需要把结果数据写入到mysql表中 //定义url val url="jdbc:mysql://192.168.200.100:3306/spark" //定义表名 val table=args(1) //定义配置 val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") //mode方法可以指定数据插入模式 //方法中需要的参数: //overwrite:覆盖,如果事先不存在表,它会事先帮你创建 //append: 追加,如果事先不存在表,它会事先帮你创建 //ignore:忽略,如果这个表事先存在,在插入数据的时候就直接忽略掉,也就是不进行任何操作 //error :报错,默认选项,如果表事先存在就报错 result.write.mode("append").jdbc(url,table,properties) //关闭 sc.stop() spark.stop() } }
-
3、集群提交脚本
spark-submit \ --master spark://node1:7077 \ --class cn.itcast.sparksql.Data2Mysql(运行的主类) \ --executor-memory 1g \ --total-executor-cores 1 \ --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \ --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \ original-spark_class14-1.0-SNAPSHOT.jar \ /person.txt itcast
- 自己集群测试
- 在上传包的当前路径执行此命令 , 如: 我自己上传报的路径为
~
, 因此直接在~
下执行下列代码即可
spark-submit --master spark://node-1:7077 \
--class com.itck.spark_sql.Data2Mysql \
--executor-memory 512M \
--total-executor-cores 1 \
--jars /export/servers/hive/lib/mysql-connector-java-5.1.32.jar \
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.32.jar \
original-demo_spark-1.0-SNAPSHOT.jar \
/pson.txt person