MapReducer全流程

一、MR的宏观流程

1.两个阶段 Map阶段和Reduce阶段
2.一个MapReduce任务为一个Job,一个Job在执行不同的阶段时,启动若干Task
Map阶段启动的进程称为MapTask
MapTask启动的数量取决于切片数,切N片,启动N个MapTask
Reduce阶段启动的进程称为ReduceTask
ReduceTask启动的进程数量由开发人员自己设置
Job.setNumReduceTask(int n);

3.在学习MR时,在Map和Reduce之间,有一个讲Map输出的数据进行分区和排序和传输的过程
这个过程很重要,因此将这个阶段,单独命名为shuffle!

在两个阶段的划分的基础上,再细分为 Map----------shuffle------------Reduce

shuffle阶段不会启动单独的进程来完成,shuffle横跨了MapTask和ReduceTask!

4.官方的阶段划分
Map阶段: map,sort
Reduce阶段: copy,sort,reduce

	map(map阶段)---sort|copy|sort(shuffle阶段)---reduce(reduce阶段)

二、MapReduce有两种运行模式

1.local(本地模式): 使用LocalJobRunner提交时,Job就在本地运行!
在本地以多线程模拟MapTask和ReduceTask

2.YARN(在YARN上运行): 使用YARNRunner提交时,此时Job在运行之前,会初始化
MRAppMaster进程,由这个进程向RM申请运行所有Task的资源!

					RM将申请分配给NM,NM提供的资源会封装到Container中,此时启动Task进程!

三、MR运行的宏观流程

1.提交Job的阶段

​ a)job.waitForCompletion()
​ b)确定job的状态不是running,构建Cluster(代表集群)
​ c)确定Job的运行模式,如果是本地模型,使用LocalJobRunner提交运行
​ 否则使用YARNRunner提交运行

	b)使用提交器,提交Job

2.Job在提交之前的预处理阶段 submitJobInternal()

​ a)获取输出格式,调用输出格式的checkOutputSpecs()检查输出目录
​ 不能为空,也不能已经存在
​ b)确定当前Job存放临时文件的目录submitJobDir
​ 如果是本地文件系统,默认在idea所在的磁盘的tmp目录下
​ 如果是HDFS,默认在hdfs的/tmp下
​ c) int maps = writeSplits(job, submitJobDir);
​ 切片,生成切片信息,设置MapTask的数量
​ d) 在Job的作业目录上生成Job的配置信息文件
​ job.xml: 记录了Job所有的配置,这些配置来自于xxx-default.xml和xxx-site.xml
​ job.split:切片对象
​ job.splitmetainfo:切片对象的属性说明

如果此方法结束,相当于在YARN上运行时,此时准备申请MRAppMaster进程!

3.构建可以运行的Job

​ LocalJobRunner.Job: 功能类似于MRAppMaster,负责整个Job的运行的申请,提交等操作!

​ a)new Job()

​ b)执行Job.start() 相当于启动了MRAppMaster

​ c) 进入Job.run() 相当于开始让MRAppMaster干活

4.开始运行Map阶段

​ MRAppMaster是Job的管理者,一个Job启动一个MRAppMaster,负责JobMap和Reduce阶段Task的资源申请,运行,监控等。

​ LocalJobRunner.Job.run()

public void run() {
      
      
      try {
          //根据之前生成的job.split和job.splitinfo文件创建TaskSplitMetaInfo[]
          // 之前切几片,TaskSplitMetaInfo[]中就有几个切片的元数据对象
        TaskSplitMetaInfo[] taskSplitMetaInfos = 
          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

       
		// Map用来保存所有MapTask生成的结果文件的影响信息
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
        
          // 根据有几个切片,就创建几个MapTask的线程
        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
            taskSplitMetaInfos, jobId, mapOutputFiles);
              
        initCounters(mapRunnables.size(), numReduceTasks);
        ExecutorService mapService = createMapExecutor();
         // 运行MapTask
        runTasks(mapRunnables, mapService, "map");

          // 如果有reduce阶段,运行ReduceTasks
        try {
          if (numReduceTasks > 0) {
            List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
                jobId, mapOutputFiles);
            ExecutorService reduceService = createReduceExecutor();
              //运行ReduceTask
            runTasks(reduceRunnables, reduceService, "reduce");
          }
        } finally {
          for (MapOutputFile output : mapOutputFiles.values()) {
            output.removeAll();
          }
        }
        
    }

