本文,并非讲述shuffle的文章,只是对MapReduce内部的几个细节性问题解答一下:
- 提交的Mapper中,传入的参数context究竟是什么?
- map端数据,是如何传输到reduce端的?
- 我们写好的那些代码,到底是如何被加载并且使用的?
本文,基于Hadoop-1.0.0的源码进行分析,2.0以上的源码最大变动在于yarn。
在MapReduce编程模型中,有一个东西吸引了我的兴趣:代码如下:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
我们可以在Map方法中,看到一个Context:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
而同样,我们在reduce方法中,也看到了一个Context。
那么,问题来了,这个神奇地接洽了Map的结果,并且作为Reduce最终结果写入地方的Context,究竟是个什么东东?
本文,基于Hadoop-1.0.0的MapReduce源码,琢磨下这个Context的神奇之处:
1:我们先从Mapper的Context开始看起:
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } }
我们直接指向了Mapper的代码,发现其中有一个内部类,如上,其继承了MapContext,我们看下:
public MapContext(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { super(conf, taskid, writer, committer, reporter); this.reader = reader; this.split = split; }
对于任何一个Task来说,默认是指向一个split的,这个split通常来说是一个文件块的大小;reader即RecordReader将会把文件块拆分成一行一行的文本(这里是针对TextInputFormat)而言,FileInputFormat有很多的子类,各自的Reader各有不同:
我们继续看其父类的构造器:
public abstract class TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskAttemptContext implements Progressable { private RecordWriter<KEYOUT, VALUEOUT> output; private StatusReporter reporter; private OutputCommitter committer; public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid, RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter) { super(conf, taskid); this.output = output; this.reporter = reporter; this.committer = committer; }
这时候还没到最根本的地方,但是有了新的发现,我们实际继承的是一个TaskInputOutputFormat,而其再度继承了个TaskAttemptContext。
说到TaskAttemptContext,我们可以想一下TaskAttempt,对于MapReduce提交的每个任务,为了避免一次性失败,所以会有多次的重试,而每次的尝试,就叫做TaskAttempt,每一个TaskAttempt都有自己的id。
public class TaskAttemptContext extends JobContext implements Progressable { private final TaskAttemptID taskId; private String status = ""; public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) { super(conf, taskId.getJobID()); this.taskId = taskId; }
感觉还要在向上走,看看JobContext:
public JobContext(Configuration conf, JobID jobId) { this.conf = new org.apache.hadoop.mapred.JobConf(conf); this.credentials = this.conf.getCredentials(); this.jobId = jobId; try { this.ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new RuntimeException(e); } }
追根溯源,到了这里,发现的确没什么东西可追溯了,Mapper中的Context实际上就是来自于JobContext的实现,现在,我们动态地追踪下,这个JobContext到底是如何诞生的:
此处,不从WordCount开始分析了,因为整个过程比较复杂,我直接从每个Mapper启动的过程来分析,其对应的代码在MapTask中:
@SuppressWarnings("unchecked") private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper( final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException
我们可以看到其中的参数,job不说了,读取的Job的相关配置;splitIndex,我们通过这个index找到分配给对应Task的数据块的元数据信息;umbilical则是taskAttempt与Task交流的RPC接口,汇报自身状态;reporter则是相应的报告者:
// make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils .newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) ReflectionUtils .newInstance(taskContext.getInputFormatClass(), job);
这两句代码很明显,我们从提交的TaskContext中,使用反射的方式,获取了传入的Mapper与InputFormat的主体逻辑:
这里,我们分析的是reduceNumTask!=0的情况:
// get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); }
对应于Mapper的输出,创建了一个OutputCollector,顾名思义,很好理解:
mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(), input, output, committer, reporter, split);
而这个,就是我们新建的一个mapperContext,其实质就是我们mapper中使用到的Context,这里,主要是committer看似有点陌生,我们看看来源:
if (useNewApi) { if (LOG.isDebugEnabled()) { LOG.debug("using new api for output committer"); } outputFormat = ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job); committer = outputFormat.getOutputCommitter(taskContext); } else { committer = conf.getOutputCommitter(); }
可以看出来,也是使用反射方式,从TaskContext中获取到的outputFomat,构造出来的committer:
public synchronized OutputCommitter getOutputCommitter( TaskAttemptContext context) throws IOException { if (committer == null) { Path output = getOutputPath(context); committer = new FileOutputCommitter(output, context); } return committer; }
构造代码如上,不予赘述了:
到了这里,我们思考下mapperContext的write代码:
/** * Generate an output key/value pair. */ public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { output.write(key, value); }
这个output是何方神圣:其实就是上面封装在mapperContext中的NewOutputCollector,换言之,我们Mapper逻辑的运行,其实完全是通过反射的方式,从提交的JobConf文件中,加载到对应的类,在TaskTracker端新建出来,并且完成我们原先的逻辑的,所以,在Java中,反射实在是非常强大的功能,不仅在Spring中应用的炉火纯青,在Hadoop中也是随处可见;我们可以看到,不仅是在Hadoop内部的RPC中用到,在Mapper和Reducer真正运行的时候,都是离不开的:
接下来,我们认真看下这个NewOutputCollector:
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException { collector = new MapOutputBuffer<K, V>(umbilical, job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 0) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K, V>) ReflectionUtils .newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K, V>() { @Override public int getPartition(K key, V value, int numPartitions) { return -1; } }; } }
我们可以看到,对于每一个Task来说,内部使用的collector都是自己内部所有的,所以不会存在冲突;同时,在这里也会根据NumTask来确定分区数量。
@Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); }
这里,就是重点了,我们要看看这个collector究竟是如何完成收集的,而收集得到的数据,又是存在什么地方的,在这之前,看看其中的参数:
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
从表面来看,collector在收集数据的时候,会根据key来指定该数据会收集到哪个分区之内:
collector = new MapOutputBuffer<K, V>(umbilical, job, reporter);
其中牵涉到一个环形缓冲区,代码分析比较复杂,在此不讨论了,将来另外开文再讨论;下文将挑出其中的一些重点来说:
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
所以,为什么说我们默认情况下是快速排序了,很清楚,一般来说,默认是快速排序,我们也可以指定堆排序来进行排序:
在collect数据,输出环形缓冲区的时候,并未进行任何排序的操作,而实际上,排序的操作是在spill的时候进行排序的,这一点,我们可以追溯到SpillThread中的sortAndSpill中的代码:
final Path filename = mapOutputFile.getSpillFileForWrite( numSpills, size); out = rfs.create(filename);
很明显,这里的刷盘,写入的并不是本地磁盘,而是写入到HDFS中,当然,Reducer获取数据的时候,也是从HDFS中下载数据的,所以,HDFS作为大数据系统的基础,是重中之重:
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
我们可以看到,利用前面生成的快速排序类,对缓冲区的数据进行排序:这里就不多说了:
总之,最后写到本地磁盘中的Map数据,是按照partition分区好,而且在各自分区之内都是有序的:最终,这些有序数据存入到了本地文件系统中:
接下来,我们看看reducer端的实现:
reduceCopier = new ReduceCopier(umbilical, job, reporter);
具体代码在ReduceTask内,在此挑选一些重要的内容进行分析:
private class GetMapEventsThread extends Thread {
该类是reduceTask内负责从Map端进行数据收集的类,我们仔细看看其run方法,并追溯到getMapCompetionEvents,其含义在于收集那些进行完毕的Map的output结果:
case SUCCEEDED: { URI u = URI.create(event.getTaskTrackerHttp()); String host = u.getHost(); TaskAttemptID taskId = event.getTaskAttemptId(); URL mapOutputLocation = new URL( event.getTaskTrackerHttp() + "/mapOutput?job=" + taskId.getJobID() + "&map=" + taskId + "&reduce=" + getPartition()); List<MapOutputLocation> loc = mapLocations.get(host); if (loc == null) { loc = Collections .synchronizedList(new LinkedList<MapOutputLocation>()); mapLocations.put(host, loc); } loc.add(new MapOutputLocation(taskId, host, mapOutputLocation)); numNewMaps++; }
看得出来,重点还是在于对mapLocations的处理,其key是对应dataNode的host,而loc则是需要收集的一系列的数据的MapOutputLocation:
/** * Copies a a map output from a remote host, via HTTP. * * @param currentLocation * the map output location to be copied * @return the path (fully qualified) of the copied file * @throws IOException * if there is an error copying the file * @throws InterruptedException * if the copier should give up */ private long copyOutput(MapOutputLocation loc)
注释很清晰明了,我们认真看看该方法究竟是如何实现数据读取的,先看看其参数:MapOutputLocation:
public MapOutputLocation(TaskAttemptID taskAttemptId, String ttHost, URL taskOutput) { this.taskAttemptId = taskAttemptId; this.taskId = this.taskAttemptId.getTaskID(); this.ttHost = ttHost; this.taskOutput = taskOutput; }
该类追溯了每个Map的output,封装在内,交付给reduce端进行处理:
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput, reduceId.getTaskID().getId());
我们发现,其实真正拷贝数据的是getMapOutput的方法:
/** * Get the map output into a local file (either in the inmemory fs or on the * local fs) from the remote server. We use the file system so that we generate * checksum files on the data. * * @param mapOutputLoc * map-output to be fetched * @param filename * the filename to write the data into * @param connectionTimeout * number of milliseconds for connection timeout * @param readTimeout * number of milliseconds for read timeout * @return the path of the file that got created * @throws IOException * when something goes wrong */ private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, Path filename, int reduce)
这样看起来,在TaskTracker所在的节点,应该是建立了一个可以处理http请求的服务,其接收来自于reduceTask的请求,并且发送数据,这就是map端和reduce端交互的重点:
这是很重要的,我们必须清楚,MapTask与ReduceTask之间的交互,才能更加清楚我们的代码逻辑为什么是这么实现的:
URL url = mapOutputLoc.getOutputLocation(); URLConnection connection = url.openConnection(); InputStream input = setupSecureConnection(mapOutputLoc, connection);
重点在这里,我们读取数据的时候,与TaskTracker建立了一个连接,通过HTTP获取数据流的方式,读取TaskTracker端本地的数据;毫无疑问,我们需要看看TaskTracker的代码,在哪里建立了一个httpServer:
String infoAddr = NetUtils.getServerAddress(conf, "tasktracker.http.bindAddress", "tasktracker.http.port", "mapred.task.tracker.http.address"); InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr); String httpBindAddress = infoSocAddr.getHostName(); int httpPort = infoSocAddr.getPort(); this.server = new HttpServer("task", httpBindAddress, httpPort, httpPort == 0, conf, aclsManager.getAdminsAcl()); workerThreads = conf.getInt("tasktracker.http.threads", 40); server.setThreads(1, workerThreads);
我们在TaskTracker初始化的时候,可以看到这段代码,发现内部构建了一个httpServer,实际上,这就是处理reduceTask拉取数据的server:
server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
httpServer内置的servlet处理器,只有一个,即MapOutputServlet类,我们看下其代码:
/** * This class is used in TaskTracker's Jetty to serve the map outputs to other * nodes. */ public static class MapOutputServlet extends HttpServlet
注释清晰明了:
String mapId = request.getParameter("map"); String reduceId = request.getParameter("reduce"); String jobId = request.getParameter("job");
reduce端传入的参数:
String intermediateOutputDir = TaskTracker.getIntermediateOutputDir(userName, jobId, mapId); String indexKey = intermediateOutputDir + "/file.out.index"; Path indexFileName = fileIndexCache.get(indexKey); if (indexFileName == null) { indexFileName = lDirAlloc.getLocalPathToRead(indexKey, conf); fileIndexCache.put(indexKey, indexFileName); }
String fileKey = intermediateOutputDir + "/file.out"; Path mapOutputFileName = fileCache.get(fileKey);
mapOutputIn = SecureIOUtils.openForRead(new File(mapOutputFileName.toUri().getPath()), runAsUserName);
建立索引文件,并且打开文件,形成outputStream输出流:
shuffleMetrics.outputBytes(len); outStream.write(buffer, 0, len); outStream.flush();
写入outStream中:
outStream = response.getOutputStream();
而实际上,outStream是response的文件流,就这样,我们把本地文件写入到reduceTask的请求响应中:
下面做一个总结:
- 实际上,我们编写的任务提交之初,相关的配置信息,都会序列化为JobConf的文件,上传到HDFS中;而具体的那些文件,jar,xml等,也会上传到文件系统中。
- 我们编写的代码,具体使用的时候,会从配置中获取name,以反射的方式获取到class来得到执行
- map中的context,实际上是一个MapOutBuffer,是一个环形缓冲区,将文本写入到内存,并且在必要的时候进行刷盘操作
- taskTracker内部有一个httpServer,其基于jetty来完成对reduce端拉取数据的操作。