多目录输出:
输出两个文件一个是最大值,一个最小值。
主要操作reduce阶段
1.在reduce阶段定义多目录输出对象
private MultipleOutputs<Text,DoubleWritable> outputs =null;
2.在setup()创建多目录输出对象 需要context支持
outputs = new MultipleOutputs(context);
3.在reduce用多目录输出对象生成多目录
outputs.write(new Text(maxkey), new DoubleWritable(maxval), "maxout/max");
outputs.write(new Text(minkey), new DoubleWritable(minval), "minout/min");
4.在cleanup()关闭多目录输出对象,否则多目录输出的数据可能不会被完整写出
outputs.close();
代码:
package com.hnxy.mr.Multiple;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hnxy.mr.entity.WordWritable;
/**
 * MapReduce job that finds the entry with the largest value and the entry with
 * the smallest value in a tab-separated input, and writes them both to the
 * regular job output and to two extra base output paths ({@code maxout/max} and
 * {@code minout/min}) through {@link MultipleOutputs}.
 *
 * <p>Each input line is expected to have exactly 7 tab-separated fields; field 3
 * is used as the entry name and field 6 as its numeric value. Lines that do not
 * match are counted in the {@code erro_line / word_line} counter and skipped.
 */
public class MaxMinWord extends Configured implements Tool {

    /**
     * Tracks the local maximum and minimum over all lines seen by one map task
     * and emits them (max first, then min) from {@link #cleanup}, both under the
     * single key {@code "max_min"} so they meet in the same reduce call.
     */
    private static class MWMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
        // Layout of a valid input line.
        private static final int EXPECTED_FIELDS = 7;
        private static final int NAME_FIELD = 3;
        private static final int VALUE_FIELD = 6;

        // Reused output objects.
        private final Text outkey = new Text();
        private final WordWritable outval = new WordWritable();

        // Running extremes for this map task. "seen" replaces the former
        // "minval <= 0" sentinel, which broke as soon as a real value was <= 0.
        private boolean seen = false;
        private String maxkey = "";
        private double maxval = 0D;
        private String minkey = "";
        private double minval = 0D;

        /**
         * Parses one input line and folds its value into the running max/min.
         *
         * @param key     offset of the line (unused)
         * @param value   the raw tab-separated line
         * @param context task context, used only for the malformed-line counter
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WordWritable>.Context context)
                throws IOException, InterruptedException {
            String[] star = value.toString().split("\t");
            if (star.length != EXPECTED_FIELDS) {
                context.getCounter("erro_line", "word_line").increment(1);
                return;
            }

            // Parse once; a non-numeric (or NaN) value is a malformed line, not
            // a reason to fail the whole task.
            final double current;
            try {
                current = Double.parseDouble(star[VALUE_FIELD]);
            } catch (NumberFormatException e) {
                context.getCounter("erro_line", "word_line").increment(1);
                return;
            }
            if (Double.isNaN(current)) {
                context.getCounter("erro_line", "word_line").increment(1);
                return;
            }

            if (!seen) {
                // The first valid record seeds both extremes.
                seen = true;
                maxval = current;
                maxkey = star[NAME_FIELD];
                minval = current;
                minkey = star[NAME_FIELD];
                return;
            }
            // Strict comparisons: on ties the earliest record is kept.
            if (current > maxval) {
                maxval = current;
                maxkey = star[NAME_FIELD];
            }
            if (current < minval) {
                minval = current;
                minkey = star[NAME_FIELD];
            }
        }

        /**
         * Emits this task's maximum and minimum. Nothing is emitted when the task
         * saw no valid line, so an empty split cannot inject a bogus
         * {@code ("", 0.0)} candidate into the reducer.
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, WordWritable>.Context context)
                throws IOException, InterruptedException {
            if (!seen) {
                return;
            }
            // Emit the maximum.
            outkey.set("max_min");
            outval.setKeyName(maxkey);
            outval.setKeyval(maxval);
            context.write(outkey, outval);
            // Emit the minimum.
            outkey.set("max_min");
            outval.setKeyName(minkey);
            outval.setKeyval(minval);
            context.write(outkey, outval);
        }
    }

    /**
     * Combines the per-mapper candidates into the overall maximum and minimum and
     * writes each of them twice: once to the regular job output and once to its
     * own base output path via {@link MultipleOutputs}.
     */
    public static class MWReducer extends Reducer<Text, WordWritable, WordWritable, NullWritable> {
        // Reused key for the regular job output.
        private final WordWritable outkey = new WordWritable();
        // Writer for the extra "maxout/max" and "minout/min" outputs.
        private MultipleOutputs<Text, DoubleWritable> outputs = null;

