// Map/Reduce unit tests
// word count test
package com.irwin.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.irwin.hadoop.WordCount.IntSumReducer;
import com.irwin.hadoop.WordCount.TokenizerMapper;

/**
 * MRUnit tests for the {@code WordCount} job: the reducer in isolation, the
 * full map/reduce pipeline, and the mapper driven by a real on-disk fixture
 * read through a {@link LineRecordReader}.
 */
public class WordCountTest extends Assert {

    /** Count emitted for every token the mapper produces. */
    private static final IntWritable one = new IntWritable(1);

    private MapDriver<Object, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setUp() {
        Mapper<Object, Text, Text, IntWritable> mapper = new TokenizerMapper();
        Reducer<Text, IntWritable, Text, IntWritable> reducer = new IntSumReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    /** Reducer alone: two 1-values for the same key must sum to 2. */
    @Test
    public void testReducer() throws Exception {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        reduceDriver.withInput(new Text("hello"), values);
        reduceDriver.withOutput(new Text("hello"), new IntWritable(2));
        reduceDriver.runTest();
    }

    /** Full pipeline: tokens are grouped and counted across the input line. */
    @Test
    public void testMapReduce() throws IOException {
        mapReduceDriver.withInput(new LongWritable(1), new Text("cat cat dog"));
        mapReduceDriver.addOutput(new Text("cat"), new IntWritable(2));
        mapReduceDriver.addOutput(new Text("dog"), new IntWritable(1));
        mapReduceDriver.runTest();
    }

    /**
     * Mapper driven by the fixture file: every whitespace-separated token of
     * every line must be emitted with a count of 1.
     */
    @Test
    public void testMapper() throws Exception {
        LineRecordReader reader = getBytesRecordReader();
        int counter = 0;
        while (reader.nextKeyValue()) {
            mapDriver.withInput(reader.getCurrentKey(), reader.getCurrentValue());
            // Queue one expected (token, 1) pair per word on the current line.
            StringTokenizer itr = new StringTokenizer(reader.getCurrentValue().toString());
            while (itr.hasMoreElements()) {
                mapDriver.withOutput(new Text(itr.nextToken()), one);
            }
            counter++;
        }
        // Fixture assumption: hello.text contains exactly 3 lines — keep in
        // sync with src/test/resources/test_data/text/hello.text.
        assertEquals(3, counter);
        // runTest() compares the queued expected outputs against the mapper's
        // actual output.
        mapDriver.runTest();
    }

    /**
     * Builds an initialized {@link LineRecordReader} over the test fixture on
     * the local filesystem.
     *
     * @return a reader positioned at the start of the fixture file
     * @throws IOException if the split cannot be opened or read
     * @throws InterruptedException if reader initialization is interrupted
     */
    private static LineRecordReader getBytesRecordReader() throws IOException, InterruptedException {
        Configuration conf = new Configuration(false);
        // "fs.default.name" is deprecated since Hadoop 2.x; the replacement
        // key is fs.defaultFS (the old key is still mapped for compatibility).
        conf.set("fs.defaultFS", "file:///");
        // Forward slashes work on every platform; the original used
        // Windows-only "\\" separators, which broke the test on Linux/macOS.
        File testFile = new File("src/test/resources/test_data/text/hello.text");
        Path path = new Path(testFile.getAbsoluteFile().toURI());
        FileSplit split = new FileSplit(path, 0, testFile.length(), null);
        TextInputFormat inputFormat = ReflectionUtils.newInstance(TextInputFormat.class, conf);
        TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
        LineRecordReader reader = (LineRecordReader) inputFormat.createRecordReader(split, context);
        reader.initialize(split, context);
        return reader;
    }
}