Hadoop02学习之MapReduce&YARN

1. MapReduce

MapReduce原语:
输入(格式化k,v)数据集 → map映射成一个中间数据集(k,v)→ reduce
“相同”的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算。

MapReduce的含义:MapTask&ReduceTask:
具体的原理:
一个数据块可以分为多个切片,一个切片对应一个map进行处理,map进行处理后的数据会变为k&v键值对的数据,经过排序之后会变为含有某种人为设定的排序的键值对数据,经过reduce处理,reduce的数量是由人为设定的,reduce最重要的就是需要根据原语来设定,一组数据只能经由一个reduce进行汇聚处理,不能一组数据经由多个reduce进行处理,这样会导致输出数据不一致。进入到reduce进行处理之前,会进行一个归并排序,将所有输入到此reduce中处理的数据整体变为有序,便于reduce处理。

各个模块之间的对应关系:

模块A 模块B 对应关系 含义
block块 split分片 N:1 N个块对应一个分片
1:1 一个块对应一个分片
1:N 一个块对应多个分片
split分片 map 1:1 分片决定map数
map reduce N:1 N个map对应一个reduce
N:N N个map对应N个reduce
1:1 一个map对应一个reduce
1:N 一个map对应N个reduce
group(key) partition(reduce) 1:1 一个分组对应一个reduce
N:1 N个分组对应一个reduce
N:N N个分组对应N个reduce
1:N 一个分组对应N个reduce违背了原语,数据会错误
partition outputfile 1:1 一个reduce对应一个输出

在这里插入图片描述

2. Shuffle拷贝机制

Hadoop:放大实现机制
Shuffler<洗牌>:框架内部实现机制
分布式计算节点数据流转:连接MapTask与ReduceTask

输入分片split数据,以记录的形式传入到map中进行处理,输出的时候会触发算法机制,对数据进行一个排序,输出k&v&p(p是分区)三维度的数据,传入后的数据需要写入内存,如果是每产生一条就写一条的话,对于本地内存的I/O访问太过于频繁,所以采用内存缓冲区(buffer)的机制,当buffer缓存区满了的时候(之前所有的操作都是基于内存中的),就flush刷成一个小文件(存在磁盘),小文件是先按照p也就是分区来,然后按照k来排序,形成一个个有序的小文件(内部有序,外部无序),map一直处理下去,直到没有文件为止。然后归并到一个文件,此时文件排序是内部有序外部无序的,就是将刚刚的小文件进行一个合并操作。此时reduce task从所有的文件中拉取属于自己分区的数据,按照之前排序的规则进行以此合并(不具备重新排序的能力),合并成两个文件,此时的两个文件也是内部有序,外部无序的状态,此时只需要做一次reduce就可以获取到需要的数据,此时的合并与计算是同时的,也就是少了一次两个文件合并成一个文件再送入reduce中进行处理的过程。
在这里插入图片描述
示例:
首先对输入的数据做一个分片处理,map进行处理,形成k&v的形式数据,通过洗牌机制,各自拉取属于自己分区的数据,进行合并计算,最后输出结果。
在这里插入图片描述
释义:

  • Map:
    • 读懂数据
    • 映射为KV模型
    • 并行分布式
    • 计算向数据移动
  • Reduce:
    • 数据全量(一组数据集)/分量(数据集中的一组)加工
    • Reduce中可以包含不同的key
    • 相同的key汇聚到一个reduce中
    • 相同的key调用一次reduce方法
    • 排序实现Key的汇聚
  • K,V使用自定义数据类型
    • 作为参数传递,节省开发成本,提高程序自由度
    • Writable序列化:能使分布式程序数据交互
    • Comparable比较器:实现具体排序(字典序,数值序等)

3. Hadoop运行架构分析

