Input/Output Control in MR

Input Control in MR (InputFormat and MultipleInputs)

InputFormat

  1. InputFormat (input formatter): at the start of a MapReduce job, the InputFormat produces the InputSplits and, via a RecordReader, breaks each split into records that become the Mapper's input.
  2. InputFormats built into MR:
    1)TextInputFormat: the default file input format, used to read plain text files. The file is broken into lines ending with LF or CR; the key is the byte offset of each line (LongWritable) and the value is the content of the line (Text).
    2)KeyValueTextInputFormat: also reads text files. If a line is split by a separator (tab by default) into two parts, the first part is the key and the remainder is the value; if there is no separator, the whole line is the key and the value is empty.
    3)SequenceFileInputFormat: reads sequence files, Hadoop's custom binary format for storing data. It has two subclasses: SequenceFileAsBinaryInputFormat, which reads keys and values as BytesWritable, and SequenceFileAsTextInputFormat, which reads keys and values as Text.
    4)SequenceFileInputFilter: reads only the records of a sequence file that satisfy a filter, specified via setFilterClass. Three filters are built in: RegexFilter keeps the records whose key matches a given regular expression; PercentFilter takes a parameter f and keeps the records whose record number modulo f is 0; MD5Filter takes a parameter f and keeps the records where MD5(key) modulo f is 0.
    5)NLineInputFormat: added in 0.18.x; splits the file line by line, for example so that each line of the file goes to its own mapper. The key is the byte offset of the line (LongWritable) and the value is the line content (Text).
    6)CompositeInputFormat: used to join multiple data sources.
  3. Setting a specific InputFormat for an MR job: job.setInputFormatClass(xxxInputFormat.class); (see the driver-side sketch after this list)
  4. Custom InputFormats
    • The built-in input formatters cover most needs, but when they fall short we have to define our own.
    • Every InputFormat must directly or indirectly extend the InputFormat abstract class. (Note: there is also an InputFormat interface in the old API whose getSplits method has a different return type.)
    • The InputFormat abstract class defines two main methods:
	/**
	This method returns a RecordReader object.
	A RecordReader describes how to carve the key-value pairs fed to the Mapper out of an InputSplit.
	*/
	@Override
	public RecordReader<K, V> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
			throws IOException, InterruptedException {
		return null;
	}
	/**
	Method that produces the collection of InputSplits.
	It receives a JobContext carrying the environment information, finds the files to process, splits them logically, and returns the resulting list of InputSplits.
	*/
	@Override
	public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
		return null;
	}
  • More often we do not extend InputFormat directly but instead extend one of its subclasses,
  • for example FileInputFormat, the base class of every file-based InputFormat; the default TextInputFormat extends it.
  • FileInputFormat extends the InputFormat abstract class: 1) it implements getSplits, splitting the files logically according to the configuration and returning the list of InputSplits; 2) it provides isSplitable(), whose boolean return value lets a subclass say whether the file should be split logically at all (if it returns false, the whole file is returned as a single logical block no matter how many Blocks it spans); 3) it leaves createRecordReader abstract, so subclasses must implement it.
  • If you need finer control over how files are split you can override getSplits and write your own logic; most of the time, though, you keep the parent implementation and focus on how each InputSplit is turned into individual records.
  • Note the package FileInputFormat comes from: import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
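
A hedged driver-side sketch of choosing and configuring a built-in InputFormat (not part of the case below): the class name is made up for illustration, and the separator property name matches Hadoop 2.x, so check it against your Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KvInputSketch {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Split each line on "," instead of the default tab: the part before the
		// separator becomes the key (Text), the rest becomes the value (Text)
		conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
		Job job = Job.getInstance(conf, "KV_Input_Sketch");
		// Tell the job which InputFormat to use; the Mapper must then declare <Text, Text> input types
		job.setInputFormatClass(KeyValueTextInputFormat.class);
		// ... set the Mapper/Reducer classes and the input/output paths as in the drivers below ...
	}

}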

Example

Read the score1.txt file, taking every four lines as one student's record: the first line is the name and the following three lines are individual subject scores. Compute each student's total score and output a file in the format name: total.

张三 
语文 97
数学 77
英语 69
李四 
语文 87
数学 57
英语 63
王五 
语文 47
数学 54
英语 39
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ScoreInputFormat  extends FileInputFormat<Text, Text>{
	
	//Do not split the file logically; each file becomes a single split
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new ScoreRecordReader();
	}
}


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ScoreRecordReader extends RecordReader<Text, Text>{
	
	private BufferedReader bfreader = null;
	private Text key=null;
	private Text value=null;
	private float progress=0f;
	
	/**
	 * Called before the RecordReader is closed; normally used to release resources.
	 */
	@Override
	public void close() throws IOException {
		bfreader.close();	
	}

	/**
	 * If nextKeyValue() returned true, this method is called to fetch the current key.
	 */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}
	/**
	 * If nextKeyValue() returned true, this method is called to fetch the current value.
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * Returns the current progress.
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return progress;
	}

	/**
	 * Initialization method.
	 * split: the current input split
	 * context: the current task context
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		FileSplit fis = (FileSplit) split;
		Path path = fis.getPath();
		FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
		FSDataInputStream inputStream = fileSystem.open(path);
		bfreader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
		
	}

	/**
	 * Reads the next key-value pair; returns true if a pair was read, false otherwise.
	 * One record spans four lines: the student's name followed by three subject-score lines.
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		String line1 = bfreader.readLine();
		if(line1!=null){
			key = new Text(line1);
			String line2 = bfreader.readLine();
			String line3 = bfreader.readLine();
			String line4 = bfreader.readLine();
			value = new Text(line2+"\r\n"+line3+"\r\n"+line4);
			return true;
		}
		progress=1.0f;
		return false;
	}
	
}


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ScoreMapper extends Mapper<Text, Text, Text, LongWritable> {

	@Override
	protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		String name = key.toString();
		long score = 0;
		// value holds three lines like "语文 97"; emit one (name, score) pair per subject
		String[] str = value.toString().split("\r\n");
		for(String s:str){
			score = Long.parseLong(s.split(" ")[1]);
			context.write(new Text(name), new LongWritable(score));
		}
	}
}


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ScoreReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
		long score = 0;
		for(LongWritable value : values){
			score += value.get();
		}
		context.write(key, new LongWritable(score));
	}

}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Score_Job");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);

		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		job.setInputFormatClass(ScoreInputFormat.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/scoreResult"));

		if (!job.waitForCompletion(true))
			return;
	}

}

MultipleInputs

  1. MultipleInputs lets you assemble several inputs and feed them to the Mapper(s) at the same time; use it when data is read from more than one source.
  2. When declaring a source you can even assign it its own InputFormat and Mapper, so inputs with different formats can be handled side by side.
  3. Methods on the MultipleInputs class:
/**
Register a source path and its InputFormat
*/
MultipleInputs.addInputPath(job, path, inputFormatClass);
/**
Register a source path together with its InputFormat and Mapper
*/
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"), 
		ScoreInputFormat.class, ScoreMapper.class);

Example

Building on the example above, also read data from a second file, score2.txt, and include it in the totals. In score2.txt each line holds one student's scores.

赵六 56 47 69
陈七 73 84 91
刘八 45 56 66
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ScoreInputFormat02 extends FileInputFormat<Text, Text>{

	//Do not split the file logically; each file becomes a single split
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
	
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
			throws IOException, InterruptedException {
		return new ScoreRecordReader02();
	}

}


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ScoreRecordReader02 extends RecordReader<Text, Text>{

	private BufferedReader bfreader = null;
	private Text key=null;
	private Text value=null;
	private float progress=0f;
	
	/**
	 * Called before the RecordReader is closed; normally used to release resources.
	 */
	@Override
	public void close() throws IOException {
		bfreader.close();	
	}

	/**
	 * If nextKeyValue() returned true, this method is called to fetch the current key.
	 */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}
	/**
	 * If nextKeyValue() returned true, this method is called to fetch the current value.
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * Returns the current progress.
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return progress;
	}

	/**
	 * Initialization method.
	 * split: the current input split
	 * context: the current task context
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		FileSplit fis = (FileSplit) split;
		Path path = fis.getPath();
		FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
		FSDataInputStream inputStream = fileSystem.open(path);
		bfreader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
		
	}

	/**
	 * Reads the next key-value pair; returns true if a pair was read, false otherwise.
	 * One record is a single line of the form "name score score score".
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		String line = bfreader.readLine();
		if(line!=null){
			String[] str = line.split(" ");
			key = new Text(str[0]);
			value = new Text(str[1]+" "+str[2]+" "+str[3]);
			return true;
		}
		progress=1.0f;
		return false;
	}

}


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ScoreMapper02 extends Mapper<Text, Text, Text, LongWritable> {

	@Override
	protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		// value is "score score score"; emit one (name, score) pair per subject
		String[] str = value.toString().split(" ");
		for(String s:str){
			context.write(key, new LongWritable(Long.parseLong(s)));
		}
	}

}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Score_Job");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);

		job.setReducerClass(ScoreReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);


		MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"), 
				ScoreInputFormat.class, ScoreMapper.class);
		MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score2.txt"), 
				ScoreInputFormat02.class, ScoreMapper02.class);
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/score02Result"));

		if (!job.waitForCompletion(true))
			return;
	}

}

Output Control in MR (OutputFormat and MultipleOutputs)

OutputFormat

  1. OutputFormat (output formatter): at the end of a MapReduce job, the OutputFormat class decides how the Reducer's output is written.
  2. OutputFormats built into MR:
    1)TextOutputFormat
    2)SequenceFileOutputFormat
    3)SequenceFileAsBinaryOutputFormat
    4)MapFileOutputFormat
  3. Setting a specific OutputFormat for an MR job: job.setOutputFormatClass(xxxOutputFormat.class); (a driver-side sketch follows after this list)
  4. Custom OutputFormats
    • The built-in output formatters cover most needs, but when they fall short we have to define our own.
    • Every OutputFormat must directly or indirectly extend the OutputFormat abstract class.
    • The OutputFormat abstract class defines the following three methods:
	@Override
	public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
	}

	@Override
	public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
		return null;
	}

	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext conetext) throws IOException, InterruptedException {
		return null;
	}
  • More often we do not extend OutputFormat directly but instead extend one of its subclasses,
  • for example FileOutputFormat, the base class of every OutputFormat whose destination is a file; the default TextOutputFormat extends it.
  • FileOutputFormat extends the OutputFormat abstract class, implements checkOutputSpecs and getOutputCommitter, and leaves getRecordWriter abstract so that subclasses must implement it.
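
A hedged driver-side sketch of selecting and tuning a built-in OutputFormat (an assumption for illustration, not part of the case below), here writing block-compressed SequenceFile output:

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class OutputFormatConfigSketch {

	public static void configure(Job job) {
		// Write the reducer output as a SequenceFile instead of plain text
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		// Compress the output files with gzip
		FileOutputFormat.setCompressOutput(job, true);
		FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
		// Compress whole blocks of records rather than each value individually
		SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
	}

}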

Example

Write the output with "#" as the record separator, so everything ends up on a single line.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScoreOutputFormat extends FileOutputFormat<Text, LongWritable>{

	@Override
	public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		// Ask FileOutputFormat for this task's default work file (e.g. part-r-00000) and wrap it in our writer
		Path path = getDefaultWorkFile(context, "");
		FileSystem fs = path.getFileSystem(context.getConfiguration());
		FSDataOutputStream out = fs.create(path, false);
		return new ScoreRecordWriter(out);
	}

}


import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ScoreRecordWriter extends RecordWriter<Text, LongWritable>{

	private DataOutputStream out = null;
	
	public ScoreRecordWriter(DataOutputStream out){
		this.out=out;
	}
	
	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		out.close();
	}

	@Override
	public void write(Text key, LongWritable value) throws IOException, InterruptedException {
		// Each record is written as key~value# with no newline, so all records end up on a single line
		out.write((key.toString()+"~"+value.get()+"#").getBytes());
	}

}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Score_Job");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);

		job.setReducerClass(ScoreReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		job.setOutputFormatClass(ScoreOutputFormat.class);

		MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"), 
				ScoreInputFormat.class, ScoreMapper.class);
		MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score2.txt"), 
				ScoreInputFormat02.class, ScoreMapper02.class);
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/score03Result"));

		if (!job.waitForCompletion(true))
			return;
	}

}

MultipleOutputs

  1. MultipleOutputs lets a single Reducer produce several output files.
  2. Main methods (a minimal reducer skeleton showing the full lifecycle follows this snippet):
/**
Write a key-value pair to the output registered under the given name
*/
MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context);
mos.write("flag", key, value);
/**
Register a named output and the way it is written;
several named outputs can be registered
*/
MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);
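
A minimal reducer skeleton showing the full MultipleOutputs lifecycle (the class and output names here are assumptions for illustration, not taken from the case below): create it in setup(), write through a registered name in reduce(), and close it in cleanup() so the underlying RecordWriters are flushed.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class TaggedSumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	private MultipleOutputs<Text, LongWritable> mos = null;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		mos = new MultipleOutputs<>(context);
	}

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
		long sum = 0;
		for (LongWritable v : values) {
			sum += v.get();
		}
		// "flag" must match a name registered in the driver via MultipleOutputs.addNamedOutput
		mos.write("flag", key, new LongWritable(sum));
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// Closing MultipleOutputs flushes and closes its RecordWriters
		mos.close();
	}

}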

Example

Modify the word-count example so that words whose first letter is a-j go to the "small" output and all other words go to the "big" output.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class Wc2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos = null;
        @Override
        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                        throws IOException, InterruptedException {
                 mos = new MultipleOutputs<>(context);
        }
        
        @Override
        protected void reduce(Text k3, Iterable<IntWritable> v3s,
                        Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                int count = 0;
                for(IntWritable v3 : v3s){
                        count += v3.get();
                }
                        
                String word = k3.toString();
                if(word.matches("^[a-j].*$")){
                        mos.write("small", new Text(word), new IntWritable(count));
                }else{
                        mos.write("big", new Text(word), new IntWritable(count));
                }
        
        }

        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                        throws IOException, InterruptedException {
                // Close MultipleOutputs so its RecordWriters are flushed; without this the named outputs may be incomplete
                mos.close();
        }

}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Wc2Driver {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "Wc2_Job");
                job.setJarByClass(cn.tedu.mr.outputformat.wc2.Wc2Driver.class);
                job.setMapperClass(cn.tedu.mr.outputformat.wc2.Wc2Mapper.class);
                job.setReducerClass(cn.tedu.mr.outputformat.wc2.Wc2Reducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                
                FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/wcdata"));
                
                // "small" is written with a custom Wc2OutputFormat (defined elsewhere); "big" uses the built-in TextOutputFormat
                MultipleOutputs.addNamedOutput(job, "small", Wc2OutputFormat.class, Text.class, IntWritable.class);
                MultipleOutputs.addNamedOutput(job, "big", TextOutputFormat.class, Text.class, IntWritable.class);
                FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/wcresult"));

                if (!job.waitForCompletion(true))
                        return;
        }

}

GroupingComparator

  • In the MR shuffle, the sort and group steps compare keys with the key's compareTo method by default; grouping can also be overridden by configuring job.setGroupingComparatorClass (a minimal object-level sketch follows this bullet).
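
As a minimal object-level sketch (the class name is an assumption, not part of the case below), a grouping comparator can also be written by extending WritableComparator: keys that should reach the same reduce() call must compare as equal.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FirstLetterGroupingComparator extends WritableComparator {

	public FirstLetterGroupingComparator() {
		// true asks WritableComparator to create key instances so the object-level compare below is used
		super(Text.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// Keys in the same bucket (a-h vs. i-z) compare as equal, so one reduce() call sees the whole bucket
		return Integer.compare(bucket((Text) a), bucket((Text) b));
	}

	private int bucket(Text t) {
		String s = t.toString();
		return (!s.isEmpty() && s.charAt(0) >= 'a' && s.charAt(0) <= 'h') ? 0 : 1;
	}

}

It would be registered in the driver with job.setGroupingComparatorClass(FirstLetterGroupingComparator.class); the case below achieves the same grouping by overriding the raw byte-level compare of Text.Comparator instead.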

Example

Modify the word-count example to count how many words start with a-h and how many start with i-z.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Wc3Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        @Override
        protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                        throws IOException, InterruptedException {
                String [] words = v1.toString().split(" ");
                for(String word : words){
                        context.write(new Text(word), new IntWritable(1));
                }
        }
        
}


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Wc3Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text k3, Iterable<IntWritable> v3s,
                        Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                int count = 0;
                for(IntWritable i : v3s){
                        count += i.get();
                }
                if(k3.toString().matches("^[a-h].*$")){
                        context.write(new Text("a-h"), new IntWritable(count));
                }else{
                        context.write(new Text("i-z"), new IntWritable(count));
                }
        }

}


import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text.Comparator;

//Grouping comparator: keys that fall into the same bucket (a-h or i-z) compare as equal (return 0),
//so each bucket arrives at reduce() as a single group
public class Wc3Comparator extends Comparator {
        
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                try {
                        DataInput in = new DataInputStream(new ByteArrayInputStream(b1,s1,l1));
                        Text ta = new Text();
                        ta.readFields(in);
                        
                        DataInput in2 = new DataInputStream(new ByteArrayInputStream(b2,s2,l2));
                        Text tb = new Text();
                        tb.readFields(in2);
                        
                        if(ta.toString().matches("^[a-h].*$") && tb.toString().matches("^[a-h].*$")){
                                return 0;
                        }else if(ta.toString().matches("^[i-z].*$") && tb.toString().matches("^[i-z].*$")){
                                return 0;
                        }else{
                                return ta.compareTo(tb);
                        }
                } catch (IOException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                }
        }
}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Wc3Driver {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "wc3_job");
                job.setJarByClass(cn.tedu.mr.gc.wc3.Wc3Driver.class);
                job.setMapperClass(cn.tedu.mr.gc.wc3.Wc3Mapper.class);
                job.setReducerClass(cn.tedu.mr.gc.wc3.Wc3Reducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                //--Set the grouping comparator
                job.setGroupingComparatorClass(Wc3Comparator.class);
                
                FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/wcdata"));
                FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/wcresult"));

                if (!job.waitForCompletion(true))
                        return;
        }

}


Reprinted from blog.csdn.net/weixin_43652369/article/details/84109284