        /**
         * Creates the {@link MultipleOutputs} writer bound to this task's context.
         *
         * <p>The raw constructor call is deliberate: the side outputs are written
         * as (Text, DoubleWritable) while the job's declared output types are
         * (WordWritable, NullWritable), so a parameterized constructor call would
         * not type-check against this context.
         * NOTE(review): this relies on the configured output format accepting
         * arbitrary key/value objects — confirm if the output format is changed.
         */
        @Override
        @SuppressWarnings({ "rawtypes", "unchecked" })
        protected void setup(Context context)
                throws IOException, InterruptedException {
            outputs = new MultipleOutputs(context);
        }

        /**
         * Scans all candidates of one key and writes the overall max and min.
         *
         * @param key     grouping key emitted by the mapper
         * @param values  per-mapper max/min candidates
         * @param context task context for the regular job output
         */
        @Override
        protected void reduce(Text key, Iterable<WordWritable> values,
                Reducer<Text, WordWritable, WordWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Kept local so state never leaks from one key to the next.
            boolean seen = false;
            String maxkey = "";
            double maxval = 0D;
            String minkey = "";
            double minval = 0D;

            for (WordWritable w : values) {
                // Copy the fields out immediately instead of keeping a reference to w.
                final double current = w.getKeyval();
                final String name = w.getKeyName();
                if (!seen) {
                    // The first candidate seeds both extremes.
                    seen = true;
                    maxval = current;
                    maxkey = name;
                    minval = current;
                    minkey = name;
                    continue;
                }
                // Track the maximum.
                if (current > maxval) {
                    maxval = current;
                    maxkey = name;
                }
                // Track the minimum.
                if (current < minval) {
                    minval = current;
                    minkey = name;
                }
            }
            if (!seen) {
                return;
            }

            // Write the maximum to its own output and to the regular job output.
            outkey.setKeyName(maxkey);
            outkey.setKeyval(maxval);
            outputs.write(new Text(maxkey), new DoubleWritable(maxval), "maxout/max");
            context.write(outkey, NullWritable.get());
            // Write the minimum to its own output and to the regular job output.
            outkey.setKeyName(minkey);
            outkey.setKeyval(minval);
            outputs.write(new Text(minkey), new DoubleWritable(minval), "minout/min");
            context.write(outkey, NullWritable.get());
            System.out.println(maxkey+":"+minkey);
        }

        /** Closes the {@link MultipleOutputs} writer opened in {@link #setup}. */
        @Override
        protected void cleanup(Reducer<Text, WordWritable, WordWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            if (outputs != null) {
                outputs.close();
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output
     *             path; an existing output path is deleted recursively first
     * @return 0 when the job succeeded, 1 when it failed, 2 on bad arguments
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: MaxMinWord <input path> <output path>");
            return 2;
        }

        // Core configuration and job.
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "maxword");
        job.setJarByClass(MaxMinWord.class);

        // Mapper / reducer classes.
        job.setMapperClass(MWMapper.class);
        job.setReducerClass(MWReducer.class);

        // Output types of the map stage and of the job.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WordWritable.class);
        job.setOutputKeyClass(WordWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths.
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        // Resolve the file system from the output path itself so a path on a
        // non-default file system is handled, then remove any stale output.
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s Path Output is deleted");
        }

        // Run and report; the exit code now reflects the real outcome.
        boolean con = job.waitForCompletion(true);
        if (con) {
            System.out.println("成功");
        } else {
            System.out.println("失败");
        }
        return con ? 0 : 1;
    }

    /**
     * Command-line entry point; exits with the status returned by {@link #run}.
     *
     * @param args input path followed by output path
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxMinWord(), args));
    }
}
扫描二维码关注公众号,回复:
8940358 查看本文章