Hadoop 1.x的运行架构(体现计算向数据移动
客户端:以作业为单位、规划作业计算分布、提交作业资源到HDFS、最终提交作业到JobTracker
JobTracker:核心,主节点,单点的、调度所有的作业、监控整个集群的资源负载
TaskTracker:从节点,自身节点资源管理、与JobTracker之间有心跳机制,向其会报资源,获取Task任务

弊端:

  • JobTracker:负载过重、存在单点故障
  • 资源管理与计算调度强耦合,其他计算框架需要重复实现资源管理
  • 不同框架对资源不能全局管理(主要涉及到TaskTracker在不同的框架JobTracker内的调度选择问题)

Hadoop架构原理的说明:
Clients首先需要规划作业计算分布、获取到作业的基本配置、然后将作业资源(jar包、配置信息、切片清单)提交到HDFS的某个目录上,副本数默认为10,如果有很多map来拉取的时候可以到不同的节点拉取,避免多个map到同一个节点读取文件信息的时候单个节点的资源受限问题发生,客户端会连接JobTracker,TaskTracker与JobTracker之间基于心跳机制,TaskTracker(与DataNode是一对一的,汇报的资源也是DataNode的资源)向JobTracker汇报自己的资源信息,同时从JobTracker中获取调度的任务,如果发现有任务,就自行去HDFS下载运行任务所需要的资源包,然后在本地启动一个jvm执行任务。JobTracker的任务调度主要是针对MapTask与ReduceTask来的,所以被分配任务的TaskTracker执行的任务也就在这种两种之内了。

在1.x中存在的问题在上面的弊端中也有提及,最主要的是,当有多个JobTracker的时候,此时如果JobTracker选择的是第一个TaskTracker进行处理的话,如果此时有一个/duanxi的JobTracker也需要调度分配任务,此时对于它来说,第一个TaskTracker是空闲的,实质不是,这是因为他不能知晓第一个TaskTracker是否被任务调度中,因此在2.x中引入了YARM资源管理器,有效的避免了这种情况的发生。
在这里插入图片描述
计算框架Mapper:
客户端需要获取一个文件中某个资源的大小,此时会将任务传递下去,JobTracker接收到任务之后,通过与TaskTracker之间的心跳机制,给各个TaskTracker分配计算任务,此时他们会从HDFS中下载属于自己计算的一块资源,开启jvm进行计算。
在这里插入图片描述
计算框架Reducer
当mapper计算任务完毕之后,JobTracker会分配任务给TaskTracker执行ReduceTask,此时根据JobTracker调度的ReduceTask个数,拉取MapTask计算出的数据到ReduceTask进行汇聚,将计算结果写入到HDFS中,客户端从中获取到需要的数据。
在这里插入图片描述

4. YARN资源管理器

YARN:(Yet Another Resource Negotiator,另一种资源协调者)
YARN主要是用来解决在1.x架构中遗留的问题,就是JobTracker作为资源调度者还负责资源的监控,功能过于冗余,此YARN通过ResourceManager来资源管理,有效的避免了这类问题的发生,而且对于系统来说对于DataNode的使用也相对均衡。

具体的工作流程:
当客户端发送一个请求到Resourcemanager中,告知我需要某种资源,此时不像1.x中由JobTracker来调度了,此时RM接收到以后,会在DataNode中选择一个不忙的节点,开辟一个进程Application Master,其任务是与来自ResourceManager的资源进行协商,并与NodeManager一起执行和监视任务,此时它会获取到切片的清单,此时会与RM请求,询问RM自己切片中的清单应该在哪去处理,请求RM根据自己资源的情况调度相应的DataNode来创建MapTask任务,这样的就叫做容器,RM会帮助其开辟container,然后将这些container返回给Application Master,此时容器总的处理情况会返回给Application Master。
如果此时多出一个客户端,需要走一趟上面的运行流程。加入了RM以后,对于DataNode的压力过大问题可以得到很好的解决。
NodeManager会返回RM其所在节点的状态。
在这里插入图片描述
MRv2 On YARN:

  • YARN:解耦资源与计算
  • ResourceManager:主,核心,主要负责集群节点资源管理
  • NodeManager:给RM汇报资源,与DataNode是一比一配置,管理container的生命周期,计算框架中的角色都以container表示
  • Container:【节点NM,CPU,MEM,I/O大小,启动命令】
    • 默认NodeManager启动线程监控container的大小,超出申请资源额度则直接kill(这种一般是人为对资源大小计算不当造成的)
    • 支持linux内核的cgroup
  • MR:
    • MR-ApplicationMaster-Container
    • 以作业为单位,避免单点故障的发生,负载到不同的节点
    • 创建Task需要和RM申请资源(Container)
    • Task Container
  • Client:
    • RM-Client:请求资源创建AM
    • AM-Client:与AM交互

YARN可以实现高可用,在后面会配置实现。
在这里插入图片描述

4.1 资源管理器高可用的实现

配置ResourceManager:
主要是配置yarn-site.xml与mapred-site.xml
mapred-site.xml

<property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
    </property>
 <property>
        <name>mapreduce.application.classpath</name>
               <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>

yarn-site.xml

<!-- 选择服务 -->
<property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
</property>
<!-- 开启resourceManager -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- rm集群id -->
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>node03</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>node04</value>
</property>
<!-- rm的web地址 -->
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>node03:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>node04:8088</value>
</property>
<!-- zookeeper地址 -->
<property>
  <name>hadoop.zk.address</name>
  <value>node02:2181,node03:2181,node04:2181</value>
</property>
<property>
	<name>mapreduce.jobhistory.address</name>
	<value>node03:10020</value>
</property>
<property>
	<name>mapreduce.jobhistory.webapp.address</name>
	<value>node03:19888</value>
</property>
<property>
	<name>mapreduce.jobhistory.address</name>
	<value>node04:10020</value>
</property>
<property>
	<name>mapreduce.jobhistory.webapp.address</name>
	<value>node04:19888</value>
</property>
<!-- 白名单 -->
<property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>

在这里插入图片描述
上述的配置都是按照上图的配置来的,在hadoop的HA的存在下添加了RS与NM的配置,其中NodeManager与DataNode的配置是一比一的。
使用start-yarn.sh可以开启服务,此时使用jps,在node03与node04上看是否含有resourceManager进程。没有的话,需要手动启动ResourceManager进程。hdfs --daemon start resourcemanager.
启动后可以进入到网址http://node04:8088/ 可以看到rm集群的详细信息。
在这里插入图片描述
与hadoop的ha不同的是,resourceManager的集群由于没有通过zkfc来做一个中转站传递信息,而至直接整合了zookeeper作为一个进程,在rm的standby中是不能够看见集群的任何信息的,standby节点一直处于等待的状态。
在这里插入图片描述
zkfc启动的命令:hdfs --daemon start zkfc如果配置了快速失败转移没有开启zkfc的话,配置的两个NameNode主从节点都将处在standby状态。

试验:
用resouceManager来进行文件的计算工作,具体的流程如下,首先需要在hdfs下建立一个文件夹,传入一个文件进入
在这里插入图片描述
进入到hadoop的目录下的share/hadoop/mapred中,启动其中的jar包,使用其中的wordcount程序。
在这里插入图片描述
在这里插入图片描述
此时在我们的集群网址上也可以看见
在这里插入图片描述
此时进入到http://node01:9870的文件系统可以看见我们设置的输出目录下多出了计算的文件。
在这里插入图片描述
完成试验。

5. 实现wordcount功能

手写wordcount程序并打包成jar文件在linux上运行。

// 用作单词统计
public class MyWC {
    // 客户端
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);

        // 强转获取job的配置信息
        JobConf jobConf = (JobConf) job.getConfiguration();
        job.setJobName("wordCount");
        job.setJarByClass(MyWC.class);
        // 设置文件的输入路径
        Path input = new Path("/user/root/sort/txt");
        FileInputFormat.setInputPaths(job, input);
        // 设置文件的输出路径,由于hadoop规定不允许输出目录为已存在的目录
        // 所以需要校验输出的文件目录是否存才,如果存在的话需要先删除
        Path output = new Path("/data/wc/output");
        // 从配置中获取到文件系统
        if (output.getFileSystem(conf).exists(output)) {
            // 保证递归删除
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
		// 设置mapper的类
        job.setMapperClass(MyMapper.class);
        // 设置输出的key和value的类型,方便reduce拉取属于自己的数据
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        // 等待完成
        job.waitForCompletion(true);
    }
}

