MapReduce: Writing Output Files to Multiple Directories

1: Custom OutputFormat class

MapReduce's default OutputFormat writes all result files to a single directory that we specify. If you instead want to route records that satisfy different conditions to different directories, you need to implement a custom OutputFormat class and override its getRecordWriter method so that it returns a custom RecordWriter.

Then, in the driver class, register the custom implementation via job.setOutputFormatClass.

The example below takes a set of shopping-review text data and writes the good reviews and the bad reviews to a good-review directory and a bad-review directory, respectively.
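
For context, each input line is assumed to be tab-separated, with the review status in the tenth field (index 9). The original post does not show the data, so the line below is purely hypothetical; f0 through f8 stand in for whatever the record carries, and only the final field matters to this job:

f0	f1	f2	f3	f4	f5	f6	f7	f8	0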

2: Implementing the custom OutputFormat class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom OutputFormat implementation
 */
public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The task context gives us access to the job Configuration
        Configuration configuration = context.getConfiguration();
        // Get a FileSystem object from it
        FileSystem fileSystem = FileSystem.get(configuration);
        // Output path for the good-review file
        Path goodComment = new Path("file:///F:\\goodComment\\1.txt");

        // Output path for the bad-review file
        Path badComment = new Path("file:///F:\\badComment\\1.txt");

        // Open one output stream per target file
        FSDataOutputStream goodOut = fileSystem.create(goodComment);
        FSDataOutputStream badOut = fileSystem.create(badComment);

        return new MyRecordWriter(goodOut, badOut);
    }
}
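
One caveat with the class above: both paths are hardcoded, so if the job ever runs with more than one reduce task (or with zero reducers and several map tasks), every task tries to create the same two files. A minimal sketch of one way around that, naming each file after its task ID inside getRecordWriter (this variant is an assumption, not part of the original post):

        // Hypothetical variant of the path setup in getRecordWriter(context):
        // each parallel task gets its own pair of files, named after its task ID
        String taskId = context.getTaskAttemptID().getTaskID().toString();
        Path goodComment = new Path("file:///F:\\goodComment\\" + taskId + ".txt");
        Path badComment = new Path("file:///F:\\badComment\\" + taskId + ".txt");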

3: Implementing the custom RecordWriter class


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text,NullWritable> {
    private FSDataOutputStream goodStream;
    private FSDataOutputStream badStream;

    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream){
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    /**
     * Override of write: this is where each record is written out. We inspect
     * the key to decide which directory the record belongs in.
     * goodStream: output stream for the good-review file
     * badStream: output stream for the bad-review file
     * @param key the k3 key (one full input line)
     * @param value always NullWritable here
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] split = key.toString().split("\t");
        // Field 9 holds the review status: 0 = good, 1 = neutral, 2 = bad
        // A status <= 1 (good and neutral) goes to the good-review file
        if(Integer.parseInt(split[9])<=1){
            // good review
            // Text.getBytes() may return a buffer longer than the text, so bound it by getLength()
            goodStream.write(key.getBytes(), 0, key.getLength());
            goodStream.write("\r\n".getBytes());
        }else{
            // bad review
            badStream.write(key.getBytes(), 0, key.getLength());
            badStream.write("\r\n".getBytes());
        }
    }

    /**
     * Release resources
     * @param context the task attempt context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(badStream);
        IOUtils.closeStream(goodStream);
    }
}
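
Note that write assumes every line has at least ten fields and that field 9 parses as an integer; a single malformed line will fail the task with an ArrayIndexOutOfBoundsException or NumberFormatException. A sketch of a guard you could put at the top of write (silently dropping bad lines is an assumed policy, not something the original specifies):

    // Hypothetical guard for the start of write()
    String[] split = key.toString().split("\t");
    if (split.length < 10) {
        return; // drop lines that have no status field
    }
    int status;
    try {
        status = Integer.parseInt(split[9]);
    } catch (NumberFormatException e) {
        return; // drop lines whose status field is not a number
    }
    // ...then branch on status <= 1 as before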

4: The Mapper class


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through unchanged as the key; the routing
        // decision happens later, inside MyRecordWriter.write
        context.write(value,NullWritable.get());
    }
}
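
Because the driver below never calls job.setReducerClass, Hadoop runs the default identity Reducer with a single reduce task, and it is that reduce task which invokes MyOutputFormat.getRecordWriter. If you would rather skip the shuffle and write map output straight through the custom OutputFormat, a hypothetical one-line addition to the driver would be:

        // Hypothetical: with zero reduce tasks, each map task writes directly
        // through MyOutputFormat (making per-task file naming important)
        job.setNumReduceTasks(0);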

5: The driver class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyOutputMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "ownOutputFormat");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));


        job.setMapperClass(MyOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputFormatClass(MyOutputFormat.class);
        //because we overrode FileOutputFormat, the directory specified below will not contain the output data files
        //the actual output files are specified inside MyOutputFormat
        MyOutputFormat.setOutputPath(job ,new Path("file:///F:\\output"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
        System.exit(run);
    }

}
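
One practical note: even though the data lands in F:\goodComment and F:\badComment, the job still runs FileOutputFormat's commit logic against F:\output, so that directory must not already exist when the job starts. A small sketch you could add to run() before waitForCompletion (deleting stale output on rerun is an assumption about the desired behavior, and it needs an extra import for FileSystem):

        // Hypothetical pre-flight cleanup for reruns
        FileSystem fs = FileSystem.get(super.getConf());
        Path outputPath = new Path("file:///F:\\output");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true); // true = delete recursively
        }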

Reposted from blog.csdn.net/qq_15076569/article/details/84206324