As with our walkthrough of the Mapper component, let's first look at the Reducer code in the new 1.0 API:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context
      extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input,
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter,
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}
The methods in Reducer mirror those in Mapper, and they are used in much the same way: setup() and cleanup() run once per task, while reduce() is called once for each key with the full group of values shuffled to that key.
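To make the usage concrete, here is a minimal sketch of a user-defined reducer in the word-count style. The class name IntSumReducer and the Text/IntWritable types are illustrative choices, not part of the framework code above; the point is that a typical application overrides only reduce():

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();          // aggregate all values shuffled to this key
    }
    result.set(sum);
    context.write(key, result);    // emit one (key, sum) pair per input key
  }
}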
In the MapReduce programming model, the Mapper and Reducer components correspond to Map Tasks and Reduce Tasks, respectively. Within a single MapReduce job, the user can control the number of Reduce Tasks; in particular, the number of reducers in a job may be set to zero. The number of Map Tasks, however, cannot be set by hand: the mapper count can only be influenced indirectly through configuration parameters (see the InputFormat component for details). A driver sketch illustrating this asymmetry follows.
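A minimal sketch of how this looks in a driver class (the class and job names are illustrative; the 1.0-era Job constructor is assumed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "reducer-count-example"); // Hadoop 1.0-style constructor

    // The user controls the number of Reduce Tasks directly:
    job.setNumReduceTasks(4);

    // Setting it to zero skips the reduce phase entirely; map output is
    // then written straight to the output directory:
    // job.setNumReduceTasks(0);

    // There is no comparable setter for Map Tasks in the new API: the number
    // of mappers equals the number of input splits computed by the
    // InputFormat, which is influenced only indirectly via configuration
    // (for example, the split size settings).
  }
}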
Before the reduce() method in the Reducer component actually runs to process the user's business logic, however, the reducer-side shuffle phase is executed first. Its flow is shown in the diagram below: