Spark把RDD数据保存到一个单个文件中

Spark是当前最流行的分布式数据处理框架之一,相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录),Spark就无能为力了。

        有网友给出建议,用

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")
  • 1
  • 2

把数据合并到一个分区中,然而得到的结果是这样的:

$ ./bin/hadoop fs -du -h test/test.txt
0        test/test.txt/_SUCCESS
499.9 M  test/test.txt/part-00000
  • 1
  • 2
  • 3
  • 4

Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中。

       Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于Hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.URI;
/**
 * 使用Hadoop的FileSystem把数据写入到HDFS
 */
public class HdfsOperate implements Serializable{

    private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    //在hdfs的目标位置新建一个文件,得到一个输出流
    public static void openHdfsFile(String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(path),conf);
        writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));
        if(null!=writer){
            logger.info("[HdfsOperate]>> initialize writer succeed!");
        }
    }

    //往hdfs文件中写入数据
    public static void writeString(String line) {
        try {
            writer.write(line + "\n");
        }catch(Exception e){
            logger.error("[HdfsOperate]>> writer a line error:"  ,  e);
        }
    }

    //关闭hdfs输出流
    public static void closeHdfsFile() {
        try {
            if (null != writer) {
                writer.close();
                logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
            }
            else{
                logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            }
        }catch(Exception e){
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

在Spark中处理并保存数据:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import hdfsoperate.HdfsOperate;
import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.NlpModuleWrapper;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * 调用HdfsOperate类的方法把RDD数据保存到Hdfs上
 */
public class FeatureExtractor implements Serializable {
    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(JavaSparkContext sc, int repartitionNum) throws Exception {
        String hdfsPath = "test/corpus/2016-09-02"; //存放原始数据的文件
        //Spark可以读取单独的一个文件或整个目录
        JavaRDD<String> rddx = sc.textFile(hdfsPath).repartition(repartitionNum); 
        rddx = rddx.map(new ExtractFeatureMap());

        //写入hdfs文件位置
        String destinationPath = "test/result/2016-09-02" ;
        //创建Hdfs文件,打开Hdfs输出流
        HdfsOperate.openHdfsFile(destinationPath);

        //分块读取RDD数据并保存到hdfs
        //如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                //写一行到Hdfs文件
                HdfsOperate.writeString(str);
            }
        }
        //关闭Hdfs输出流
        HdfsOperate.closeHdfsFile();

    }



    class ExtractFeatureMap implements Function<String, String> {
        @Override
        public String call(String line) throws Exception {
            try {
                //TODO:你自己的操作,返回String类型
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);
            }
            return null;
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

(PS:目前还没有看到过单用Spark接口能实现该功能,有知道的大神欢迎指点)

猜你喜欢

转载自blog.csdn.net/kwame211/article/details/80585910