四、核心配置

1.如何确定当前Job使用什么样的输出格式?

​ 由job.getOutputFormatClass(),此方法从参数中默认读mapreduce.job.outputformat.class

,如果没有配置,默认使用TextOutputFormat

public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends OutputFormat<?,?>>) 
      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
  }

默认在自己不设置的的情况下,默认使用TextOutputFormat.class!

2.如何确定有没有reduce阶段

​ Job.getNumReduceTask()获取当前Job要启动多少个ReduceTask!

//mapreduce.job.reduces
public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

3.如何获取Mapper

 public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
     throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>) 
         //mapreduce.job.map.class
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
  }

4.如何获取输入格式

​ 尝试获取配置中设置的mapreduce.job.inputformat.class的值,如果没有设置默认使用TextInputFormat

public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

​ 默认TextInputFormat使用LineRecordReader!

5.切片的说明

​ 通常情况下,如果是从文件中获取切片,那么使用FileSplit作为切片对象!

​ FileSplit有三个关键的属性:

​ Path p: 当前切片属于哪个文件

​ int start: 当前切片的起始位置是从这个文件的哪个位置开始

​ int length: 当前切片的长度

​ 从指定文件的start位置开始,读取length的长度,就属于当前切片!

​ 切片和块没有任何关系!只不过每次从指定文件中读取每片数据时,数据实际上以块的形式存储在HDFS,

读取数据时,会访问到指定的块!具体哪个切片对应哪些块,取决于切片如何切,没有直接的对应关系!

​ 注意:默认情况下,使用FileInputFormat,在切片时,默认以块大小作为片大小!

​ 每片刚好是读取一个块!

五、Map阶段的运行

1.MapTaskRunnable.run()

查看LocalJobRunner.Job.MapTaskRunnable,类似于封装了Container,在其中运行了MapTask!

public void run() {
        try {
          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
              jobId, TaskType.MAP, taskId), 0);
          LOG.info("Starting task: " + mapId);
          mapIds.add(mapId);
            //创建一个 MapTask对象,这个对象代表当前的MapTask,负责这个Task的总的运行流程
          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
            info.getSplitIndex(), 1);
         ......
             //运行MapTask
            map.run(localConf, Job.this);
           
    }

2.MapTask.run()

注意: 如果有reduce,那么Map阶段分为两个阶段

​ map------67%

​ sort-------33%

​ 如果没有reduce,Map阶段只有map一个阶段

​ 只有有reduce阶段时,数据才会排序!如果没有reduce阶段数据是按照读入的顺序处理后,输出!

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
	//定义了整个 MapTask的阶段划分
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    .......

    if (useNewApi) {
        // 开始运行Mapper
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

3.运行Mapper

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper 一个MapTask只会创建一个Mapper对象            
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format 创建输入格式对象
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split 重建当前MapTask的切片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

     //构建MapTask的输入对象,负责整个MapTask的输入工作,RecordReader由input负责进行调用读取数据
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    //构建MapTask的输出对象
    // get an output object
    if (job.getNumReduceTasks() == 0) {
        //如果没有reduce阶段,由Map收集输出的数据,直接输出
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // 创建记录收集器
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

          //构建Mapper中使用的context对象,代表MapTask的上下文(来龙去脉),
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
        // 会执行输入过程中所需要组件的一系列初始化
        // 调用RecordReader.initialize()
      input.initialize(split, mapperContext);
        // 调用自己编写的Mapper的run()
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

4.Mapper.run()

​ Mapper在运行时:

​ KEYIN-VALUEIN: 取决于输入格式的RR的设置!

​ 默认用LongWritable-Text

public void run(Context context) throws IOException, InterruptedException {
    //在map()之前只被调用一次
    setup(context);
    try {
        //调用RecoredReader的nextKeyValue()
      while (context.nextKeyValue()) {
          //每读取一对输入的KEYIN-VALUEIN,执行一次map记录
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
        //在map()之后被调用1次
      cleanup(context);
    }
  }

六、MapTask的shuffle

1.缓冲区对象的构建

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
    //创建输出中的缓冲区对象,这个缓冲区不仅可以用来收集数据还会对数据进行排序
      collector = createSortingCollector(job, reporter);
   
      }
    }

2.确定Partitioner

 // 根据reduceTask的数量,确定Map阶段总的分区数(不等于实际上数据的分区)
      partitions = jobContext.getNumReduceTasks();
// reduceTask个数>1,就使用用户配置的Partitioner
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };

