我们先来看一下新版本中的Mapper代码:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN,VALUEIN> reader, RecordWriter<KEYOUT,VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } } /** * 在Map Task任务开始执行的时候首先会调用该方法,只执行一次 * 主要用于全局变量或重量级的操作的初始化,如集成HBase的时候,生成HTablePool * 如pool = new HTablePool(); * 开发者一般可以不override该方法 * Called once at the beginning of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * 开发者在该方法中来处理自己需要关注业务逻辑 * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value);//context.write()执行后开始map断的shuffle处理过程。 } /** * Called once at the end of the task. * 在Map Task任务执行结束的时候调用该方法,且只执行一次 * 该方法用于释放在setup()中初始化的一些重量级的资源 * 一般情况下,开发者可以不用override该方法 */ protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * 该方法由框架调用,对于初级的Hadoop开发者而言,可以不需要修改该方法,但是对于 * 资深的Hadoop开发者来说,可以重写该方法以达到完全精确控制整个Mapper的处理流程 * Expert users can override this method for more complete control over the * execution of the Mapper. * @param context * @throws IOException */ public void run(Context context) throws IOException, InterruptedException { //Map Task执行流程的第一步 setup(context); //第二步,循环调用map()方法来专注于开发者的业务逻辑处理 while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } //第三步,清除Task的上下文信息或释放全局的重量级的资源 cleanup(context); } }
我们再来看一看Mapper的处理流程:
在整个Map Task的处理流程中,由几点需要特别注意:
1)、Map处理的中间结果会以临时数据文件方式被保存在linux的本地文件系统上,而非HDFS文件系统上。
2)、Map Task处理过的数据会溢写超过内存缓冲区阀值的数据,经排序、spill、和合并操作,经所有的临时的中间数据文件合并成一个大文件和一个索引文件,具体过程详见 MapTask详解