1 需求
现有一批订单的评论数据。需求:将订单的好评与差评区分开,并把最终结果分别输出到不同的文件夹下。数据内容参见资料文件夹。每行数据以制表符(\t)分隔,其中下标为 9 的字段(从 0 开始计数)表示评价类型:0 表示好评,1 表示中评,2 表示差评。下文的实现中,凡不是好评(0)的记录(即中评和差评)都会写入差评文件。
2 分析
程序的关键点在于:在同一个 MapReduce 程序中,根据数据内容的不同,把两类结果输出到不同的目录。这类灵活的输出需求可以通过自定义 OutputFormat 来实现。
3 实现
实现要点:
- 在 MapReduce 中访问外部资源(本例中由 RecordWriter 通过 FileSystem 直接创建输出文件)
- 自定义 OutputFormat,重写其中的 RecordWriter,尤其是负责具体输出数据的 write() 方法
第一步:自定义一个 OutputFormat
public class MyOutPutFormat extends FileOutputFormat<Text,NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(context.getConfiguration()); Path goodComment = new Path("file:///E:\\MapReduce\\自定义outputFormat\\goodComment"); Path badComment = new Path("file:///E:\\MapReduce\\自定义outputFormat\\badCOmment"); FSDataOutputStream goodOutputStream = fs.create(goodComment); FSDataOutputStream badOutputStream = fs.create(badComment); return new MyRecordWriter(goodOutputStream,badOutputStream); } static class MyRecordWriter extends RecordWriter<Text, NullWritable>{ FSDataOutputStream goodStream = null; FSDataOutputStream badStream = null; public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) { this.goodStream = goodStream; this.badStream = badStream; } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { if (key.toString().split("\t")[9].equals("0")){ goodStream.write(key.toString().getBytes()); goodStream.write("\r\n".getBytes()); }else{ badStream.write(key.toString().getBytes()); badStream.write("\r\n".getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(badStream !=null){ badStream.close(); } if(goodStream !=null){ goodStream.close(); } } } }
第二步:编写程序入口类(包含 main 方法)
public class MyOwnOutputFormatMain extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = super.getConf(); Job job = Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName()); job.setJarByClass(MyOwnOutputFormatMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///E:\\MapReduce\\自定义outputFormat\\input")); job.setMapperClass(MyOwnMapper.class); job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setOutputFormatClass(MyOutPutFormat.class); //设置一个输出目录,这个目录会输出一个success的成功标志的文件 MyOutPutFormat.setOutputPath(job,new Path("file:///E:\\MapReduce\\自定义outputFormat\\output")); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); boolean b = job.waitForCompletion(true); return b?0:1; } public static class MyOwnMapper extends Mapper<LongWritable, Text,Text,NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); String commentStatus = split[9]; context.write(value,NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); ToolRunner.run(configuration,new MyOwnOutputFormatMain(),args); } }