MapReduce之多目录输出

多目录输出:

输出两个文件:一个是最大值,一个是最小值。

主要操作reduce阶段

1.在reduce阶段定义多目录输出对象

private MultipleOutputs<Text,DoubleWritable> outputs = null;

2.在setup()创建多目录输出对象 需要context支持

outputs = new MultipleOutputs(context);   

3.在reduce用多目录输出对象生成多目录

outputs.write(new Text(maxkey), new DoubleWritable(maxval), "maxout/max");

outputs.write(new Text(minkey), new DoubleWritable(minval), "minout/min");

代码:

package com.hnxy.mr.Multiple;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hnxy.mr.entity.WordWritable;

public class MaxMinWord extends Configured implements Tool {
	private static class MWMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
		// Reusable output key/value objects (standard Hadoop pattern to avoid
		// per-record allocation).
		private Text outkey = new Text();
		private WordWritable outval = new WordWritable();
		// Running extremes over all records seen by this mapper task.
		private String maxkey = "";
		private Double maxval = 0D;
		private String minkey = "";
		private Double minval = 0D;
		// True once at least one valid record has been processed. Replaces the
		// original "minval <= 0D" sentinel, which mis-tracked the minimum
		// whenever the data contained zero or negative values.
		private boolean seen = false;

		/**
		 * Accumulates the per-mapper max and min of column 7 (index 6), keyed by
		 * column 4 (index 3). Nothing is emitted here; cleanup() emits exactly
		 * two records per mapper.
		 */
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WordWritable>.Context context)
				throws IOException, InterruptedException {
			String[] star = value.toString().split("\t");
			// NOTE: the original condition was "star.length == 7 && null != star";
			// the null check was dead (String.split never returns null) and was
			// evaluated after star.length had already been dereferenced.
			if (star != null && star.length == 7) {
				// Parse once instead of up to three times per record.
				double val = Double.parseDouble(star[6]);
				if (!seen || val > maxval) {
					maxval = val;
					maxkey = star[3];
				}
				if (!seen || val < minval) {
					minval = val;
					minkey = star[3];
				}
				seen = true;
			} else {
				// Count malformed lines instead of failing the task.
				context.getCounter("erro_line", "word_line").increment(1);
			}
		}

		/**
		 * Emits the per-mapper max and min under the single key "max_min" so one
		 * reducer receives all candidates and can compute the global extremes.
		 */
		@Override
		protected void cleanup(Mapper<LongWritable, Text, Text, WordWritable>.Context context)
				throws IOException, InterruptedException {
			outkey.set("max_min");
			// Per-mapper maximum.
			outval.setKeyName(maxkey);
			outval.setKeyval(maxval);
			context.write(outkey, outval);
			// Per-mapper minimum (same key; context.write serializes the value
			// immediately, so reusing outval is safe).
			outval.setKeyName(minkey);
			outval.setKeyval(minval);
			context.write(outkey, outval);
		}
	}

	public static class MWReducer extends Reducer<Text, WordWritable, WordWritable, NullWritable> {
		// Reusable output key (standard Hadoop pattern).
		private WordWritable outkey = new WordWritable();
		// Global running extremes; all mappers emit under the single key
		// "max_min", so this reducer sees every candidate.
		private String maxkey = "";
		private Double maxval = 0D;
		private String minkey = "";
		private Double minval = 0D;
		// True once at least one value has been consumed. Replaces the original
		// "minval <= 0D" sentinel, which broke for zero/negative values.
		private boolean seen = false;
		// Side-channel writer that places max/min into separate sub-directories
		// of the job output directory.
		private MultipleOutputs<Text, DoubleWritable> outputs = null;

		@Override
		@SuppressWarnings({ "rawtypes", "unchecked" })
		protected void setup(Context context)
				throws IOException, InterruptedException {
			// Raw construction is intentional: MultipleOutputs' constructor is
			// typed against the reducer's declared output types
			// (WordWritable/NullWritable), which differ from the
			// Text/DoubleWritable pair we write through it.
			outputs = new MultipleOutputs(context);
		}

		/**
		 * Folds all mapper candidates into the global max and min, then writes
		 * the max to "maxout/max*" and the min to "minout/min*" via
		 * MultipleOutputs, plus both to the default job output.
		 */
		@Override
		protected void reduce(Text key, Iterable<WordWritable> values,
				Reducer<Text, WordWritable, WordWritable, NullWritable>.Context context)
				throws IOException, InterruptedException {
			for (WordWritable w : values) {
				// Copy name/value out of w each time: Hadoop reuses the same
				// WordWritable instance across iterations.
				if (!seen || w.getKeyval() > maxval) {
					maxval = w.getKeyval();
					maxkey = w.getKeyName();
				}
				if (!seen || w.getKeyval() < minval) {
					minval = w.getKeyval();
					minkey = w.getKeyName();
				}
				seen = true;
			}
			// Global maximum: multi-directory output + default output.
			outkey.setKeyName(maxkey);
			outkey.setKeyval(maxval);
			outputs.write(new Text(maxkey), new DoubleWritable(maxval), "maxout/max");
			context.write(outkey, NullWritable.get());
			// Global minimum: multi-directory output + default output.
			outkey.setKeyName(minkey);
			outkey.setKeyval(minval);
			outputs.write(new Text(minkey), new DoubleWritable(minval), "minout/min");
			context.write(outkey, NullWritable.get());
		}

		@Override
		protected void cleanup(Reducer<Text, WordWritable, WordWritable, NullWritable>.Context context)
				throws IOException, InterruptedException {
			// Must be closed, or the maxout/minout files are never flushed.
			outputs.close();
		}
	}

	/**
	 * Configures and runs the max/min job.
	 *
	 * @param args args[0] = input path, args[1] = output path
	 * @return 0 on success, 1 on job failure, 2 on bad usage
	 */
	@Override
	public int run(String[] args) throws Exception {
		// Guard against missing arguments instead of throwing
		// ArrayIndexOutOfBoundsException below.
		if (args == null || args.length < 2) {
			System.err.println("Usage: MaxMinWord <input path> <output path>");
			return 2;
		}
		// Core configuration (populated by ToolRunner with -D options).
		Configuration conf = this.getConf();
		Job job = Job.getInstance(conf, "maxword");
		job.setJarByClass(MaxMinWord.class);
		// Mapper / reducer wiring.
		job.setMapperClass(MWMapper.class);
		job.setReducerClass(MWReducer.class);
		// Map-stage and reduce-stage output types.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WordWritable.class);
		job.setOutputKeyClass(WordWritable.class);
		job.setOutputValueClass(NullWritable.class);
		// Input/output paths.
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		FileSystem fs = FileSystem.get(conf);
		FileInputFormat.addInputPath(job, in);
		FileOutputFormat.setOutputPath(job, out);
		// Remove a stale output directory so the job can run repeatedly.
		if (fs.exists(out)) {
			fs.delete(out, true);
			System.out.println(job.getJobName() + "'s Path Output is deleted");
		}
		// Run the job synchronously, streaming progress to stdout.
		boolean con = job.waitForCompletion(true);
		if (con) {
			System.out.println("成功");
		} else {
			System.out.println("失败");
		}
		// Bug fix: the original always returned 0, so callers (and the shell
		// via System.exit) could never detect a failed job.
		return con ? 0 : 1;
	}

	/**
	 * CLI entry point. Delegates to ToolRunner so generic Hadoop options
	 * (e.g. -D key=value) are parsed before run() executes, then exits with
	 * the job's status code.
	 */
	public static void main(String[] args) throws Exception {
		final int exitCode = ToolRunner.run(new MaxMinWord(), args);
		System.exit(exitCode);
	}

}
扫描二维码关注公众号,回复: 8940358 查看本文章
发布了95 篇原创文章 · 获赞 0 · 访问量 1070

猜你喜欢

转载自blog.csdn.net/weixin_43006131/article/details/103098752