JobInProgress在正式为TaskTracker节点分配一个任务之前,它都会先检查自己是否应该给该计算节点分配自己的计算任务。当这个计算节点满足要求时,JobInProgress才会为它分配任务;否则,直接返回。而这个判断的标准主要依据以下两点:
1.TaskTracker节点所在的Host上的所有计算节点执行该作业的任务实例失败的次数;
2.JobInProgress黑名单中的TaskTracker数量;
这里先来解释一下“黑名单”,它是指当一个Host上的TaskTracker节点执行该作业的任务发生失败的次数超过一定阈值时,这个Host和它上面的所有TaskTracker节点就会进入这个作业对应的JobInProgress中的黑名单。不难看出,每一个作业的JobInProgress都有一份自己独有的黑名单。那么,上面的这两条判断标准就很容易理解了,第一点实际上就是判断这个TaskTracker节点在不在黑名单中,第二点是如果上了JobInProgress黑名单的TaskTracker数量过多的话就应该忽略该TaskTracker节点是否在黑名单中都应该可以给它分配任务,因为如果不这样做的话就有可能造成其它的计算节点负载过重以及增大该作业的响应延时,同时上了黑名单的TaskTracker节点执行该作业的任务失败的概率比较的大,但不一定会失败。判断的具体细节如下:
private static final double CLUSTER_BLACKLIST_PERCENT = 0.25; private boolean shouldRunOnTaskTracker(String taskTracker) { int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker); if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) { if (LOG.isDebugEnabled()) { String flakyTracker = convertTrackerNameToHostName(taskTracker); } return false; } return true; }
关于Host及其上的TaskTracker节点上作业黑名单的阈值是由用户在提交作业时设置的,用户对该参数的设置既可以通过JobConf的setMaxTaskFailuresPerTracker()方法也可以作业配置文件中的mapred.max.tracker.failures来设置。这里可能有人会问,JobInProgress究竟是如何知道一个TaskTracker节点执行它的一个任务发生了失败呢?
当一个TaskTracker节点向JobTracker节点发送心跳包的时候,它会向JobTracker节点报告自己正在执行的各个任务状态,JobTracker节点在收到任务状态报告之后就会将它们通知给对应的JobInProgress,之后JobInProgress就知道自己的那些任务实例被该TaskTracker节点执行失败了。另外,JobInProgress还有一个途径可以知道那些计算节点执行它的任务实例失败了。前面讲过,JobTracker节点后台运行着一个检测线程,用来检测那些TaskTracker节点失效了,当它发现一个TaskTracker节点失效时,则该TaskTracker节点当前未完成的任务实例都被看做是执行失败,所以也就会通知这些任务实例所属的JobInProgress关于其任务实例执行失败的情况。JobInProgress对Host及计算节点上黑名单处理细节如下:
void addTrackerTaskFailure(String trackerName) { if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { //解析该TaskTracker节点所在的主机名(IP地址) String trackerHostName = convertTrackerNameToHostName(trackerName); //该主机上的TaskTracker节点执行该作业任务实例已失败的次数 Integer trackerFailures = trackerToFailuresMap.get(trackerHostName); if (trackerFailures == null) { trackerFailures = 0; } trackerToFailuresMap.put(trackerHostName, ++trackerFailures); //该Taskracker节点能否上黑名单 if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) { ++flakyTaskTrackers; LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'"); } } }
从上面的源代码可以看出,一个JobInProgress黑名单中的TaskTracker数量是有限制的,原因前面也说过,不可能让集群中所有的TaskTracker节点上它的黑名单,否则最后谁来执行它呢! 对于计算节点的检测,除了后台检测线程可以发现外,还有一个地方也可以检测到,就是当一个TaskTracker节点向JobTracker发送心跳包时,如果该节点是刚重启或者是重新初始化,同时JobTracker上又有该节点的记录,那么JobTracker就认为该节点以前的一个版本已经失效了。
除了每一个JobInProgress有自己的黑名单之外,JobTracker节点也有一份黑名单,也就是集群黑名单,它们的相同之处在于这份黑名单都是以Host为单位的,即如果一个Host上了黑名单,那么它上面的所有TaskTracker节点也就随之上了黑名单了。如果一个TaskTracker节点上了集群的黑名单,则它不会被分配任何作业的任务,而如果它上了某一个JobInProgress的黑名单,顶多只是该JobInProgress不会给它分配任务而已(前提是该黑名单不大)。因此,对于那些上了集群黑名的TaskTracker节点,它们的计算能力也就不能算作整个集群的计算能力了,这也使得它不得不限制集群黑名单的大小,否则最后就没有计算节点来完成用户提交的作业了。同样的问题,集群中的黑名单是哪儿来的呢?
首先,集群中的黑名单只来源于JobInProgress中的黑名单,而且该JobInProgress必须最终被成功执行,这个原因其实很有意思。如果一个作业最终没有被成功完成的话,那么就不是集群中的计算节点的问题,而很有可能是这个作业本身的问题,也就不能代表该JobInProgress黑名单中的TaskTracker节点丧失了一部分或全部的计算能力。这个集群的黑名单是由FaultyTrackersInfo对象类管理的,它处理一个Host上黑名单的操作很简单:
private int MAX_BLACKLISTS_PER_TRACKER = 4; private double AVERAGE_BLACKLIST_THRESHOLD = 0.50; private static double MAX_BLACKLIST_PERCENT = 0.50; /*判断一个主机是否应该上集群的黑名单*/ private boolean shouldBlacklist(String hostName, int numFaults) { if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) { // calculate avgBlackLists long clusterSize = getClusterStatus().getTaskTrackers(); long sum = 0; for (FaultInfo f : potentiallyFaultyTrackers.values()) { sum += f.getFaultCount(); } double avg = (double) sum / clusterSize; long totalCluster = clusterSize + numBlacklistedTrackers; if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) && numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) { return true; } } return false; } /*更新集群的计算能力及黑名单中的TaskTracker数量*/ private void removeHostCapacity(String hostName) { synchronized (taskTrackers) { // remove the capacity of trackers on this host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { totalMapTaskCapacity -= status.getMaxMapTasks(); totalReduceTaskCapacity -= status.getMaxReduceTasks(); } numBlacklistedTrackers += uniqueHostsMap.remove(hostName); } } void incrementFaults(String hostName) { synchronized (potentiallyFaultyTrackers) { FaultInfo fi = potentiallyFaultyTrackers.get(hostName); if (fi == null) { fi = new FaultInfo(); potentiallyFaultyTrackers.put(hostName, fi); } int numFaults = fi.getFaultCount(); ++numFaults; fi.setFaultCount(numFaults); fi.setLastUpdated(System.currentTimeMillis()); if (!fi.isBlacklisted()) { if (shouldBlacklist(hostName, numFaults)) { removeHostCapacity(hostName); fi.setBlacklist(true); } } } }
在上面的代码中,有三个参数决定了一个Host是否能够上黑名单,其中有两个参数同时还可以通过JobTracker的配置文件来设置:
MAX_BLACKLISTS_PER_TRACKER = conf.getInt("mapred.max.tracker.blacklists", 4); AVERAGE_BLACKLIST_THRESHOLD = conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
当一个Host及其上的TaskTracker节点上了集群的黑名单之后,就很难从该黑名单中消除,除非重启该Host上的计算节点,请注意这里说的是重启而不是重新初始化TaskTracker节点。当JobTracker节点发现一个刚重启并且是第一次发送心跳包时,它会首先从黑名单上清除该几点所在的Host及其上的所有TaskTracker节点,并将这些TaskTracker节点的计算能力加入到集群中,这个过程如下:
private void addHostCapacity(String hostName) { synchronized (taskTrackers) { int numTrackersOnHost = 0; // add the capacity of trackers on the host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { totalMapTaskCapacity += status.getMaxMapTasks(); totalReduceTaskCapacity += status.getMaxReduceTasks(); numTrackersOnHost++; } uniqueHostsMap.put(hostName, numTrackersOnHost); numBlacklistedTrackers -= numTrackersOnHost; } void markTrackerHealthy1(String hostName) { synchronized (potentiallyFaultyTrackers) { FaultInfo fi = potentiallyFaultyTrackers.remove(hostName); if (fi != null && fi.isBlacklisted()) { addHostCapacity(hostName); } } }