0. 问题:TF-IDF 中的词频统计(倒排索引)
- 统计每个单词在多个文档中各自出现的次数,以及它出现在哪些文档中
- 在 map 中逐行读取当前文档,并获取当前文档的文件名
- map 输出:key 为(单词:文档名),value 为(数值 1)
- 在map端设置Combiner类(整合数据,减少向reduce端传输数据的网络开销)
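- Combiner 将 map 的输出重新组合为 <单词, 文档名:单词频数> 后输出(注意:Hadoop 不保证 Combiner 恰好执行一次;而且本例的 Combiner 改写了 key,但分区和排序仍按原 key“单词:文档名”进行,因此只在单个 reducer 下可靠,且排序顺序可能与新 key 不一致)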
- 输入文件为三篇txt英文文档
- 输出文件格式为:单词 文档1:频数 文档2:频数 …,例如:
computation a2.txt:1
computers a1.txt:1 a3.txt:1
data a1.txt:1
deliver a2.txt:1
1. 主方法
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration cfg=new Configuration()
Job job = Job.getInstance(cfg)
job.setJarByClass(ReverseSort0606.class)
job.setMapperClass(reMaper.class)
job.setReducerClass(reReduceer.class)
job.setCombinerClass(MyCom.class)
//job的输出key-value
job.setOutputKeyClass(Text.class)
job.setOutputValueClass(Text.class)
job.setMapOutputKeyClass(Text.class)
job.setMapOutputValueClass(Text.class)
//FileSystem 获取文件系统,可以删除已存在的输出目录
Path outpath=new Path("d:\\mr\\outresort")
FileSystem fs=FileSystem.get(cfg)
if (fs.exists(outpath)){
fs.delete(outpath,true)
}
//输入路径和输出路径的设置
FileInputFormat.addInputPath(job, new Path("d:\\mr\\input\\resort"))
FileOutputFormat.setOutputPath(job,outpath)
System.exit(job.waitForCompletion(true)?0:1)
}
2. map
static class reMaper extends Mapper<LongWritable,Text,Text,Text> {
    /**
     * Word separators: a whitespace run optionally preceded by ',' or '.', or a ',' / '.'
     * at the very end of the line. This removes sentence punctuation even at end of line.
     * The original split ", " and ". " but left "word." attached at line end.
     */
    private static final java.util.regex.Pattern SEPARATORS =
            java.util.regex.Pattern.compile("[,.]?\\s+|[,.]$");

    private Text mkey=new Text();
    private Text mvalue=new Text("1");
    /** File name (not full path) of the document this task reads; set once in setup(). */
    private String docName;

    /**
     * Looks up the input file name once per task instead of once per record.
     * Assumes the input split is a FileSplit, as the original per-record cast did.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        docName = split.getPath().getName();
    }

    /**
     * Emits ("word:document", "1") for every word on the line.
     *
     * @param key   byte offset of the line (unused)
     * @param value one line of the document
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("map");
        for (String word : SEPARATORS.split(value.toString())) {
            // Leading whitespace or adjacent separators yield empty tokens; they are not words.
            if (word.isEmpty()) {
                continue;
            }
            mkey.set(word + ":" + docName);
            context.write(mkey, mvalue);
        }
    }
}
3. 自定义Combiner类, 实现map端的数据整合
static class MyCom extends Reducer<Text,Text,Text,Text>{
    private Text ckey=new Text();
    private Text cvalue=new Text();

    /**
     * Collapses the map output for one "word:document" key into a single record whose
     * key is the bare word and whose value is "document:count".
     *
     * <p>Hadoop may run a combiner zero, one or several times, for example again on merged
     * spill output or on the reduce side. This method therefore also accepts its own
     * output: values already in "document:count" form (they contain ':') are written
     * back unchanged instead of being parsed as numbers.
     *
     * <p>NOTE(review): this combiner rewrites the key. Partitioning and sorting are decided
     * on the original "word:document" key, so the rewritten keys are not guaranteed to be
     * sorted. For example, "apple2:a.txt" sorts before "apple:a.txt", but "apple" sorts
     * before "apple2". Also, with more than one reducer, a word's documents can land in
     * different reduce tasks. A fully correct design keeps the key unchanged
     * (e.g. mapper emits word -> "document:1"), which needs a coordinated mapper change.
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        System.out.println("combiner");
        String compound = key.toString();
        int sum = 0;
        boolean sawRawCount = false;
        for (Text value : values) {
            String v = value.toString();
            if (v.indexOf(':') >= 0) {
                // Already combined by an earlier pass; forward it as-is.
                context.write(key, value);
            } else {
                sum += Integer.parseInt(v);
                sawRawCount = true;
            }
        }
        if (sawRawCount) {
            // Split at the LAST ':' so words that themselves contain ':' stay intact
            // (assumes document names contain no ':' — TODO confirm for the input set).
            int sep = compound.lastIndexOf(':');
            ckey.set(compound.substring(0, sep));
            cvalue.set(compound.substring(sep + 1) + ":" + sum);
            context.write(ckey, cvalue);
        }
    }
}
4. reduce
static class reReduceer extends Reducer<Text,Text,Text,Text> {
    private Text rvalue=new Text();

    /**
     * Writes one line per word: the word, then its per-document counts as
     * "doc1:n1 doc2:n2 ..." separated by single spaces, with no trailing space.
     *
     * <p>Values are expected in the "document:count" form the combiner emits. Hadoop may run
     * the combiner separately on each map-side spill, so the same document can arrive
     * several times with partial counts; those are summed here rather than listed twice.
     * Values without a ':' separator cannot be attributed to a document and are appended
     * verbatim after the summed entries.
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce");
        // LinkedHashMap keeps documents in first-seen order, like the original concatenation.
        java.util.Map<String, Integer> counts = new java.util.LinkedHashMap<String, Integer>();
        java.util.List<String> unparsed = new java.util.ArrayList<String>();
        for (Text value : values) {
            String entry = value.toString();
            int sep = entry.lastIndexOf(':');
            if (sep < 0) {
                unparsed.add(entry);
                continue;
            }
            String doc = entry.substring(0, sep);
            int n = Integer.parseInt(entry.substring(sep + 1));
            Integer previous = counts.get(doc);
            counts.put(doc, previous == null ? n : previous + n);
        }

        StringBuilder buf = new StringBuilder();
        for (java.util.Map.Entry<String, Integer> e : counts.entrySet()) {
            if (buf.length() > 0) {
                buf.append(' ');
            }
            buf.append(e.getKey()).append(':').append(e.getValue());
        }
        for (String entry : unparsed) {
            if (buf.length() > 0) {
                buf.append(' ');
            }
            buf.append(entry);
        }
        rvalue.set(buf.toString());
        context.write(key, rvalue);
    }
}