Mapper类

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 分割生成数据迭代器
        StringTokenizer itr = new StringTokenizer(value.toString());
        // 只要存在存在数据,设置数据,写入
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

Reduce类:编写reduce时候需要记住,相同的key为一组, 调用一次reduce方法,在方法内迭代这一组数据,进行计算

public class MyReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    	// 跌代进行计算,只要含有值就+下去
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 设置结果,写入,注意写入的格式为key result(int)
        result.set(sum);
        context.write(key,result);
    }
}

将上述三个类打包成jar包,传输到linux上运行,结果与之前直接使用的hadoop给出的example的jar包得出的结果一致。

6. Hadoop源码分析

6.1 client端源码分析

 // 主要实现提交工作到集群并等待集群处理完毕
 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
}
--------------------------------------------------------------------------
// 提交到集群,然后立即返回
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // 设置使用新的api
    setUseNewAPI();
    // 与集群进行连接
    connect();
    // 根据集群获取到存储作业专用文件的文件系统以及客户端获取到工作提交者
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      	// 工作提交者使用提交工作的内部方法进行工作向集群的提交(!!!有详情)
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
}
----------------------------------------------------------------------
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 校验工作输出的规格(!!!有详情)
    checkSpecs(job);
    Configuration conf = job.getConfiguration();
    // 写入切片清单个数(!!!!有详情)
     int maps = writeSplits(job, submitJobDir);
    // 设置map的个数
     conf.setInt(MRJobConfig.NUM_MAPS, maps);
}
-------------------------------------------------------------------------
private void checkSpecs(Job job) throws ClassNotFoundException, 
      InterruptedException, IOException {
    // 获取配置
    JobConf jConf = (JobConf)job.getConfiguration();
    // Check the output specification 检查输出的规格
    if (jConf.getNumReduceTasks() == 0 ? // 获取对于使用reducer配置的码,默认为1
    // 为0的话使用此码进行mapper运算,否则进行reducer计算
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
        // 获取输出实例,利用反射工具类获取一个新的实例
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
        ReflectionUtils.newInstance(job.getOutputFormatClass(),
          job.getConfiguration());
      output.checkOutputSpecs(job);
    } else {
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
}
------------------------------------------------------------------------
public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends OutputFormat<?,?>>) 
    // 获取format配置,如果有设置格式,则返回设置的格式,否则返回文本类型的format
      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}
