这篇来说说编写一个MapReduce程序都需要做哪些事情.
大体上要编写三个模块:map类,reduce类,以及驱动.
Map
我们编写的map类需要继承org.apache.hadoop.mapreduce.Mapper.在打开Mapper类之后,会看到几个方法以及一个类:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
有一个Context类,这个类在编写reduce类需要继承的org.apache.hadoop.mapreduce.Reducer类中也存在.只不过Mapper中的Context实现的是MapContext类,Reducer中实现的是ReduceContext类.
然后还包含4个方法:setup(),map(),cleanup(),run().我们可以在run()中看到其他三个方法的执行顺序:先执行setup(),在try{}中执行map(),在finally中执行cleanup().都将Context实例作为参数传入.
setup()主要用来执行map()之前的一些准备工作,比如要进行join操作的时候,就可以利用setup()先将小表加载进来.默认方法体中没有任何代码,我们如果需要前期准备工作,就需要重写它.
接下来看看try{}中的map().可以通过方法名看出具体的逻辑:当有下一个KV对,就执行一次map(),map()中有3个参数:当前的key,当前的value,Context实例.看起来有点像迭代器:
//假设存在一个Iterator对象it.
whille(it.hasNext()){
it.next();
//其他代码
}
nextKeyValue()跟hasNext()很相似,map()跟next()很相似,当然Context并没有实现迭代器接口,只是看起来相似.
将参数传入之后,就执行我们编写的map()中的逻辑对数据进行处理.
最后是cleanup(),既然在finally{}中,回想之前的异常处理,就表示无论如何都要执行一次,默认同setup()一样,没有任何代码,如果需要在map()执行完毕后需要一些善后工作,我们需要重写这个方法.
然后我们来看看我们自己编写的map,这里以我写的词频统计为例,输入文件为Hadoop中的README.txt:
class Map extends Mapper<LongWritable,Text,Text,LongWritable>{
// 1 2 3 4
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word: words)
context.write(new Text(word),new LongWritable(1));
}
}
这里没有重写setup()与cleanup(),只重写了map().特殊情况下,map()也不需要重写,直接利用Mapper提供的map即可.
首先看Mapper后面的四个泛型,意义分别是:1.map()输入key类型,2.map()输入value类型,3.map()输出key类型,4.map()输出value类型.一般来说,由于map()的处理结果需要传输至reduce(),所以3和4需要和reduce类的1和2相匹配.
参数1,2被传入map(),作为输入KV的类型.中间是对输入的KV进行处理,此处是将传入的value用空格切割,返回一个String数组,在这里,输入的key是行号,value是文本文档中的一行文本.最后有一个context.write(),两个参数的类型应该与参数3,4匹配,此处将处理结果写入至Context,然后传输至reduce.这里context.write()就是将每个单词创建一个Text对象作为输出key,创建一个LongWritable对象,将值设为1,作为输出value,在reduce()中,会对同key的value进行累加,以获得(单词,出现次数)这样的结果.
Reduce
我们编写的reduce同样需要继承org.apache.hadoop.mapreduce.Reducer,我们也来看看它的源码:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
除了Context实现换成了ReduceContext,与run()中的逻辑略有不同之外,其余的都跟Mapper中一样.
再来看看词频统计的reduce,同样是使用我编写的代码为例:
class Reduce extends Reducer<Text,LongWritable,Text,LongWritable>{
// 1 2 3 4
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for (LongWritable number: values)
count += number.get();
context.write(key,new LongWritable(count));
}
}
同样没有重写setup()与cleanup().传入的KV的类型,参数1,2对应了map的参数3,4.而最终的输出结果,参数3,4就可以定义为自己所需要的类型.整个方法的逻辑是,首先定义一个long类型的数据用来统计数据,在for循环中进行累加,最后在context.write()中写入key与它出现的次数.
驱动
public class WordCount {
public static void main(String[] args) throws Exception{
String inputPath = "E:\\Hadoop\\READMEtxt"; //1
String outputPath = "E:\\Hadoop\\output"; //2
Job job = Job.getInstance(new Configuration()); //3
job.setJarByClass(WordCount.class); //4
job.setMapperClass(Map.class); //5
job.setReducerClass(Reduce.class); //6
job.setCombinerClass(Reduce.class); //7
job.setMapOutputKeyClass(Text.class); //8
job.setMapOutputValueClass(LongWritable.class); //9
job.setOutputKeyClass(Text.class); //10
job.setOutputValueClass(LongWritable.class); //11
FileInputFormat.addInputPath(job,new Path(inputPath)); //12
FileOutputFormat.setOutputPath(job,new Path(outputPath)); //13
job.waitForCompletion(true); //14
}
}
1和2就是输入和输出路径,注意输出路径不能存在,否则抛出FileAlreadyExistsException(文件已经存在).为什么要这样,因为万一输入的路径是其他作业的执行结果,如果不抛出异常并中断程序,那么那个作业的结果就会被这个作业覆盖.
3是创建一个Job对象,这里省略了Configuration configuration = new Configuration(),而改为直接在方法中创建Configuration目的是传入配置信息.还可以使用newJob(),不过该构造方法已经被标记为弃用,所以最好使用静态方法newInstance().
4是设置Jar包的主类.作业是以Jar的形式上传至集群的,这里需要设置程序的入口类,即main()所在类.
5,6分别是设置Mapper类与Reducer类,这里传入自己编写的Map类和Reduce类就可以了.
7是设置Combiner类,一般设置为Reduce类,前面说过Combiner不是必须的,因为有些作业并不适用.
8,9是设置Mapper输出的key,value类型,应该与map的参数3,4一致.
10,11是设置Reducer输出的key,value类型,应该与reduce的参数3,4一致.
12是设置输入文件路径,需要传入作业的Job对象,以及一个Path对象,Path中设置了作业的输入文件路径,可以使用正则表达式匹配,这里使用了第1行的字符串.这里的Path与Java的File不同,不可混用.注意,输入路径可以设置多个.
13是设置作业结果的输出路径,同样需要传入Job对象与Path对象,这里使用了第2行的字符串.注意,输出路径只可以有一个.
14是提交作业的方法,也可以使用submit().
Job对象还有很多方法可以调用,这里由于不需要就没有设置,大家可以自己看看还有什么方法.
到了这里,一个完整的MapReduce程序就编写完毕了,下一步就是上传至集群并执行.下一篇就来看看作业是怎么提交的.