1、任务执行环境
Hadoop为map任务或reduce任务提供运行环境相关信息。例如,map任务可以知道它处理的文件的名称(参见输入分片与记录一节),map任务或reduce任务可以得知任务的尝试次数。下表中的属性可以从作业的配置信息中获得,在老版本的MapReduceAPI中通过为Mapper或Reducer提供一个configure()方法实现(其中,配置信息作为参数进行传递),便可获得这一信息。在新版本的API中,这些属性可以从传递给Mapper或Reducer的所有方法的相关对象中获取。
任务执行环境的属性
属性名称 | 类型 | 说明 | 范例 |
mapreduce.job.id | string | 作业ID | job_290811201130_0004 |
mapreduce.task.id | string | 任务ID | task_290811201130_0004_m_000003 |
mepreduce.task.attemp.id | string | 任务尝试ID | attempt_290811201130_0004_m_000003_0 |
mepreduce.task.partition | int | 作业中任务的索引 | 3 |
mapreduce.task.ismap | boolean | 此任务是否是map任务 | true |
Hadoop设置作业配置参数作为streaming程序的环境变量。但它用下划线来代替非字母数字的符号,以确保名称的合法性。下面这个Python表达式解释了如何用PythonStreaming脚本来检索mapreduce.job.id属性的值。
os.environ["mapreduce_job_id"]
也可以应用streaming启动程序的-cmdenv选项,来设置MapReduce所启动的streaming进程的环境变量(一次设置一个变量)。比如,下面的语句设置了MAGIC_PARAMETER环境变量:
-cmdenv MAGIC_PARAMETER=abracadabra
2、推测执行
MapReduce模型将作业分解成任务,然后并行地运行任务以使作业的整体执行时间少于各个任务顺序执行的时间。这使作业执行时间对运行缓慢的任务很敏感,因为只运行一个缓慢的任务会使整个作业所用的时间远远长于执行其他任务的时间。当一个作业由几百或几千个任务组成时,可能出现少数“拖后腿”的任务,这是很常见的。任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但是,检测具体原因很困难,因为任务总能够成功完成,尽管比预计执行时间长。Hadoop不会尝试诊断或修复执行慢的任务,相反,在一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份。这就是所谓的任务的“推测执行”(speculative execution)。
必须认识到一点:如果同时启动两个重复的任务,它们会互相竞争,导致推测执行无法工作。这对集群资源是一种浪费。相反,调度器跟踪作业中所有相同类型(map和reduce)任务的进度,并且仅仅启动运行速度明显低于平均水平的那一小部分任务的推测副本。一个任务成功完成后,任何正在运行的重复任务都将被中止,因为已经不再需要它们了。因此,如果原任务在推测任务前完成,推测任务就会被终止;同样,如果推测任务先完成,那么原任务就会被中止。
推测执行是一种优化措施,它并不能使作业的运行更可靠。如果有一些软件缺陷会造成任务挂起或运行速度减慢,依靠推测执行来避免这些问题显然是不明智的,并且不能可靠地运行,因为相同的软件缺陷可能会影响推测式任务。应该修复软件缺陷,使任务不会挂起或运行速度减慢。
在默认情况下,推测执行是启用的。可以基于集群或基于每个作业,单独为map任务和reduce任务启用或禁用该功能。相关的属性如下表所示。
推测执行的属性
属性名称 | 类型 | 默认值 | 描述 |
mapreduce.map.speculative | boolean | true | 如果任务运行变慢,该属性决定着是否要启动map任务的另外一个实例 |
mapreduce.reduce.speculative | boolean | true | 如果任务运行变慢,该属性决定着是否要启动reduce任务的另一个实例 |
yarn.app.mapreduce.am.job.speculator.class | Class | org.apache.hadoop.map.reduce.v2.app.speculate.DefaultSpeculator | Speculator类实现推测执行策略(只针对,MapReduce2) |
yarn.app.mapreduce.am.job.estimator.class | Class | org.apache.hadoop.map.reduce.v2.app.speculate.LegacyTaskRuntiemEstimator | speculator实例使用的TaskRuntimeEstimator的实现,提供任务运行时间的估计值(只针对MapReduce2) |
对于reduce任务,关闭推测执行是有益的,因为任意重复的reduce任务都必须将取得map输出作为最先的任务,这可能会大幅度地增加集群上的网络传输。
关闭推测执行的另一种情况是为了非幂等(nonidempotent)任务。然而在很多情况下,将任务写成幂等的并使用0utputCamlitter来提升任务成功时输出到最后位置的速度,这是可行的。详情将在下一节介绍。
3、关于OutputCommitters
HadoopMapReduce使用一个提交协议来确保作业和任务都完全成功或失败。这个行为通过对作业使用0utputCommitter来实现,在老版本MapReduceAPI中通过调用JobConf的set0utputCmmitter()或配置中的mapred.output.committer.class来设置。在新版本的MapReduceAPI中,OutputCommitter由OutputCommitter通过它的getOutputCommitter()方法确定。默认值为FileOutputCommitter,这对基于文件的MapReduce是适合的。可以定制已有的OutputCommitter,或者在需要时还可以写一个新的实现以完成对作业或任务的特别设置或清理。
OutputCommitter的API如下所示(在新旧版本中的MapReduceAPI中):
@Public
@Stable
public abstract class OutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter {
public OutputCommitter() {
}
public abstract void setupJob(JobContext var1) throws IOException;
/** @deprecated */
@Deprecated
public void cleanupJob(JobContext jobContext) throws IOException {
}
public void commitJob(JobContext jobContext) throws IOException {
this.cleanupJob(jobContext);
}
public void abortJob(JobContext jobContext, int status) throws IOException {
this.cleanupJob(jobContext);
}
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
public abstract void abortTask(TaskAttemptContext var1) throws IOException;
/** @deprecated */
@Deprecated
public boolean isRecoverySupported() {
return false;
}
public boolean isRecoverySupported(JobContext jobContext) throws IOException {
return this.isRecoverySupported();
}
public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
return false;
}
public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException {
return this.isCommitJobRepeatable((JobContext)jobContext);
}
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
}
public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException {
this.setupJob((JobContext)jobContext);
}
/** @deprecated */
@Deprecated
public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
this.cleanupJob((JobContext)context);
}
public final void commitJob(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
this.commitJob((JobContext)context);
}
public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, State runState) throws IOException {
int state = JobStatus.getOldNewJobRunState(runState);
if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
throw new IOException("Invalid job run state : " + runState.name());
} else {
this.abortJob((JobContext)context, state);
}
}
public final void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws IOException {
this.setupTask((TaskAttemptContext)taskContext);
}
public final boolean needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws IOException {
return this.needsTaskCommit((TaskAttemptContext)taskContext);
}
public final void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws IOException {
this.commitTask((TaskAttemptContext)taskContext);
}
public final void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws IOException {
this.abortTask((TaskAttemptContext)taskContext);
}
public final void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws IOException {
this.recoverTask((TaskAttemptContext)taskContext);
}
public final boolean isRecoverySupported(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
return this.isRecoverySupported((JobContext)context);
}
}
setupJob()方法在作业运行前调用,通常用来执行初始化操作。当OutputCommitter设为FileOutputCommitter时,该方法创建最终的输出目录${mapreduce.output.fileoutputformat.outputdir},并且为任务输出创建一个临时的工作空间,_temporary,作为最终输出目录的子目录。
如果作业成功,就调用commitJob()方法,在默认的基于文件的实现中,它用于删除临时的工作空间并在输出目录中创建一个名为_SUCCESS的隐藏的标志文件,以此告知文件系统的客户端该作业成功完成了。如果作业不成功,就通过状态对象调用abortJob()方法,意味着该作业是否失败或终止(例如由用户终止)。在默认的实现中,将删除作业的临时工作空间。
在任务级别上的操作与此类似。在任务执行之前先调用setupTask()方法,默认的实现不做任何事情,因为针对任务输出命名的临时目录在写任务输出的时候被创建。
任务的提交阶段是可选的,并通过从needsTaskCommit()返回的false值关闭它。这使得执行框架不必为任务运行分布提交协议,也不需要commitTask()或者abortTask()。当一个任务没有写任何输出时,OutputCommitter将跳过提交阶段。
如果任务成功,就调用commitTask(),在默认的实现中它将临时的任务输出目录(它的名字中有任务尝试的ID,以此避免任务尝试间的冲突)移动到最后自拙路径${mapreduce.output.fileoutputformat.outputdir}。否则,执行框架调用abortTask(),它负责删除临时的任务输出目录。
执行框架保证特定任务在有多次任务尝试的情况下,只有一个任务会被提交,其他的则被取消。这种情况是可能出现的,因为第一次尝试出于某个原因而失败(这种情况下将被取消),提交的是稍后成功的尝试。另一种情况是如果两个任务尝试作为推测副本同时运行,则提交先完成的,而另一个被取消。
任务附属文件
对于map任务和reduce任务的输出,常用的写方式是通过OutputCommitter来收集键-值对。有一些应用需要比单个键-值对模式更灵活的方式,因此直接将map或reduce任务的输出文件写到分布式文件系统中,如HDFS。还有其他方法用于产生多个输出,详情参见8.3节(输出格式)。注意,要确保同一个任务的多个实例不向同一个文件进行写操作。如前所述,OutputCommitter协议解决了该问题。如果应用程序将附属文件导人其任务的工作目录中,那么成功完成的这些任务就会将其附属文件自动推送到输出目录,而失败的任务,其附属文件则被删除。
任务通过从作业配置文件中查询mapreduce.task.output.dir属性值找到其工作目录。另一种方法,使用Java API的MapReduce程序可以调用File0utputFornat上的getWorkOutputPath()静态方法获取描述工作目录的Path对象。执行框架在执行任务之前首先创建工作目录,因此不需要我们创建。
举一个简单的例子,假设有一个程序用来转换图像文件的格式。一种实现方法是用一个只有map任务的作业,其中每个map指定一组要转换的图像(可以使用NlineInputFormat,详情参见8.2.2文本输入节)。如果map任务把转换后的图像写到其工作目录,那么在任务成功完成之后,这些图像会被传到输出目录。