-----------------------------------------------------------------------
 private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // 根据前面设置的使用新的api,直接执行写入新的切片,注意默认为false
    // (!!!!有详情)
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    }
  }
 ------------------------------------------------------------------------
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
	// 根据输入文件获取切片列表(!!!有详情)
	// 在其父类中有此方法的实现!!!!!!
    List<InputSplit> splits = input.getSplits(job);
  }
 ------------------------------------------------------------------------
 //  最重要的方法
 public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // 计算最大值,获取格式化之后切片的最小值,默认为1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    ------------------------------------------------------------------
    // 获取工作切片的最小值,最小的切片大小与1做对比获取最小的
    public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
------------------------------------------------------------------
	// 获取切片的最大值
    long maxSize = getMaxSplitSize(job);
------------------------------------------------------------------	
 	// 获取切片的最大值,将切片的最大值与long的最大值对比,返回的是long
	public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
------------------------------------------------------------------	
    // 生成切片
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
	// 判断是否要无视其中的文件,多目录需要递归
    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    // 遍历所有的文件
    for (FileStatus file: files) {
      // 如果含有目录,则一直走到最底层文件再做处理
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      // 获取文件路径
      Path path = file.getPath();
      // 获取文件的长度
      long length = file.getLen();
      // 如果长度不为零的话,进行块位置的获取
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          // 根据配置获取文件系统,以及获取所有的文件块资源信息
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // 判断给定的文件名是否可以切分
        if (isSplitable(job, path)) {
          // 获取文件块的大小
          long blockSize = file.getBlockSize();
          // 根据块的大小,最小值,最大值获取分割的切片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		  // 依然存在的文件字节
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // 获取块位置的索引
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            // 添加(开始制作文件的切片)
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            // 每次减去一个切片的大小,上面在获取索引的时候就可以获取到块位置的索引
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } 
    return splits;
  }  

