Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/d413122031/article/details/82631463
Load and Save
- Text files
- Unstructured
- One record per line
- Load
input = sc.textFile("file:///home/holden/repos/spark/README.md")
"s3n://bucket/my-files/*.txt"
"hdfs://master:port/path"
- Save
result.saveAsTextFile(outputFile)
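Because a text file has no structure, Spark simply turns each line into one string record. A minimal pure-Python sketch of that per-line semantics (no Spark needed; the text here is made up for illustration):

```python
# Each line of a text file becomes one record, as sc.textFile() does.
text = "line one\nline two\nline three\n"
records = text.splitlines()  # one string per line, newline stripped
print(records)  # ['line one', 'line two', 'line three']
```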
JSON
- Semi-structured
- Typically one record per line
Load
import json
data = input.map(lambda x: json.loads(x))
data = sqlContext.jsonFile("tweets.json")
df = sqlContext.read.format('json').load('py/test/sql/people.json')
df = spark.read.format('json').load('py/test/sql/people.json')
df = spark.read.json('py/test/sql/people.json')
Save
(data.filter(lambda x: x['lovesPandas'])
     .map(lambda x: json.dumps(x))
     .saveAsTextFile(outputFile))
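The load/save pipeline above can be sketched without a cluster: parse each line with json.loads, filter, and serialize back with json.dumps. This is plain Python standing in for the RDD map/filter calls; the sample records are made up, with the lovesPandas field taken from the example above:

```python
import json

# Two JSON records, one per line, as they would come out of sc.textFile()
lines = [
    '{"name": "Sparky", "lovesPandas": true}',
    '{"name": "Holden", "lovesPandas": false}',
]
data = [json.loads(x) for x in lines]         # input.map(lambda x: json.loads(x))
kept = [x for x in data if x["lovesPandas"]]  # filter(lambda x: x['lovesPandas'])
out = [json.dumps(x) for x in kept]           # map(lambda x: json.dumps(x))
print(out)  # ['{"name": "Sparky", "lovesPandas": true}']
```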
CSV
- Structured
- Commonly used for spreadsheet data
Load
import csv
import StringIO

def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
- Save
def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
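The CSV snippets above are Python 2 (the StringIO module, reader.next()). A Python 3 equivalent of the same parse/write round trip, runnable without Spark, might look like this (field names and the sample row are illustrative):

```python
import csv
import io

def load_record(line):
    """Parse one CSV line into a dict, like loadRecord() above."""
    reader = csv.DictReader(io.StringIO(line), fieldnames=["name", "favoriteAnimal"])
    return next(reader)

def write_records(records):
    """Write dicts back out as CSV lines, like writeRecords() above."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

rec = load_record("holden,panda")
print(rec)                   # {'name': 'holden', 'favoriteAnimal': 'panda'}
print(write_records([rec]))  # ['holden,panda\r\n'] (csv writes \r\n by default)
```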
SequenceFiles
- Structured
- Key-value data
- Load
data = sc.sequenceFile(inFile,
"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
- Save
data.saveAsSequenceFile(outputFile)
Protocol buffers
- Structured
- A fast, space-efficient, multi-language format.
Object files
- Structured
- Used to share data between Spark jobs
- Load
- pickleFile()
- Save
- saveAsPickleFile()
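Under the hood, saveAsPickleFile()/pickleFile() serialize records with Python's pickle module, which is why the files are only convenient to share between Spark (Python) jobs. A minimal sketch of that underlying round trip in plain Python, without Spark:

```python
import pickle

record = {"name": "Sparky", "lovesPandas": True}
blob = pickle.dumps(record)    # serialization step behind saveAsPickleFile()
restored = pickle.loads(blob)  # deserialization step behind pickleFile()
print(restored == record)  # True
```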