3-2b-0, MapReduce - Job Submission and Split Flow: Source Code Walkthrough

3.1.2 Job Submission Flow and Split Source Code in Detail

3.1.2.0 Mind Map of the Job Submission (Including Split) Flow

(Figure: mind map of the job submission and split flow)

3.1.2.1 Job Submission Flow Source Code in Detail

1. waitForCompletion()
  1. Set a breakpoint at job.waitForCompletion in the Driver class and step into the method.
    public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
        // Check whether the state is DEFINE.
        // JobState.DEFINE means the job has not been submitted yet;
        // later in the submission process the state is flipped to JobState.RUNNING.
        if (this.state == Job.JobState.DEFINE) {
            this.submit();
        }

        if (verbose) {
            this.monitorAndPrintJob(); // if verbose is true, detailed job information is printed to the console
        } else {
         .......
        }
        return this.isSuccessful();
    }
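The DEFINE/RUNNING guard above can be sketched as a small stand-alone class. This is a minimal sketch for illustration only: MiniJob, its JobState enum, and its submit() are simplified stand-ins, not Hadoop's actual Job class.

```java
// Minimal stand-in illustrating the state guard in waitForCompletion()/submit().
class MiniJob {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submitCount = 0;

    void submit() {
        // mirrors ensureState(JobState.DEFINE): refuse to submit twice
        if (state != JobState.DEFINE) {
            throw new IllegalStateException("Job in state " + state + " instead of DEFINE");
        }
        submitCount++;
        state = JobState.RUNNING; // flipped at the end of submit(), as in the real Job
    }

    boolean waitForCompletion() {
        if (state == JobState.DEFINE) { // only a not-yet-submitted job gets submitted
            submit();
        }
        return state == JobState.RUNNING;
    }

    int getSubmitCount() { return submitCount; }
}
```

Calling waitForCompletion() twice submits only once: the second call sees RUNNING and skips submit().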
2. submit()
  1. Set a breakpoint at this.submit() and step into submit().
    public void submit() throws IOException, InterruptedException, ClassNotFoundException {
        this.ensureState(Job.JobState.DEFINE); // confirm the job's state once more
        this.setUseNewAPI(); // resolve compatibility between the old and new APIs
        this.connect(); // set another breakpoint here and step in
        final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
        this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
                return submitter.submitJobInternal(Job.this, Job.this.cluster);
            }
        });
        this.state = Job.JobState.RUNNING;
        LOG.info("The url to track the job: " + this.getTrackingURL());
    }
3. connect()
  1. Continue stepping through submit(), step over ensureState() and setUseNewAPI(), and step into connect().
  • connect() sets up the network connection; depending on the runtime environment it creates a different object: a LocalJobRunner locally, a YARNRunner on a cluster.
  • connect() first checks whether the cluster has already been created; if cluster is null, it creates one.
    private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
        if (this.cluster == null) {
            this.cluster = (Cluster)this.ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
                    return new Cluster(Job.this.getConfiguration()); // breakpoint: cluster initialization
                }
            });
        }

    }
  1. Set a breakpoint on return new Cluster(Job.this.getConfiguration()); in connect(), then Ctrl-click Cluster to jump into the Cluster class.
  • Cluster creation and initialization:
    public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.fs = null;
        this.sysDir = null;
        this.stagingAreaDir = null;
        this.jobHistoryDir = null;
        this.providerList = null;
        this.conf = conf;
        this.ugi = UserGroupInformation.getCurrentUser();
        this.initialize(jobTrackAddr, conf); // set another breakpoint here
    }
  • Set one more breakpoint on this.initialize(jobTrackAddr, conf);.
4. initialize()
  1. Step over until you reach the initialize() method. Its sole task is to determine the client protocol provider and, via that provider's create method, construct the client communication protocol instance client. In essence, it decides whether the current client is a local client or a YARN client.
  • Inside initialize():
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.initProviderList();
        IOException initEx = new IOException("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.");
        if (jobTrackAddr != null) {
            LOG.info("Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
        }

        Iterator var4 = this.providerList.iterator();

        while(var4.hasNext()) {
            ClientProtocolProvider provider = (ClientProtocolProvider)var4.next();
            LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
            ClientProtocol clientProtocol = null;

            try {
                if (jobTrackAddr == null) {
                    clientProtocol = provider.create(conf);
                } else {
                    clientProtocol = provider.create(jobTrackAddr, conf);
                }

                if (clientProtocol != null) {
                    this.clientProtocolProvider = provider;
                    this.client = clientProtocol;
                    LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
                    break;
                }

                LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
            } catch (Exception var9) {
                String errMsg = "Failed to use " + provider.getClass().getName() + " due to error: ";
                initEx.addSuppressed(new IOException(errMsg, var9));
                LOG.info(errMsg, var9);
            }
        }

        if (null == this.clientProtocolProvider || null == this.client) {
            throw initEx;
        }
    }

    ClientProtocol getClient() {
        return this.client;
    }

    Configuration getConf() {
        return this.conf;
    }

    public synchronized void close() throws IOException {
        this.clientProtocolProvider.close(this.client);
    }
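The provider loop above boils down to "try each registered provider and keep the first one that returns a non-null client". A stand-alone sketch of that selection pattern follows; Provider, LOCAL, and YARN here are simplified stand-ins for ClientProtocolProvider and its local/YARN implementations, and the string "clients" merely mark which runner would be chosen.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the first-non-null provider selection performed by Cluster.initialize().
class ProviderPick {
    interface Provider {
        String create(String frameworkName); // returns null if this provider does not apply
    }

    // LocalClientProtocolProvider-like: only handles "local"
    static final Provider LOCAL = f -> f.equals("local") ? "LocalJobRunner" : null;
    // YarnClientProtocolProvider-like: only handles "yarn"
    static final Provider YARN = f -> f.equals("yarn") ? "YARNRunner" : null;

    static String pickClient(String frameworkName) {
        List<Provider> providers = Arrays.asList(LOCAL, YARN);
        for (Provider p : providers) {
            String client = p.create(frameworkName); // may return null, like provider.create(conf)
            if (client != null) {
                return client; // the first provider that yields a client wins
            }
        }
        throw new IllegalStateException("Cannot initialize Cluster"); // mirrors the thrown initEx
    }
}
```

With mapreduce.framework.name set to "local" the local runner is picked; with "yarn", the YARN runner; anything else exhausts the list and fails, just as initialize() throws initEx.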

  • With this appetizer out of the way (the connection process between the client and the cluster), we can move on to the job submission source code proper.
5. submitJobInternal()
  1. Keep stepping until you are back inside submit(), then set a breakpoint on submitJobInternal().

  2. Step into the body of submitJobInternal() and set two more breakpoints there.

6. checkSpecs()
  1. First, step into the checkSpecs() method, which validates the output path.
  • Set a breakpoint and inspect output.checkOutputSpecs(job);.

  • The body of checkOutputSpecs(job) rejects the job if the output path is not set or if it already exists.
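The core of that validation can be sketched with plain java.nio.file. This is a simplified stand-in: the real FileOutputFormat.checkOutputSpecs works against Hadoop's FileSystem and throws InvalidJobConfException / FileAlreadyExistsException rather than IllegalStateException.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the two checks performed when validating the output path:
// the path must be configured, and it must not exist yet.
class OutputCheck {
    static void checkOutputSpecs(Path outputDir) {
        if (outputDir == null) {
            // the real code throws InvalidJobConfException("Output directory not set.")
            throw new IllegalStateException("Output directory not set.");
        }
        if (Files.exists(outputDir)) {
            // the real code throws FileAlreadyExistsException
            throw new IllegalStateException("Output directory " + outputDir + " already exists");
        }
    }
}
```

This is why a MapReduce job fails immediately when you rerun it against an existing output directory.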

7. submitJobInternal(): generating the JobId
  1. After the output directory has been validated, return to submitJobInternal() and continue stepping: a new JobId is requested and attached to the job, and the job's staging directory path (the staging area plus the JobId) is assembled.

Note: at this point no directory named after the JobId has actually been created on the host yet!

8. copyAndConfigureFiles()
  1. Keep stepping over in submitJobInternal() until you reach copyAndConfigureFiles() and set a breakpoint there.

  2. Step into copyAndConfigureFiles(), set another breakpoint on the line uploadResources(job, jobSubmitDir) and step in, and finally step into the uploadResourcesInternal(Job job, Path submitJobDir) method.

  • uploadResourcesInternal(Job job, Path submitJobDir) creates the job's temporary staging directory.
  • At the end of this method, the relevant files are uploaded to the cluster (when running on a local cluster, the jar is not uploaded).
9. submitJobInternal(): generating the splits and the configuration file
  1. In copyAndConfigureFiles() we created the job's temporary staging directory and uploaded the necessary files to the cluster. Return to submitJobInternal() and continue on to writeSplits().

  2. Continue on to writeConf().

  3. Return to submit().

  4. Exiting submit(), we come full circle back to where we started: waitForCompletion().

  • It returns the success status, and the temporary job staging directory is cleaned up.

Summary: job submission essentially uploads the jar (on a non-local cluster), the split files, and the XML configuration file to the server where MapReduce runs, then monitors the job's status and returns its run information.

3.1.2.2 Split Source Code in Detail (Using FileInputFormat as an Example)

1. submitJobInternal(job, cluster)
  1. In the submitJobInternal(job, cluster) method, set a breakpoint on the writeSplits call.

2. writeSplits(job, submitDir)
  1. Step into writeSplits(job, submitDir) and set a breakpoint on writeNewSplits().

3. writeNewSplits(job, jobSubmitDir) -> getSplits()
  1. Step into writeNewSplits(job, jobSubmitDir), set a breakpoint on getSplits(), and step into the getSplits() method of the FileInputFormat class. This method performs the split computation for each input file; the split rules of this implementation are summarized at the end of this article.
public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); // minimum split size
        long maxSize = getMaxSplitSize(job); // maximum split size
        List<InputSplit> splits = new ArrayList();
        List<FileStatus> files = this.listStatus(job);
        boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", false);
        Iterator var10 = files.iterator();
        // iterate over the input files; each iteration of the loop handles one file,
        // and each file goes through the split process independently
        while(true) {
            while(true) {
                while(true) {
                    FileStatus file;
                    do {
                        if (!var10.hasNext()) {
                            job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                            sw.stop();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                            }

                            return splits;
                        }

                        file = (FileStatus)var10.next();
                    } while(ignoreDirs && file.isDirectory());

                    Path path = file.getPath();
                    long length = file.getLen();
                    if (length != 0L) {
                        BlockLocation[] blkLocations;
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }
                        // check whether the file can be split
                        if (this.isSplitable(job, path)) {
                            // get the block size (we are on a local cluster with limited resources,
                            // so the default block size is 32 MB: 33554432 bytes)
                            long blockSize = file.getBlockSize();
                            // compute the split size, also 32 MB here, since split size = block size
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                            long bytesRemaining; // bytes of the file left after each split is cut
                            int blkIndex;
// Key point: given a 32.1 MB file and a 32 MB split size, should it really become
// one 32 MB split plus one 0.1 MB split?
// The code below answers this: the remaining bytes (bytesRemaining) divided by the
// split size (splitSize) must be > 1.1 for another split to be cut; otherwise the
// remainder is kept as a single split.
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
                                LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                            }

                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }
            }
        }
    }
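The 1.1 rule in the loop above can be verified with a small stand-alone calculation. The sketch below reimplements just the loop condition (it is not Hadoop's code): for a ~32.1 MB file with a 32 MB split size the ratio is about 1.003 < 1.1, so the whole file stays in a single split.

```java
import java.util.ArrayList;
import java.util.List;

// Reimplements just the split loop from getSplits() to show the 1.1 rule.
// splitLengths returns the byte length of each split that would be produced.
class SplitSlopDemo {
    static final double SPLIT_SLOP = 1.1D; // same threshold the loop uses

    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        // keep cutting full-size splits while the remainder exceeds 1.1 x splitSize
        while ((double) bytesRemaining / (double) splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            splits.add(bytesRemaining); // the last split absorbs the remainder
        }
        return splits;
    }
}
```

For example, splitLengths(33659289L, 33554432L) (~32.1 MB file, 32 MB split size) returns a single 33659289-byte split, while a 100 MB file (104857600L) yields four splits: three of 32 MB and one 4 MB remainder, because the last ratio checked (1.125) is still above 1.1.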
4. writeNewSplits(job, jobSubmitDir) --> createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)
  • Once the split computation is finished, control returns to writeNewSplits() and execution continues.

  • When the breakpoint is hit, you can see that the split files have already been created in the temporary staging directory.

Split Key Points and Summary

  1. Every file is split independently.
  2. The split size is determined by Math.max(minSize, Math.min(maxSize, blockSize)).
  3. The criterion for whether to keep cutting splits: after each cut, is (remaining file size / split size) > 1.1?
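Point 2 is easy to check directly. The method below mirrors the formula used by FileInputFormat.computeSplitSize (the surrounding class is just an illustrative wrapper): raising the minimum split size above the block size enlarges splits, and lowering the maximum below it shrinks them.

```java
// The split-size formula: max(minSize, min(maxSize, blockSize)).
class SplitSizeCalc {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
}
```

With the usual defaults (minSize = 1, maxSize = Long.MAX_VALUE) the split size simply equals the block size; setting mapreduce.input.fileinputformat.split.maxsize to 32 MB caps splits at 32 MB, and raising mapreduce.input.fileinputformat.split.minsize to 256 MB forces 256 MB splits.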

3.1.3 The Split Process of the FileInputFormat Class

(Figure: the FileInputFormat split process)


Reposted from blog.csdn.net/nmsLLCSDN/article/details/118659739