上述源码中,对于切片制作这个环节,切片制作需要的信息有文件路径,起始位置(偏移量),切片的大小,主机名列表,以及缓存副本的主机名列表

Client源码的大致流程为:创建工作,提交工作,调用内部方法处理工作,获取map的个数,通过写切片的方式获取切片的list列表,生成切片:根据块的大小和切片的大小计算每一个块的偏移量并且由此创建切片。

6.2 map-input的源码分析:

// 最后还是主要调用run方法来运行
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
 }
 ---------------------------MapTask中源码-------------------------------
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
	// map任务
    if (isMapTask()) {
      // 如果不是reduce任务,所有的进度权重都给到map阶段
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        // reduce有排序和reduce阶段
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
 }
 ---------------------------runNewMappper源码-----------------------------
 private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    // 获取任务上下文
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper 获取mapper,根据反射获取实例
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format 获取mapper的输入格式化
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split 重建输入的分片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    // 获取切片中的位置信息与起始的偏移量
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);
	// 获取记录读取器
	// 上面对于输入的格式化和切片的重建工作,这里需要拿到流以后定义怎么去读这个输入文件
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
	// 获取map的上下文
	// 上下文中含有输入输出, 则可以得出Mapper类中的content的getCurrentyKey
	// 实际上是取得输入对象的LineRecorder
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      // 根据分片与上下文初始化输入
      input.initialize(split, mapperContext);
      // 初始化以后运行上下文文件
      mapper.run(mapperContext);
      // map阶段结束
      mapPhase.complete();
      // 开始排序
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      // 输入关闭
      input.close();
      // 置为null
      input = null;
      // 输出关闭,其中包括数据的flush
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
 }
 ------------------------NewTrackingRecordReader源码------------------------
 NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
        TaskReporter reporter,
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
        throws InterruptedException, IOException {
      List <Statistics> matchedStats = null;
      if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
            .getPath(), taskContext.getConfiguration());
      }
      fsStats = matchedStats;

      long bytesInPrev = getInputBytes(fsStats);
      // 在这里创建一个记录读取器,利用分片与任务上下文
      // real对象来源于LineRecorderReader
      this.real = inputFormat.createRecordReader(split, taskContext);
      long bytesInCurr = getInputBytes(fsStats);
      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
    }
 ---------------------------initialize源码---------------------------
 public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    // 获取分片文件以及上下文中获取到配置
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    // 获取分片的起始与结束
    start = split.getStart();
    end = start + split.getLength();
    /// 获取文件路径
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    // 获取文件系统并且利用偏移量到每个分片的开头
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      // ......
      } else {
        if (start != 0) {
          // So we have a split that is only part of a file stored using
          // a Compression codec that cannot be split.
          throw new IOException("Cannot seek in " +
              codec.getClass().getSimpleName() + " compressed stream");
        }

        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      // 偏移到起始位置
      fileIn.seek(start);
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    // 除了第一个块以外的所有分片读取都会将自己的第一行让出去,然后自己多读取一行,
    // 这样可以保证在切片跨两个块的时候,数据的还原不会出现乱码
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
 -------------------run源码----------------------
 public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      // 实际调用的是LineRecorderReader中的nextKeyValue方法
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
 ----------------------------nextKeyValue()源码--------------------------
 public boolean nextKeyValue() throws IOException {
    if (key == null) {
    // 放置偏移量
      key = new LongWritable();
    }
    key.set(pos);
    // 赋值value
    if (value == null) {
      value = new Text();
    }
  }
 -----------------------------------------------------------
 run方法中获取key和value的方法
 @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

input源码中可以看出,在input的阶段我们主要是为了数据的输入,数据输入需要获取数据的输入流,由于hadoop采用的是并行分布式,每个map处理一个切片的方式,所以需要通过seek偏移到切片的起始位置,由于在文件存储的时候,hdfs会做数据切割处理,所有的切片读取的时候都会放让出第一行,然后多读取一行(除了第一块切片)。

6.3 map-output源码分析

void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
                             // get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // 当前执行的reducetask不会为0,进入到输出收集器方法
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
}
------------------------------NewOutputCollector源码----------------------
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);
      // 获取reduce任务数,以此确定分区,分区与reduce数是一比一
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
      	// 如果大于1个分区,反射获取分区器的实例
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
      	// 如果等于1,则返回分区等于0的分区器
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }
@Override
    public void write(K key, V value) throws IOException, InterruptedException {
    // 上下文对象构造(k,v,p)三维度的数据集,放入collect缓冲区中
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }
-----------------getPartitioner()源码------------------
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
     // 如果有设置就按照设置的来,没有的话默认按照hash算法来分区
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }
---------------createSortingCollector()源码------------------------
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
      try {
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        // 获取收集器对象
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
          /// 初始化上下文
        collector.init(context);
        LOG.info("Map output collector class = " + collector.getClass().getName());
        // 返回收集器
        return collector;
      } catch (Exception e) {
        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
        if (--remainingCollectors > 0) {
          msg += " (" + remainingCollectors + " more collector(s) to try)";
        }
        lastException = e;
        LOG.warn(msg, e);
      }
    }
