在MapReduce中,OutputFormat主要用于描述经reducer处理后输出的数据的格式,它会将用户提供的key/value键值对而写入到特定格式的文件中。
我们先看一下在1.0老版本中的OutputFormat接口的代码:
public interface OutputFormat<K, V> { /** * 返回一个RercordWriter类的对象,并有该对象接收用户的key/value数据对儿,然后 * 按指定的格式将数据写入文件 * * @param ignored * @param job configuration for the job whose output is being written. * @param name the unique name for this part of the output. * @param progress mechanism for reporting progress while writing to file. * @return a {@link RecordWriter} to write the output for the job. * @throws IOException */ RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)throws IOException; /** * 该方法一般用在用户作业被提交到JobTracker之前,由JobClient自动调用该接口来检查 * 数据目录是否合法 * * <p>This is to validate the output specification for the job when it is * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not * overwritten.</p> * * @param ignored * @param job job configuration. * @throws IOException when output should not be attempted */ void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; }
Hadoop自带了很多的OutputFormat的实现,它们与InputFormat实现县对应。这里我们就着重学习所有基于文件的OutputFormat的实现基类FileOutputFormat类。
FileOutputFormat有两个最主要的功能:
1)、实现checkOutputSpecs()方法,该方法是在作业运行之前被调用的,默认会检查用户的输出目录是否存在,如果存在则爆出异常,防止该输出目录中已有数据被覆盖。
2)、处理side-effect file,该文件并不是任务的最在中输出文件,而是具有特殊用途的专属文件,其典型应用是执行推测式任务。
在Hadoop中,有硬件或软件再或数据的原因,一个作业的某些子任务可能会慢于其他同类型的子任务,这些“慢子任务”明显会拖慢整个作业的执行进度,因此hadoop对这些慢任务实行推测式执行,即在另外一个节点上启动一个和慢子任务相同的任务——该任务就是推测式任务,最先完成的任务的计算结果后就被认为是这个数据块最终的处理结果,然后系统会杀死还没执行完的慢子任务。
为了防止两个相同的子任务同时往一个输出文件中写入数据而发生冲突,FileOutputFormat会为每一个Reduce Task的输出数据创建一个side-effect file文件,并将产生的数据临时写入该文件,然后带Reduce Task完成后,在将之移动到最终的输出目录。默认情况下,当MapReduce作业完成后,会在最终的结果目录${mapred.out.dir}下生成一个空的_SUCCESS文件,该文件主要作为一个作业完成的标志。如果使用Oozie,那么Oozie就是通过最终结果目录下的_SUCCESS文件的存在来判断作业是否执行完成的。
上述过程都是由一个叫OutputCommiter的接口实现的,Hadoop提供了默认的该接口的实现类FileOutputCommiter,当然开发者也可以根据需求来编写自己的OutputCommiter实现类,并通过${mapred.output.commiter.class}指定,下面是接口中的方法及FileOutputCommiter中的实现:
方法 | 何时使用 | FileOutputCommiter中的实现 |
setupJob | 作业初始化 | 创建临时目录${mapred.out.dir}_temorary |
commitJob | 作业成功完成 | 删除临时目录,并在${mapred.out.dir}下创建空_SUCCESS文件 |
abortJob | 作业运行失败 | 删除临时目录 |
setupTask | 任务初始化 | 不进行任何操作 |
needsTaskCommit | 判断是否要提交结果 | 只要存在side-effect file,就返回true |
commitTask | 任务运行完成 | 提交结果,将side-effect file移到${mapred.out.dir}目录下 |
abortTask | 任务允许失败 | 删除任务的side-effect file |
对于1.0的新版本,除了将接口变成抽象类外,还添加了一个getOutputCommitter()方法:代码如下:
public abstract class OutputFormat<K, V> { /** * 返回一个RercordWriter类的对象,并有该对象接收用户的key/value数据对儿,然后 * 按指定的格式将数据写入文件 * * @param context the information about the current task. * @return a {@link RecordWriter} to write the output for the job. * @throws IOException */ public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context ) throws IOException, InterruptedException; /** * 作业提交前由JobClient调用来检查输出目录是否存在,存在就报异常放在数据覆盖 * * <p>This is to validate the output specification for the job when it is * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not * overwritten.</p> * * @param context information about the job * @throws IOException when output should not be attempted */ public abstract void checkOutputSpecs(JobContext context ) throws IOException, InterruptedException; /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. * @param context the task context * @return an output committer * @throws IOException * @throws InterruptedException */ public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context ) throws IOException, InterruptedException; }