MapReduce Source Code Walkthrough: Client Job Submission and Computing the Number of Map Tasks

1. Entry point: the WordCount driver (wordcountdriver.java)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Configures the parameters the MR framework needs, such as the HDFS input directory
 * and the HDFS directory the reduce output is written to.
 * This class is effectively a client of the YARN cluster: it packages part of the MR
 * framework's parameters, points at the job jar, and finally submits everything to YARN.
 * @author Lee
 *
 */
public class wordcountdriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
//		conf.set("mapreduce.framework.name", "yarn");
//		conf.set("yarn.resourcemanager.hostname", "hadoop@mini1");
		Job job = Job.getInstance(conf);

		// Name a class inside the job jar so the framework knows which jar to ship
		job.setJarByClass(wordcountdriver.class);

		// The mapper and reducer classes this job uses
		job.setMapperClass(wordcountmapper.class);
		job.setReducerClass(Wordcountreducer.class);

		// The key/value types emitted by the mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// The key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Optional combiner that pre-aggregates map output locally
		job.setCombinerClass(wordcountcombiner.class);

		// Input directory holding the raw data
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Output directory for the job's results
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Submit the job configuration and the jar containing the job classes to YARN,
		// then wait for completion
		boolean success = job.waitForCompletion(true);
		System.exit(success ? 0 : 1);
	}
}
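The driver references wordcountmapper, Wordcountreducer and wordcountcombiner, which the original post does not show. For context, here is a minimal sketch of what such classes usually look like for the classic WordCount job; it is an assumption, not the author's original code, and each class would live in its own .java file.

// Assumed imports for each file:
// import java.io.IOException;
// import org.apache.hadoop.io.IntWritable;
// import org.apache.hadoop.io.LongWritable;
// import org.apache.hadoop.io.Text;
// import org.apache.hadoop.mapreduce.Mapper;
// import org.apache.hadoop.mapreduce.Reducer;

public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static final IntWritable ONE = new IntWritable(1);
	private final Text word = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// TextInputFormat hands us one line per call; emit (word, 1) for every token
		for (String token : value.toString().split("\\s+")) {
			if (!token.isEmpty()) {
				word.set(token);
				context.write(word, ONE);
			}
		}
	}
}

public class Wordcountreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// Sum the 1s emitted for each word
		int sum = 0;
		for (IntWritable v : values) {
			sum += v.get();
		}
		context.write(key, new IntWritable(sum));
	}
}

// Because word-count aggregation is associative, the combiner can simply reuse the reducer:
public class wordcountcombiner extends Wordcountreducer {
}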

Note: the Hadoop version used here is 2.6.5, so this source analysis is also based on Hadoop 2.6.5. If you want to follow along, it is worth downloading the Hadoop 2.6.5 source.

The first thing worth noting is that the client-side submission process performs no actual computation; only MapTask and ReduceTask do. The client's job is to set up "move computation to the data" and to determine the degree of parallelism. Almost everything in the code above is just job configuration; nothing is submitted until the line job.waitForCompletion(true) runs. So let's step into that method (Ctrl + left-click in the IDE).

2. Stepping into waitForCompletion()

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
    
      // submit() does the actual submission; it returns as soon as the job has been
      // handed off to the cluster, without waiting for it to finish
      submit();
    }
    if (verbose) {
    
      // if the caller passed verbose = true, monitor the job and print its progress
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
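When verbose is false, the client simply sleeps and polls isComplete() in a loop. The poll interval comes from the client configuration; a minimal sketch of tuning it, assuming the standard mapreduce.client.completion.pollinterval property (in milliseconds):

Configuration conf = new Configuration();
// poll the job status every 2 seconds instead of the default (5000 ms)
conf.setInt("mapreduce.client.completion.pollinterval", 2000);
Job job = Job.getInstance(conf);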

3. Stepping into submit()

/**
 * This method performs the actual job submission.
 */
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    
    // jobs written against the new (MapReduce 2.x, org.apache.hadoop.mapreduce) API
    // are flagged here so the new code paths are used downstream
    setUseNewAPI();
    
    // connect() instantiates a Cluster (the connection to the local runner or to YARN)
    connect();
    
    // the JobSubmitter wraps a FileSystem client and the ResourceManager client
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      // this is the key method: its Javadoc (Ctrl+Q) lists the five things it does,
      // so we will walk through it in detail next
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
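connect() is what decides whether the job runs in the local runner or on a YARN cluster: it builds a Cluster object from the client configuration. A short sketch of the two properties that typically control this (the hostname is a placeholder):

Configuration conf = new Configuration();
// run on YARN instead of the default local job runner
conf.set("mapreduce.framework.name", "yarn");
// the host the ResourceManager client should connect to (placeholder hostname)
conf.set("yarn.resourcemanager.hostname", "mini1");
Job job = Job.getInstance(conf);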

4. Stepping into submitJobInternal()

 /**
 This method does five things (from its Javadoc):
 1. Checking the input and output specifications of the job.
    (validate the input and output paths)
 2. Computing the InputSplits for the job.
    (compute the input splits -- this is where the number of map tasks comes from)
 3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
    (bookkeeping for the job's DistributedCache; not essential for this walkthrough)
 4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
    (upload the jar and configuration files to HDFS)
 5. Submitting the job to the JobTracker and optionally monitoring it's status.
    (submit the job -- on YARN this goes to the ResourceManager -- and optionally monitor it)
 **/


JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    
    // the per-job staging directory where the jar and configuration files will be placed
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
         
          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      // upload the job jar, configuration and other resources to HDFS
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      
      // writeSplits() returns the number of map tasks, so let's step into it
      // to see how that number is computed
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // ... (the remainder of this method is covered in step 11 below)

      
  

5. Stepping into writeSplits()

/**
This method drives the input-split computation.
*/
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // check whether the job uses the new (org.apache.hadoop.mapreduce) API
    if (jConf.getUseNewMapper()) {
      // our job does, so this branch is taken; step into writeNewSplits()
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

6. Stepping into writeNewSplits()

/**
This method computes the input splits for the job.
**/
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // get the job's Configuration
    Configuration conf = job.getConfiguration();

    // instantiate the input format class via reflection; step into getInputFormatClass()
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

7. Stepping into getInputFormatClass()

  /**
  This method is implemented in JobContextImpl, Job's parent class.
  **/
  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
    // if no InputFormat class was set on the job, the default is TextInputFormat
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }
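If the default TextInputFormat (one record per line, key = byte offset) is not what you want, the driver can override it. A small sketch, assuming the input consists of tab-separated key/value lines:

// import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// In the driver, before submitting (hypothetical: input lines are "key<TAB>value"):
job.setInputFormatClass(KeyValueTextInputFormat.class);
// (the mapper's input key/value types must then be Text/Text instead of LongWritable/Text)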

8. Back to writeNewSplits()

/**
This method computes the input splits for the job.
**/
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // get the job's Configuration
    Configuration conf = job.getConfiguration();

    // instantiate the input format class via reflection; by default this is TextInputFormat
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
      
    // getSplits() produces the list of input splits -- step into it next
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    

9. Stepping into getSplits() (the most important part)

/** 
   * Generate the list of files and slice them into splits.
   * This implementation lives in FileInputFormat.
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    
    // getFormatMinSplitSize() is fixed at 1; getMinSplitSize(job) is the user-configured
    // minimum split size (default 1). minSize is the larger of the two.
    // Users can raise it with FileInputFormat.setMinInputSplitSize(job, size)
    // (property mapreduce.input.fileinputformat.split.minsize).
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    
    // the user-configured maximum split size; defaults to Long.MAX_VALUE
    // (FileInputFormat.setMaxInputSplitSize / mapreduce.input.fileinputformat.split.maxsize)
    long maxSize = getMaxSplitSize(job);

    // generate splits
    // the list that will hold the resulting splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    
    // every input file that needs to be processed, with its metadata (FileStatus)
    List<FileStatus> files = listStatus(job);

    // iterate over the files; for each one take its path and length
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          // get a FileSystem instance from the path (the job configuration
          // already carries fs.defaultFS)
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          
          // the locations of every block of this file
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // with the block locations in hand, check whether this file can be split at all
        if (isSplitable(job, path)) {
        
          // the HDFS block size of the current file
          long blockSize = file.getBlockSize();
		
          // computeSplitSize() returns Math.max(minSize, Math.min(maxSize, blockSize)),
          // so the split size defaults to the block size but can be tuned:
          // raise minSize above the block size for larger splits,
          // lower maxSize below the block size for smaller splits
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          
          // SPLIT_SLOP is 1.1: keep splitting while the remaining bytes are more than
          // 1.1 x splitSize; otherwise the remainder becomes the last split
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            // each iteration adds one split: its offset is length - bytesRemaining, its
            // size is splitSize, and its hosts are the nodes holding the replicas of the
            // block that contains that offset (not every node storing the file)
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                        
            // subtract one splitSize per iteration; the "cutting" is purely logical,
            // no data is physically moved or rewritten
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            // whatever is left (at most 1.1 x splitSize) becomes the final split,
            // again located on the hosts of the block containing its start offset
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    
    // At this point we have the list of splits; its length is the number of map tasks.
    // Now let's go back to writeNewSplits().
    return splits;
  }
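To make the arithmetic concrete, here is a small standalone sketch that mirrors computeSplitSize() and the SPLIT_SLOP loop above (the constants and formulas are copied from the logic we just read; the file sizes are made up for illustration):

public class SplitMath {
	private static final double SPLIT_SLOP = 1.1;

	// same formula as FileInputFormat.computeSplitSize()
	static long computeSplitSize(long blockSize, long minSize, long maxSize) {
		return Math.max(minSize, Math.min(maxSize, blockSize));
	}

	// same counting logic as the while loop in getSplits()
	static int countSplits(long fileLength, long splitSize) {
		int splits = 0;
		long bytesRemaining = fileLength;
		while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
			splits++;
			bytesRemaining -= splitSize;
		}
		if (bytesRemaining != 0) {
			splits++; // the tail (at most 1.1 x splitSize) becomes the last split
		}
		return splits;
	}

	public static void main(String[] args) {
		long blockSize = 128L * 1024 * 1024; // a 128 MB HDFS block
		long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);

		// 300 MB file -> 3 splits: 128 MB, 128 MB, 44 MB
		System.out.println(countSplits(300L * 1024 * 1024, splitSize));
		// 140 MB file -> 1 split, because 140 / 128 = 1.09 < 1.1
		System.out.println(countSplits(140L * 1024 * 1024, splitSize));
	}
}

In the driver, the same knobs are exposed through FileInputFormat (a sketch of the tuning mentioned in the comments above):

// force smaller (~64 MB) splits, i.e. more map tasks, even with 128 MB blocks
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
// or force larger (~256 MB) splits, i.e. fewer map tasks
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);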

10. Back to writeNewSplits()

/**
This method computes the input splits for the job.
**/
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // get the job's Configuration
    Configuration conf = job.getConfiguration();

    // instantiate the input format class via reflection; by default this is TextInputFormat
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
      
    // getSplits() returns the list of splits we just walked through
    List<InputSplit> splits = input.getSplits(job);

    // turn the split list into an array
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());

    // createSplitFiles() takes the submit directory, the configuration, the FileSystem and
    // the split array, and writes the split metadata into the job.split file in the
    // staging directory; then we return to submitJobInternal()
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
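At this point the job's staging directory on HDFS holds the submitted artifacts. A small hedged sketch for inspecting it (the exact file names -- typically job.jar, job.xml, job.split and job.splitmetainfo -- and the staging path depend on the deployment; the path is passed in as an argument here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStagingDir {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		// args[0]: the per-job staging directory, e.g. <stagingArea>/job_xxx (hypothetical path)
		for (FileStatus s : fs.listStatus(new Path(args[0]))) {
			System.out.println(s.getPath().getName() + "\t" + s.getLen());
		}
	}
}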

11. Finally, back to submitJobInternal()

      // writeSplits() gave us the number of map tasks
      int maps = writeSplits(job, submitJobDir);
      // record that number in the configuration (MRJobConfig.NUM_MAPS, i.e. mapreduce.job.maps)
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // the next stretch is mostly additional configuration and can be skimmed

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // write the job configuration (job.xml) into the submit directory
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());

      // submitClient submits the job; from here on the ApplicationMaster takes over
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

That wraps up the client-side part of the MapReduce source: how a job is submitted and how the number of map tasks is computed.