--------------------------init()源码-----------------------------
public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      // 溢写阈值,有设置按照设置来,没有设置默认占比80%
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
        // 缓冲区的大小默认为100MB
      final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      // 排序器,这里默认使用的是快排
      // 由于是写入到磁盘中,所以在写入之前使用的是快排先排序
      sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

      // k/v serialization 排序中用的比较器
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      // combiner map端数据的合并,一旦设置了合并的话,
      // 在map输出到reduce进行shuffle之前会进行数据的内部的合并,以此减小网络I/O
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      // 如果设置了合并器,当文件数的为3以上时就会触发合并操作
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      // 溢写是由另外一个线程来的,由此达到输入与写出的同步并行进行
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }
 -----------------------排序比较器获取的源码-------------------------
 // 默认key按照字典序排列,可以自定义排序规则
 public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

map-output做的事情主要是:对于map输出的是k&v组合的数据,但需要加入分区p的信息,以此交由MapperTask进行处理,写入磁盘的话使用的是环形缓冲区,使用内存默认为100MB,默认阈值为80%,写的时候由于需要写入磁盘,在写之前做一个整体快速排序的过程,在整个运行框架中只有这一次排序,如果从map中输出的文件需要做一个压缩合并,需要配置combiner,生成的小文件先按照分区排序,分区内按照key-value排序,最后归并为一个大文件。排序什么的都是为了减小reduce处理中的I/O阻塞问题。