从配置中获取用户定义的Partitioner

public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    //获取mapreduce.job.partitioner.class值,如果没有设置,使用HashPartitioner作为默认
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

HashPartitioner的工作原理: key相同的会分到同一个区

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

3.缓冲区的初始化

​ 默认使用MapOutPutCollector作为缓冲区对象!

public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
     
     
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
    // 从配置中获取溢写的百分比,默认读取mapreduce.map.sort.spill.percent,如果没有配置,使用
     // 0.8作为百分比
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    // 缓冲区的初始化大小,默认读取mapreduce.task.io.sort.mb,如果没有配置,默认使用100
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    
      // 默认使用快排
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
     

      // k/v serialization
    // 确定key的比较器
      comparator = job.getOutputKeyComparator();
    // 获取Mapper输出的key-value的类型
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
    
      serializationFactory = new SerializationFactory(job);
    //根据key的类型返回指定的序列化器
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

     

      // compression 在mapper的输出阶段使用压缩
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner  设置combiner
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
    }

4.确定key的比较器

 public RawComparator getOutputKeyComparator() {
     //尝试获取参数中配置的mapreduce.job.output.key.comparator.class,作为比较器,
     //如果没有定义,默认为null,定义的话必须是RawComparator类型
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
     //如果用户配置,就实例化此类型的一个对象
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
     // 判断Mapper输出的key是否是writableComparable类型的子类,
     //如果是,就默认由系统提供比较器,如果不是就抛异常!
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

如何在Map阶段定义key的排序:

​ ①要比较的字段,设置为key

​ ②提供一个RawComparator的key的比较器!

​ 或让key实现WritableComparable接口!

根据比较器的compareTo()对key进行比较!

​ 如果比较的条件是多个,称为二次排序!

5.序列化

①什么时候需要序列化

​ 有reduce阶段时,需要Map输出的key-value实现序列化

②怎么实现

​ 实现Writable接口!

③是否必须实现?

​ 不是!

④什么情况下,可以不实现Writable接口,为什么一定要实现Writable接口

​ 只有实现了Writable接口,hadoop才会自动提供基于Writable接口的序列化器!

​ 如果自己提供序列化器,就可以不是先Writable接口

6.总结

①先分区,每个输出的key-value在写出时,先调用partitioner计算分区号,再收集到缓冲区中

②数据被收集进入缓冲区中,当缓冲区达到溢写的条件时,会调用 Sorter对当前缓冲区中的所有的数据进行排序

​ 只排索引(记录排好序的索引的信息)

③按照分区号,从0号开始依次溢写,每次溢写之前,如果设置了Combiner,先Conbine再溢写。

​ 每次溢写会产生一个spillx.out的文件

④所有的数据全部收集到缓冲区后,会执行最后一次flush(),将不满足溢写条件的缓存中的残余数据再次溢写

⑤flush()之后,会调用mergeParts()进行合并

​ 在合并时,先将多个文件的同一个分区的数据进行合并,合并后再排序!

​ 之后讲所有分区的数据溢写为一个final.out文件!

​ 在溢写之前,如果设置了Combiner,并且之前溢写的片段个数>=3,此时会再次调用Combiner,Combine后再溢写

一句话: 分区—排序—按照分区号溢写(combine)… 合并—溢写[combine]

七、ReduceTask的运行

1.ReduceTaskRunable.run()

public void run() {
        try {
          TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
              jobId, TaskType.REDUCE, taskId), 0);
          
			//创建reduceTask对象
          ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
              reduceId, taskId, mapIds.size(), 1);
          ......
            try {
             // 运行reduceTask.run()
              reduce.run(localConf, Job.this);

    }

