Input: three files with the following contents, stored under the /c directory:
xm@master:~/workspace$ hadoop fs -text /c/file1
MapReduce is simple
xm@master:~/workspace$ hadoop fs -text /c/file2
MapReduce is powerful is simple
xm@master:~/workspace$ hadoop fs -text /c/file3
Hello MapReduce bye MapReduce
Output:
Hello | file3 | 0.11928031367991561
MapReduce | file3 | 0.0| file2 | 0.0| file1 | 0.0
bye | file3 | 0.11928031367991561
is | file2 | 0.0704365036222725| file1 | 0.058697086351893746
powerful | file2 | 0.09542425094393249
simple | file2 | 0.03521825181113625| file1 | 0.058697086351893746
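As a quick sanity check of those numbers (the snippet below is a standalone illustration, not part of the job): "Hello" occurs once among the 4 words of file3 and appears in 1 of the 3 files, so TF-IDF = (1/4) * log10(3/1) ≈ 0.11928.

public class TfIdfCheck {
    public static void main(String[] args) {
        double tf = 1.0 / 4;               // "Hello": 1 occurrence among file3's 4 words
        double idf = Math.log10(3.0 / 1);  // 3 files in total, 1 contains "Hello"
        System.out.println(tf * idf);      // prints 0.11928031367991561
    }
}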
For the underlying principle, see the previous post.
Implementation code:
package Inverted;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndex {
    static String INPUT_PATH = "hdfs://master:9000/c";
    static String OUTPUT_PATH = "hdfs://master:9000/output";
    // Shared via statics; note that statics are per-JVM, so this sharing is only
    // reliable when map and reduce tasks run in the same JVM (e.g. local mode).
    private static double file_num = 0;  // total number of input files
    private static int word_sum = 0;     // word count of the current line/file

    // Mapper: emits key = word:filename:wordCount, value = 1 to implement word
    // counting, and derives the total file count file_num in setup().
    static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text();
        private Text valueInfo = new Text();
        private FileSplit split;
        String k = "";

        // Derive the total file count file_num: setup() runs once per split, and
        // each small input file here maps to exactly one split.
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fs = (FileSplit) context.getInputSplit();
            k = fs.getPath().getName();
            file_num = file_num + 1;
        }
        // Emit key = word:filename:wordCount, value = 1 to implement word counting.
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Count the words on this line; each sample file is a single line, so
            // this equals the total word count of the file.
            StringTokenizer itr2 = new StringTokenizer(value.toString());
            word_sum = 0;
            while (itr2.hasMoreTokens()) {
                itr2.nextToken();
                word_sum++;
            }
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                int splitIndex = split.getPath().toString().indexOf("file");
                // key = word:filename:wordCount
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString().substring(splitIndex) + ":" + word_sum);
                // value = 1
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }
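    // For file3 ("Hello MapReduce bye MapReduce", 4 words), the mapper above emits:
    //   Hello:file3:4      1
    //   MapReduce:file3:4  1
    //   bye:file3:4        1
    //   MapReduce:file3:4  1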
    // Combiner: collapses the counts into key = word, value = filename:TF.
    static class Combine extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Total occurrences of this word in this file
            double sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // TF = occurrences of the word / total words in the file
            String[] arr = key.toString().split(":");
            double a = Double.parseDouble(arr[2]);
            double b = sum / a;
            info.set(arr[1] + ":" + b);
            key.set(arr[0]);
            // Emit key = word, value = filename:TF
            context.write(key, info);
        }
    }
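    // Over the pairs listed above, this combiner emits one record per (word, file)
    // with TF as the value, e.g. (Hello, file3:0.25), (MapReduce, file3:0.5),
    // (bye, file3:0.25). Caveat: Hadoop may run a combiner zero or more times, and
    // the reducer below can only parse values the combiner has produced, so this
    // design works for small inputs like these but is not guaranteed in general.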
    // Reducer: buffer the values on a first pass to count the files containing the
    // word (file_sum), then compute TF-IDF on a second pass over the buffer.
    static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Number of files that contain this word
            double file_sum = 0;
            String fileList = new String();
            String[] ar = new String[10];  // assumes at most 10 input files
            double[] dr = new double[10];
            int i = 0;
            for (Text value : values) {
                file_sum++;
                String[] arr2 = value.toString().split(":");
                double c = Double.parseDouble(arr2[1]);
                ar[i] = arr2[0];  // filename
                dr[i] = c;        // TF in that file
                i++;
            }
            for (int y = 0; y < i; y++) {
                if (ar[y] != null && dr[y] != 0) {
                    // TF-IDF = TF * log10(total files / files containing the word)
                    fileList = fileList + "| " + ar[y] + " | " + dr[y] * Math.log10(file_num / file_sum);
                }
            }
            result.set(fileList);
            context.write(key, result);
        }
    }
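    // Worked example: for "is" the reducer receives file2:0.4 and file1:0.333...,
    // so file_sum = 2 and IDF = log10(3/2) ≈ 0.17609, giving 0.0704365... and
    // 0.0586970... -- the values shown in the output above.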
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://master:9000/");  // legacy key; fs.defaultFS on Hadoop 2+
        Path outputpath = new Path(OUTPUT_PATH);
        FileSystem fs = outputpath.getFileSystem(conf);
        // Delete any previous output directory so the job can be rerun
        if (fs.exists(outputpath)) {
            fs.delete(outputpath, true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(InvertedIndex.class);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
    }
}
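To try it, package the class into a jar (the jar name below is an assumption) and submit it; the reducer writes its result to /output/part-r-00000:

xm@master:~/workspace$ hadoop jar invertedindex.jar Inverted.InvertedIndex
xm@master:~/workspace$ hadoop fs -text /output/part-r-00000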