6.4 Reduce源码分析

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
	
    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);
    
    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
					
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    shuffleConsumerPlugin.init(shuffleContext);
	// 按照顺序迭代
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();
    
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    // 获取分组比较器
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      // 运行新的Reducer
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }
--------------------------getOutPutValueComparator源码-------------------
// 如果有设置分组比较器的话就直接获取,如果没有则获取map中的比较器
public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }
    return ReflectionUtils.newInstance(theClass, this);
 }
----------------------runNewReducer源码----------------------------
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    // 真实的迭代器
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context 
    // 构建上下文对象,传入比较器
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
    // run方法运行reducer上下文
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }
 ---------------ReduceContextImpl源码--------------------
 public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input, 
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

  /** Start processing next unique key. */
  public boolean nextKey() throws IOException,InterruptedException {
  // 是否还有key,判断下一个key是否相同
    while (hasMore && nextKeyIsSame) {
    // 如果都相同,证明在同一组,则获取下一个值,否则跳过
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
  // 判断是否含有数据
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    // 第一个值,将下一个键改为true
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    // 赋值操作
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }
	// 判断输入的时候还有数据
    hasMore = input.next();
    // 如果还有数据
    if (hasMore) {
    	// 获取下一个键
      nextKey = input.getKey();
      // 看看下一个键为true还是false
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }
---------------------reduce源码分析-------------
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
    // 这个方法获取key,并通过判断下一个key是否相同,以此达到假迭代器在真迭代器中的变换
      while (context.nextKey()) {
      // getValues就是获取迭代器,这是是获取假的迭代器
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

reduce源码分析:主要就是完成数据的拉取,分组以及保证组内key相同与不同的迭代以此获取每个key的值,这个之间的逻辑,含有一个hasMore来判断是否有新的值,一个nextKeyIsSame来判断下一个key是否与当前的key相同,如果相同的话,继续获取下一个值,否则的话需要进行nextkey的方法的执行,需要更改nextkeyissame方法的布尔值,以此来保证同一组数据的正确迭代下去。实质就是调用run方法运行,调用其中的方法取出值进行跌代,迭代的过程中切换key,然后继续run,最后将所有的数据处理完毕即可。

7. 总结

本次主要学习了Hadoop中的关于MapReduce以及Yarn资源管理器结构,对于MapReduce的理解进一步加深了,对于文件的进入与处理,小案例的编写,具体的jar包的运行,以及Yarn资源管理器的配置,高可用的配置都学习了一下,刚开头难度比较大,很多原理都是符合正常的逻辑思维的,只是暂时的不能够理解为什么这样做,对于Mapreduce中的client端源码,client具体负责的职责,map-input具体做了什么,map-out具体做了什么,以及reduce的源码都简单的分析了一下,特别是对于reduce,一组数据对应一个reduce方法,在方法中迭代一组数据进行 计算,这就是reduce,这是一句原语的话,开发的时候需要遵循这个规则,否则容易出错。

总结一下client-map-reduce,client就是负责工作清单的制作,切片的安排,以及工作的传递,map的话,对于map输入的话需要对切片做一个输入流的处理以让map获取数据,通过seek偏移到每一个切片进行处理(都让出自己的第一行,都读取多一行数据),map输出的话主要是负责将输出的数据加入一个分区的信息,以及对于存储在内存的需要写入到磁盘文件中,需要进行一个排序以及归组成分区加索引的一个文件块,对于此处的写入到磁盘文件,有阈值的设定,有文件缓冲区大小的设定,含有就是对于数据重复过多造成的文件数据过大问题,可以设置合并器combiner来先合并解决,然后在让reduce拉取,从而大大减小网络I/O延迟问题。reduce主要的功能就是做一个数据的拉取,然后对于拉取来的数据做一个数据的计算,具体的就是在真迭代数据中计算出每一个假迭代数据的值最后统一输出即可。

大数据之路漫长,继续加油!!!

发布了74 篇原创文章 · 获赞 12 · 访问量 8214

猜你喜欢

转载自blog.csdn.net/cao1315020626/article/details/101145903