系列博客是学习厦门大学林子雨老师spark编程基础课程的笔记,方便回顾
系列博客:
- Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据
- Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范
- •DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
RDD 是分布式的 Java 对象的集合,但是,对象内部结构对于 RDD 而言却是不可知的DataFrame 是一种以 RDD 为基础的分布式数据集,提供了详细的结构信息
-
DataFrame的创建
从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能
可以通过如下语句创建一个SparkSession对象
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)
- 从文件中加载数据创建DataFrame
在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame
spark.read.text("people.txt")#读取文本文件people.txt创建DataFrame
spark.read.json("people.json")#读取people.json文件创建DataFrame
spark.read.parquet("people.parquet")#读取people.parquet文件创建DataFrame
spark.read.format("text").load("people.txt")#读取文本文件people.json创建DataFrame;
spark.read.format("json").load("people.json")#读取JSON文件people.json创建DataFrame;
spark.read.format("parquet").load("people.parquet")#读取Parquet文件people.parquet
- 从RDD转换得到DataFrame
(1)利用反射机制推断RDD模式
适用对已知数据结构的RDD转换
from pyspark.sql import Row
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
people=spark.sparkContext.textFile("people.txt").\
map(lambda line :line.split(",")).\
map(lambda p: Row(name=p[0],age=int(p[1])))
schemaPeople=spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询下载
schemaPeople.createOrReplaceTempView("people")
personsDF=spark.sql('select name,age from people where age >20')
# DataFrame中的每一个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
personsDF.rdd.foreach(print)
(2)使用编程方式定义RDD模式
当无法提前获知数据结构时,就需要采用编程方式定义RDD模式
from pyspark.sql.types import *
from pyspark.sql import Row
#生成表头
schemaString='name age'
fields=[StructField(field_name,StringType(),True)for field_name in schemaString.split(" ")]
schema=StringType(fields)
#生成表中的记录
lines=spark.sparkContext.textFile("people.txt")
parts=lines.map(lambda x:x.split(","))
people=parts.map(lambda p:Row(p[0],p[1].strip()))
#把表头和表中的记录拼到一起
schemaPeople=spark.createDataFrame(people,schema)
schemaPeople.createOrReplaceTempView("people")
results=spark.sql("select name,age from people")
results.show()
-
DataFrame的保存
可以使用spark.write操作,把一个DataFrame保存成不同格式的文件
df.write.text("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format ("parquet").save("people.parquet")
-
DataFrame的常用操作
printSchema() dataframe的各列详情
select()
filter()
groupBy()
sort()