我们需要将文本使用mapreduce整理成需求的样子。也就是单词加文件名的样子,那我们又怎么能知道map方法现在调用的这行数据属于哪个文件呢?调用这个方法的是mapworker,而且调用这个map方法的还不止一个worker。虽然调用map方法的有很多worker但是,每个worker再调用方法的时候都会传一个context而这个context就包含了这个worker正在读的数据属于哪个文件。因为这个worker肯定知道自己读的是哪一部分数据。这样worker所读取的数据就有一个专业的术语:切片。当文件很小时,一个切片很可能就是一个文件,可是当文件很大时,一个文件就会分成几个切片,分别交给几个worker去处理。
所以再捋一下思路:worker调用map方法时传递的context包含这一行数据所属的切片。通过这个切片,我们就可以找到所属的文件。
我们可以通过context.getInputSplit();拿到对切片信息的描述。InputSplit这个类是一个抽象类,而我们要拿到的是对这个抽象类的实现,这里我们要用的是FileSplit,我们现在写的代码基本上都是针对文件,但是mapreduce不仅仅能处理文本文件,它还可以处理比如数据库文件之类的。FileSplit导包的时候导包名长的那个。拿到FileSplit切片信息之后我们就可以通过fileSplit.getPath().getName()拿到文件名。
一个切片对应一个worker,一个切片中的一行数据对应一个map方法,所以这两行代码我们不需要写在map方法里,我们我们应该写在map方法外。因为一个worker只需要拿一次文件名就行。之前我们用过cleanup方法,这个方法是在一个worker结束的时候才会去做cleanup方法里面的操作。同样的,worker在正式进行操作之前也会调用setup方法做一些初始化的操作。然后才真正的去拿数据处理。所以我们可以将这两行代码写在这个方法中。
以下贴出代码:
package com.test.wordcount4;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount1 {
public static class fileMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable();
String fileName = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
fileName = inputSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(" ");
for (String str : split) {
k.set(str+"-"+fileName);
v.set(1);
context.write(k,v);
}
}
}
public static class fileReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count+=value.get();
}
v.set(count);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount1.class);
job.setMapperClass(fileMapper.class);
job.setReducerClass(fileReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path p = new Path(args[1]);
if(fs.exists(p)){
fs.delete(p, true);
}
FileOutputFormat.setOutputPath(job, p);
job.setNumReduceTasks(2);
boolean res = job.waitForCompletion(true);
System.out.println(res?"mr程序成功执行":"mr程序好像被外星人抓走了");
}
}
package com.test.wordcount4;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount2 {
public static class fileMapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("-");
k.set(split[0]);
v.set(split[1]);
context.write(k, v);
}
}
public static class fileReducer extends Reducer<Text, Text, Text, Text>{
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value.toString()).append(",");
}
v.set(sb.toString());
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount2.class);
job.setMapperClass(fileMapper.class);
job.setReducerClass(fileReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path p = new Path(args[1]);
if(fs.exists(p)){
fs.delete(p, true);
}
FileOutputFormat.setOutputPath(job, p);
job.setNumReduceTasks(2);
boolean res = job.waitForCompletion(true);
System.out.println(res?"mr程序成功执行":"mr程序好像被外星人抓走了");
}
}
第一段代码的运行结果:
第二段代码的运行结果: