MapReduce《案例之倒排索引》
源数据:
1)file1:
MapReduce is simple
2)file2:
MapReduce is powerful is simple
3)file3:
Hello MapReduce bye MapReduce
要实现的结果:
样例输出如下所示。
MapReduce file1.txt:1;file2.txt:1;file3.txt:2;
is file1.txt:1;file2.txt:2;
simple file1.txt:1;file2.txt:1;
powerful file2.txt:1;
Hello file3.txt:1;
bye file3.txt:1;
=========================JAVA CODE======================
package gq;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
* Class Description:分词小demo测试类
*
* Author:gaoqi
*
* Date:2015年6月5日 下午2:03:08
*
*/
public class FenCi {
public static class Map extends Mapper<LongWritable, Text, Text, Text>{
private FileSplit fileSplit;
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
fileSplit = (FileSplit) context.getInputSplit();
StringTokenizer stk = new StringTokenizer(value.toString().trim());
while(stk.hasMoreElements()){
String v = stk.nextToken();
String path = fileSplit.getPath().toString();
String filename = path.substring(path.indexOf("file"));
context.write(new Text(v+":"+filename), new Text("1"));
}
}
}
public static class Combiner extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int sum = 0;
for(Text t :values){
sum += Integer.parseInt(t.toString());
}
String v = key.toString();
context.write(new Text(v.substring(0,v.indexOf(":"))), new Text(v.substring(v.indexOf(":")+1)+":"+sum));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
Iterator<Text> its = values.iterator();
String value = "";
while(its.hasNext()){
value+=its.next()+";";
}
context.write(key, new Text(value));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = new Job(conf,"FenCi");
job.setJarByClass(FenCi.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/inputInvertedIndex"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/outputFenci"));
System.exit(job.waitForCompletion(true)?0:1);
}
}