这是学习MR编程的一个典型模型,这里分享一下。
wordCount需求是统计文件中每个单词出现的次数。
处理过程中主要是将作业拆分成Map阶段(Mapper tasks)和Reduce阶段(Reducer tasks),数据转换过程如下:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, {v2,v2…}> -> reduce -> <k3, v3> (output)
我的理解其流程是:
一个文件被拆分成多个block(与blocksize对应),每个block由一个map来处理,给每个单词计数为1,再经过shuffling操作,将相同的单词放在一起,最后通过reduce统计每一块中相同单词的数量,然后输出到文件系统(可以是本地也可以是HDFS)。
具体实现代码和详细注释如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 使用MapReduce开发WordCount
* 要注意hadoop和Java之间的转化
*/
public class WordCountApp2 {
//自定义一个map类,读取文件
public static class MyMaper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one=new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//map的参数中:key是指每行偏移量,value是指每行单词,这是map的输入,即<k1,v1>
// 接受到的每行数据转换成字符串
String line=value.toString();
//按照指定分隔符进行分隔
String[] words=line.split(" ");
//输出结果,<单词,个数1>,即<k2,v2>,
for(String word: words){
context.write(new Text(word), one);
}
}
}
//自定义一个reduce类,归并
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//reduce参数中values是一个集合,表示相同单词出现的次数,可能有好几个1,所以要求和
long sum=0;
for(LongWritable value:values){
//统计单词key出现的次数
sum+=value.get();
}
//最终统计结果的输出,即<k3,v3>,
context.write(key,new LongWritable(sum));
}
}
//封装MapReduce作业的所有信息
public static void main(String[] args) throws Exception{
//1.创建 Configuration
Configuration configuration=new Configuration();
//创建Job之前,准备清理已经存在的输出目录
Path outputPath= new Path(args[1]);
FileSystem fileSystem =FileSystem.get(configuration);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath,true);
System.out.println("输出文件夹存在且已被删除");
}
//2.创建job,通过getInstance 拿到一个实例
Job job=Job.getInstance(configuration,"wordCount");
//3.设置Job的处理类
job.setJarByClass(WordCountApp2.class);
//4.设置作业处理的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//************5.设置map相关参数
//设置map的处理类
job.setMapperClass(MyMaper.class);
//设置map输出参数的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//************6.设置reduce相关参数
//设置reduce的处理类
job.setReducerClass(MyReducer.class);
//设置reduce输出参数的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//7.设置作业的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//8.提交结果
//参数true表示将运行进度等信息及时输出给用户,false的话只是等待作业结束
boolean result=job.waitForCompletion(true);
System.exit(result? 0: 1);
}
}