Spark学习笔记(三):Spark DataFrame

系列博客是学习厦门大学林子雨老师spark编程基础课程的笔记,方便回顾

系列博客:

Spark学习笔记(一):Spark概述与运行原理 

Spark学习笔记(二):RDD编程基础 

  • Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是HiveHDFSCassandra等外部数据源,还可以是JSON格式的数据
  • Spark SQL目前支持ScalaJavaPython三种语言,支持SQL-92规范
  • DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
DataFrame与RDD的区别
RDD 是分布式的 Java 对象的集合,但是,对象内部结构对于 RDD 而言却是不可知的
DataFrame 是一种以 RDD 为基础的分布式数据集,提供了详细的结构信息
  • DataFrame的创建 

Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContextHiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContextHiveContext所有功能

可以通过如下语句创建一个SparkSession对象

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark

  • 从文件中加载数据创建DataFrame 

在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame

spark.read.text("people.txt")#读取文本文件people.txt创建DataFrame
spark.read.json("people.json")#读取people.json文件创建DataFrame
spark.read.parquet("people.parquet")#读取people.parquet文件创建DataFrame
spark.read.format("text").load("people.txt")#读取文本文件people.json创建DataFrame;
spark.read.format("json").load("people.json")#读取JSON文件people.json创建DataFrame;
spark.read.format("parquet").load("people.parquet")#读取Parquet文件people.parquet
  • 从RDD转换得到DataFrame

(1)利用反射机制推断RDD模式

适用对已知数据结构的RDD转换

from pyspark.sql import Row
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
people=spark.sparkContext.textFile("people.txt").\
                          map(lambda line :line.split(",")).\
                          map(lambda p: Row(name=p[0],age=int(p[1])))
schemaPeople=spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询下载
schemaPeople.createOrReplaceTempView("people")
personsDF=spark.sql('select name,age from people where age >20')
# DataFrame中的每一个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
personsDF.rdd.foreach(print)

(2)使用编程方式定义RDD模式

当无法提前获知数据结构时,就需要采用编程方式定义RDD模式

from pyspark.sql.types import *
from pyspark.sql import Row
#生成表头
schemaString='name age'
fields=[StructField(field_name,StringType(),True)for field_name in schemaString.split(" ")]
schema=StringType(fields)
#生成表中的记录
lines=spark.sparkContext.textFile("people.txt")
parts=lines.map(lambda x:x.split(","))
people=parts.map(lambda p:Row(p[0],p[1].strip()))
#把表头和表中的记录拼到一起
schemaPeople=spark.createDataFrame(people,schema)
schemaPeople.createOrReplaceTempView("people")
results=spark.sql("select name,age from people")
results.show()
  •  DataFrame的保存

可以使用spark.write操作,把一个DataFrame保存成不同格式的文件

df.write.text("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format ("parquet").save("people.parquet")
  •  DataFrame的常用操作

 printSchema()   dataframe的各列详情

select()   

filter()

groupBy()

sort()

发布了62 篇原创文章 · 获赞 118 · 访问量 22万+

猜你喜欢

转载自blog.csdn.net/qq_38412868/article/details/104315881