2.ReduceTask.run()

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    //阶段定义划分
    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
   .....
    
    // Initialize the codec  如果MapTask写出有使用压缩,在此时获取压缩格式的编解码器进行解压缩
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    //定义shuffle阶段的消费者线程,从MapTask输出的结果中将指定分区的数据拷贝到ReduceTask
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    // 定义combiner,在reduceTask端合并多个MapTask同一分区的数据时,如果reduceTask内存不够
    //会发生溢写,在每次溢写前,还会调用combiner!
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    
   //初始化shuffle线程,调用其run()
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures   shuffle阶段已经完成了copy过程
    mapOutputFilesOnDisk.clear();
    
    // 在shuffle中,sort也已经完成。已经按照Mapper的输出,对所有的数据进行了整体的排序
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    
    //获取Mapper输出的key-value的类型
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    
    //定义分组比较器
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
        //运行Reducer
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

3.获取分组比较器

 public RawComparator getOutputValueGroupingComparator() {
     //先获取用户定义的比较器,从配置中获取mapreduce.job.output.group.comparator.class
     //参数,必须是RawComparator类型
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
     //如果用户没有定义,默认使用Map阶段key的比较器
    if (theClass == null) {
      return getOutputKeyComparator();
    }
    //如果用户定义了,就使用用户的比较器
    return ReflectionUtils.newInstance(theClass, this);
  }

4.runNewReducer

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
   //封装key-value的迭代器,让迭代器可以报告进度
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
        //每次迭代key-value时,并不是将数据读取后直接封装为key-value
        //而是获取当前key-value的byte[]的内容,再使用反序列化
        //把这部分内容的属性设置到key-value的实例中
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // 创建上下文
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // 实例化reducer对象,一个reduceTask只会创建一个reducer对象
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
                                  //reduce中的context
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
        //运行reducer.run()
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

5.Reducer.run()

public void run(Context context) throws IOException, InterruptedException {
    //在reduce()之前,调用一次setUp()
    setup(context);
    try {
        //判断数据中是否有和当前读取的key相同的key-value,如果读到了相同的key-value,进入一次reduce()
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
         //在reduce()之前,调用一次cleanUp()
      cleanup(context);
    }
  }

八、自定义组件

1.自定义输入格式

核心提供一个RecordReader

​ initialize(): 不需要手动调用,框架在运行时,在进入 Mapper处理之前会自动调用!

​ boolean nextKeyValue(): 读取输入的记录,封装为key-value对,如果封装完成,返回true,否则返回false

​ 只有返回true,才会调用map()

2.自定义Partitioner

​ 继承Partitioner,实现public int getPartition(Text key, FlowBean value, int numPartitions)!

​ 注意: 分区号必须为int型的值,且必须符合 0<= partitionNum < numPartitions

if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }

3.实现RawComparator

​ 对Flowbean的sumFlow不使用Key实现WriableComparable接口的方式!进行降序的全排序!

/**
 * @author: WangMC
 * @date: 2020/3/2 21:56
 * @description:
 */
public class RawComparatorImpl implements RawComparator<FlowBean> {


    private  FlowBean key1 = new FlowBean();
    private  FlowBean key2 = new FlowBean();
    private DataInputBuffer buffer = new DataInputBuffer();


    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        try {
            buffer.reset(b1, s1, l1);                   // parse key1
            key1.readFields(buffer);

            buffer.reset(b2, s2, l2);                   // parse key2
            key2.readFields(buffer);

            buffer.reset(null, 0, 0);                   // clean up reference
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return compare(key1, key2);                   // compare them
    }


    @Override
    public int compare(FlowBean o1, FlowBean o2) {
        int result;

        // 按照总流量大小,倒序排列
        if (o1.getSumFlow() > o2.getSumFlow()) {
            result = -1;
        } else if (o1.getSumFlow() < o2.getSumFlow()) {
            result = 1;
        } else {
            // 二次排序 按下行流量倒序排序
            result = o1.getDownFlow() > o2.getDownFlow() ? -1 : 1;
        }
        return result;
    }


}


发布了24 篇原创文章 · 获赞 27 · 访问量 6941

猜你喜欢

转载自blog.csdn.net/qq_39261894/article/details/104630148