Spark Learning (4): Loading and Saving Data

Copyright notice: This is an original post by the blogger and may not be reproduced without permission. https://blog.csdn.net/d413122031/article/details/82631463

Load and Save

  • Text files
    • Unstructured
    • One record per line
    • Load


      input = sc.textFile("file:///home/holden/repos/spark/README.md")
      # Other supported path schemes (wildcards included):
      #   "s3n://bucket/my-files/*.txt"
      #   "hdfs://master:port/path"
    • Save

      result.saveAsTextFile(outputFile)
  • Json

    • Semi-structured
    • Typically one record per line
    • Load

          import json
          data = input.map(lambda x: json.loads(x))
          # Older SQLContext API (deprecated):
          data = sqlContext.jsonFile("tweets.json")
          df = sqlContext.read \
              .format('json').load('py/test/sql/people.json')
          # SparkSession API (Spark 2.x+), where `spark` is a SparkSession:
          df = spark.read.format('json').load('py/test/sql/people.json')
          df = spark.read.json('py/test/sql/people.json')
      
    • Save

      (data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
                                                 .saveAsTextFile(outputFile))
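
      The filter-and-serialize step above can be checked locally with plain Python, without a Spark cluster. This is a minimal sketch; the sample records and the `lovesPandas` field mirror the example above:

      ```python
      import json

      # Raw JSON lines, one record per line (mirrors the RDD input above)
      lines = [
          '{"name": "Holden", "lovesPandas": true}',
          '{"name": "Sparky", "lovesPandas": false}',
      ]

      # Parse each line, keep records where lovesPandas is true,
      # and re-serialize -- the same logic as the map/filter chain above
      records = [json.loads(line) for line in lines]
      kept = [json.dumps(r) for r in records if r["lovesPandas"]]
      ```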
  • CSV

    • Structured
    • Commonly used for spreadsheet data
    • Load

          import csv
          import StringIO  # Python 2; on Python 3 use io.StringIO

          def loadRecord(line):
              """Parse a CSV line"""
              input = StringIO.StringIO(line)
              reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
              return reader.next()
          input = sc.textFile(inputFile).map(loadRecord)

          def loadRecords(fileNameContents):
              """Load all the records in a given file"""
              # wholeTextFiles() yields (filename, contents) pairs
              input = StringIO.StringIO(fileNameContents[1])
              reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
              return reader
          fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
    • Save

      def writeRecords(records):
          """Write out CSV lines"""
          output = StringIO.StringIO()
          writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
          for record in records:
              writer.writerow(record)
          return [output.getvalue()]

      pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
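
      The per-record parse and write steps can be exercised locally without Spark. This sketch ports the helpers above to Python 3, using `io.StringIO` in place of the Python 2 `StringIO` module; the field names follow the example above:

      ```python
      import csv
      import io

      def load_record(line):
          """Parse one CSV line into a dict, as loadRecord does above."""
          reader = csv.DictReader(io.StringIO(line),
                                  fieldnames=["name", "favoriteAnimal"])
          return next(reader)

      def write_records(records):
          """Serialize dicts back to CSV text, as writeRecords does above."""
          output = io.StringIO()
          writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
          for record in records:
              writer.writerow(record)
          return output.getvalue()

      # Round trip: one CSV line -> dict -> CSV line
      record = load_record("Holden,panda")
      csv_text = write_records([record])
      ```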
  • SequenceFiles

    • Structured
    • Key/value pair data
    • Load

      data = sc.sequenceFile(inFile,
          "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
    • Save

      data.saveAsSequenceFile(outputFile)
  • Protocol buffers

    • Structured
    • A fast, space-efficient, multi-language serialization format
  • Object files

    • Structured
    • Used to share data between Spark jobs
    • Load
      • pickleFile()
    • Save
      • saveAsPickleFile()
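
  `saveAsPickleFile()` serializes records with Python's `pickle` module (stored inside a SequenceFile), and `pickleFile()` deserializes them on load. The underlying round trip can be sketched locally with a single record:

  ```python
  import pickle

  record = {"name": "Holden", "favoriteAnimal": "panda"}

  # Serialize to bytes, roughly what saveAsPickleFile() does per record,
  # then deserialize, as pickleFile() does on load
  blob = pickle.dumps(record)
  restored = pickle.loads(blob)
  ```

  Because the data is pickled, these files are convenient between Python Spark jobs but are not readable from Scala or Java jobs.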
