Requirements: use MapReduce to build an inverted index over the input files. For every word, output the list of files it appears in, together with the number of times it occurs in each file.
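A hypothetical example of the expected input and output (the file names and contents below are illustrative only, and the order of entries within a posting list is not guaranteed):

    Input:
        a.txt:  hello world
        b.txt:  hello
    Output:
        hello   a.txt->1,b.txt->1
        world   a.txt->1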
package com.wqs.invertedIndex;

import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Map extends Mapper<LongWritable, Text, Text, Text> {
    private static Text word = new Text();
    private static Text one = new Text("1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // The name of the file this split belongs to becomes part of the key.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        StringTokenizer st = new StringTokenizer(value.toString());
        while (st.hasMoreTokens()) {
            // Emit "word<TAB>fileName" -> "1"; the combiner counts these per file.
            word.set(st.nextToken() + "\t" + fileName);
            context.write(word, one);
        }
    }
}
package com.wqs.invertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Text, Text, Text> {
    private static StringBuilder sub = new StringBuilder(256);
    private static Text index = new Text();

    @Override
    protected void reduce(Text word, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Concatenate all "fileName->count" entries for this word.
        for (Text v : values) {
            sub.append(v.toString()).append(",");
        }
        // Drop the trailing comma before emitting the posting list.
        if (sub.length() > 0) {
            sub.setLength(sub.length() - 1);
        }
        index.set(sub.toString());
        context.write(word, index);
        sub.setLength(0);
    }
}
package com.wqs.invertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Combine extends Reducer<Text, Text, Text, Text> {
    private static Text word = new Text();
    private static Text index = new Text();

    @SuppressWarnings("unused")
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        // The map output key is "word<TAB>fileName"; split it back apart.
        String[] splits = key.toString().split("\t");
        if (splits.length != 2) {
            return;
        }
        // Count how many times the word occurred in this file.
        long count = 0;
        for (Text v : values) {
            count++;
        }
        // Re-key on the word alone; the value becomes "fileName->count".
        word.set(splits[0]);
        index.set(splits[1] + "->" + count);
        context.write(word, index);
    }
}
package com.wqs.invertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WC45Partitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // Route by the first letter of the word; assumes the job runs with
        // three reduce tasks (see job.setNumReduceTasks(3) in Main).
        String word = key.toString().trim();
        if (word.length() == 0) {
            return 0;
        }
        char firstChar = Character.toUpperCase(word.charAt(0));
        if (firstChar >= 'A' && firstChar <= 'M') {
            return 0;   // A-M
        } else if (firstChar >= 'N' && firstChar <= 'Z') {
            return 1;   // N-Z
        } else {
            return 2;   // digits, punctuation, and other leading characters
        }
    }
}
package com.wqs.invertedIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Needed when submitting from a Windows development machine.
        System.setProperty("hadoop.home.dir", "E:/hadoop-2.7.7");
        // Hard-coded HDFS input and output paths for this demo.
        args = new String[] { "/demo01/in/", "/demo01/out" };
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // One reduce task per partition defined in WC45Partitioner.
        job.setPartitionerClass(WC45Partitioner.class);
        job.setNumReduceTasks(3);

        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.222.128:9000" + otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.222.128:9000" + otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
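With three reduce tasks and WC45Partitioner, the output directory should end up with one part file per partition, roughly as follows (standard Hadoop part-file naming assumed):

    /demo01/out/_SUCCESS
    /demo01/out/part-r-00000    words starting with A-M
    /demo01/out/part-r-00001    words starting with N-Z
    /demo01/out/part-r-00002    all other leading characters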
DFS directory structure: