MapReduce: Simplified Data Processing on Large Clusters 翻译加理解

前言

这是真正的核心知识,我要将这篇论文阅读并翻译一遍。
原文可以从MapReduce: Simplified Data Processing on Large Clusters下载

MapReduce: 大型集群上的简化数据处理

作者:Jeffrey Dean,Sanjay Ghemawat;Google, Inc.
译者:LittleFall

摘要

MapReduce是一个编程模型和用于处理生成大型数据集的相关实现。用户定义一个map函数去处理一个键值对去生成一组中间键值对,一个reduce函数去合并所有具有相同中间键的中间值。如下文所示,许多真实世界的任务都可以通过这个模型来表达。

遵循这个模型的程序可以自然并行地执行在一个大型商业计算机集群上。运行时由系统关注于以下问题的细节:分割输入数据、将程序在一组计算机上调度执行、处理机器故障、管理必要的机器间内部通信。这使得没有任何并行与分布式系统编程经验的程序员可以轻松利用大型分布式系统的资源。

1. 引言

在过去的五年里,笔者和许多谷歌同僚已经实现了数以百计的独立计算程序,用于处理大量的原始数据,例如爬虫文件、网络请求日志,等等,用于计算各种各样的衍生数据,如反转索引、网页文档的各种表示图结构、每个宿主爬取大量页面的总结、一天中最频繁的一组查询,等等。大多数这样的计算程序在概念上是简洁明了的。然而,输入数据通常非常大,而且为了在合理时间内执行完毕计算程序必须分布运行在成百上千个计算机上。这些有关于如何并行计算、分布数据、解决故障的事务需要大量复杂的代码去处理,这使得本来朴素的计算任务变得极为冗杂。

作为对这种复杂性的应对,我们设计了一个新的抽象模型,它允许我们直接去做纯粹的计算,而将繁杂的细节如并行,容错,数据分布,负载均衡等封装在库中。我们的抽象受到了来自Lisp和其它函数式语言中mapreduce原语(primitives present)的启发。我们发现我们大多数的计算程序都涉及到以下特性:对于输入中的每个逻辑记录提供一个map操作,目的是计算得到一组中间键值对;然后对于所有具有相同键的值实现一个reduce操作,目的是适当地联合衍生数据。我们对于函数模型的使用以及用户自定义map和reduce操作允许我们可以轻易并行化大量的计算,并将重新执行作为容错的主要机制。

这项工作的主要贡献是一个简洁有力的接口,它实现了自动并行化与大规模数据的分布。这个接口与它的实现一起在大规模商业计算机集群上获得了优异表现。

第二部分描述了基础编程模型并且给出了一些例子。第三部分描述了一个定制在基于集群的计算环境下的MapReduce接口实现。第四部分描述了一些有用的编程模型细节。第五部分对其在一个任务的实现进行了表现评估。第六部分探索了MapReduce的使用,包括谷歌内部使用它重写产品索引系统的经验。第七部分讨论了相关工作与未来展望。

2. 编程模型

计算过程接受一系列输入键值对,产生一系列输出键值对。MapReduce库的用户用两个函数来表示计算:MapReduce

Map函数由用户编写,接收一个输入键值对,产生一组中间键值对。MapReduce库组合所有的具有相同中间键 I I 的中间值并且将它们传给Reduce函数。

Reduce函数也由用户编写,接收一个中间键 I I 和对应的一组值。它将其合并成一组可能规模更小的值。通常每个Reduce函数只会输出零个或一个值。中间值通过一个迭代器提供给用户的reduce函数。这允许我们操作过大以至于不能放在内存中的值列表。

2.1 例子

考虑统计大型文章集合中每个单词出现次数的问题,用户写出的代码将类似于如下伪代码:

map(String key, String value):
	// key:  document name
	// value: document contents
	for each word w in value:
		EmitIntermediate(w, "1");

reduce(String key, Iterator values):
	// key: a word
	// values: a list of counts
	int result = 0;
	for each v in values:
		result += ParseInt(v);
	Emit(AsString(result));

map函数生成每个单词加上一个关联的出现计数(例子中为1)。reduce函数对同一个单词所有生成的计数求和。

另外,用户编写代码填充入一个mapreduce规格对象,以及输入输出文件名和可选的调整参数。随后用户调用MapReduce函数,并将规格对象传递给它。用户的代码和MapReduce库(C++实现)一同链接。 A 附录A 包含了这个例子的完整程序。

2.2 类型

虽然之前的伪代码用字符串输入输出类型编写,实际上由用户提供的map和reduce函数已经将类型联系起来:

    map         (k1, v1) -> list(k2, v2)
    reduce      (k2, list(v2)) -> list(v2)

也即,输入键值与输出键值的域不同。此外,中间键值与输出键值的域相同。

我们的C++实现在这里传递字符串,而由用户定义的函数中,把它留给用户代码在字符串与合适的类型之间转变。

2.3 更多的例子

这里有一些有趣程序的简单例子,它们可以轻松使用MapReduce计算。

分布式的Grep:map函数返回所有匹配成功的行。reduce函数是一个同一性函数,仅仅将被生成的中间数据拷贝到输出中。

URL访问频率计数:map函数处理网络页面请求日志并且生成 < U R L , 1 > <URL, 1> 。reduce函数对所有来自相同URL的值求和并且生成 < U R L , > <URL, 总计数>

反向网页链接图:map函数对于每个 s o u r c e source 页面中指向 t a r g e t target 的链接生成 < t a r g e t , s o u r c e > <target,source> 。reduce函数链接给定target URL的整个source URL列表,生成 < t a r g e t , l i s t ( s o u r c e ) > <target, list(source)>

每个主机的词条向量(Term-Vector):一个词条向量总结一个文件或者一组文件中最重要的词,表示为一列 < w o r d , f r e q u e n c y > <word, frequency> 键值对。map函数对于每个输入文档生成一个 < h o s t n a m e , t e r m v e c t o r > <hostname, term-vector> 键值对。reduce函数接收单个主机所有文档的词条向量。它将这些词条向量组合在一起,扔掉不频繁的词汇,然后生成最终的 < h o s t n a m e , t e r m v e c t o r > <hostname, term-vector> 键值对。

反向索引:map函数分析每个文档,生成一系列 < w o r d , d o c u m e n t I D > <word, documentID> 键值对。reduce函数接收一个单词所有这样的键值对,排序对应的文档ID并生成一个 < w o r d , l i s t ( d o c u m e n t I D ) > <word, list(documentID)> 键值对。所有输出键值对的集合组成一个简单的反向索引。可以轻易改进这个计算去记录单词位置。
【译注:反向索引 I n v e r t e d I n d e x InvertedIndex 是一个搜索引擎术语。正向索引是指统计出每个文档中有哪些单词,而反向索引是指统计出每个单词出现在哪些文档中。】

分布式排序:map函数从每个record中提取关键字key,生成一个 < k e y , r e c o r d > <key,record> 键值对。reduce函数不加改变地输出所有键值对。这个计算依赖于 4.1 4.1 节描述的划分工具和 4.2 4.2 节描述的顺序特性。

3 实现

MapReduce可以有很多不同的实现接口,应当由具体环境来决定正确的实现。比如说,一种实现可能适用于一个小型共享内存机,另一种适用于一个大型NUMA多处理器系统,另一种可能适用于大型网络计算机集群。

这部分描述的实现针对于谷歌内部广泛使用的计算环境:由以太网[4]连接的大型商业计算机集群。在我们的环境中:

  1. 计算机通常是多核x86处理器,运行在Linux系统上,每台机器有2-4GB内存。
  2. 使用商业网络硬件:在机器水平上通常为100Mb/s或1Gb/s,但是在完全双向带宽上被认为平均值较慢。
  3. 一个集群包含成百上千的计算机,因此机器故障是普遍的。
  4. 存储由与独立计算机直接相连的廉价IDE硬盘提供。一个内部开发的分布式文件系统[8]被用来在硬盘上管理数据存储。文件系统使用重复存储来在不可靠的硬件上保证可利用性和可靠性。
  5. 用户把作业提交给调度系统。每个作业包含一系列任务,由调度器分配到集群内一组可用的计算机上。.

3.1 运行总览

通过自动划分输入数据为M段(split)来保证Map函数可以多机分布式调用。输入段可以被多机并行处理。通过使用划分函数(如哈希后模R)划分中间键空间为R片(piece),来保证Reduce函数可以分布式调用。片数R和划分函数由用户指定。

图1展示了一个MapReduce操作在我们的实现上的整体运行流程。当用户程序调用MapReduce函数时,紧随其后将发生一系列操作(图1中的标号对应于下方列表的序号)。
在这里插入图片描述

  1. 与用户程序一起执行的MapReduce库首先将输入文件分成M段,通常每段16MB到64MB(用户可以设置参数控制)。然后在集群电脑上启动程序的很多副本。
  2. 有一个副本程序是独特的,称为主机。剩下的是由主机分配任务的工作机。有M个map任务和R个reduce任务需要被分配。主机会选择每个空闲工作机并分别分配一个map或reduce任务。
  3. 一个被分配map任务的工作机读取对应输入段的内容。它分析出输入数据的键值对,将其传递给用户定义的Map函数。由后者生成中间键值对,并缓冲在内存中。
  4. 每隔一段时间缓冲键值对会被写入本地磁盘,被划分函数划分成R个区域。这些缓冲键值对在磁盘上的位置被传回给主机,主机负责将这些位置发送reduce工作机。
  5. 当一个reduce工作机收到主机传来的数据位置时,它使用远程过程调用(RPC)从map工作机本地的磁盘中读取缓冲数据。当一个reduce工作机读取完成所有中间数据后,它会根据中间键对它们进行排序,所有具有相同键的记录会被组合在一起。如果中间数据的总量太大而不能放入内存中,会使用外排序。
  6. reduce工作机在排好序的中间数据上重复工作,遍历每个遇到的独立的中间键。它把键和对应的中间值集合传递给用户的Reduce函数。Reduce函数的输出被添加到最终的输出文件中这个reduce的部分。
  7. 当所有的map任务和reduce任务完成后,主机唤醒用户程序。此时,用户的MapReduce调用返回到用户代码中。

成功结束后,mapreduce的执行输出会放入R个输出文件中(每个reduce任务一个,文件名由用户指定)。通常来说,用户不用把R个输出文件联合成一个——他们通常用这些文件作为另一个MapReduce调用的输入,或者把它们用于另一个可以处理多文件输入的分布式应用程序。

3.2 主机数据结构

主机维护一些数据结构。对于map任务和reduce任务,它会存储状态(空闲,执行中,完成),以及非空闲任务工作机的身份编号。

主机是由map任务向reduce任务传递中间文件存放位置的通道。因此,对每个完成的map任务,主机储存R个中间文件区域的位置和大小。当map任务完成时,主机就会收
到位置与大小信息的更新。信息会被逐个发送给有运行中reduce任务的工作机。

3.3 容错

因为MapReduce库被设计用来使用成百上千的计算机处理非常大总量的数据,库必须能够优雅地容忍计算机错误。

工作机失效

主机周期性地ping向每个工作机。如果没有在一定时间内从一个工作机收到回应,主机会把这个标记为失效。每个由工作机执行完毕的map任务都会被设置回未执行状态,因此可以被调度到其它工作机执行。类似的,每个由失效工作机正在执行的map任务或reduce任务同样会被设置为未执行并可以被重新调度。

由失效工作机完成的map任务会被重新执行,因为它们的输出存储在失效工作机本地硬盘上且因此不能被访问。完成的reduce任务不需要重新执行因为他们的输出存储在全局文件系统上。

当一个map任务首次被工作机A执行并在之后被工作机B执行时(因为A失效),所有执行reduce任务的工作机会被提醒。每个执行reduce任务的工作机如果没有从工作机A完全读取数据,会从工作机B读取数据。

MapReduce可以从大规模工作机失效中复原。比如说在一个MapReduce操作中,一个正在运行的集群网络中有80台机器在几分钟内都失效了。MapReduce主机可以直接重新执行由失效计算机所执行的工作,继续推进进度,最终完成整个MapReduce操作。

主机失效

可以使主机周期性存储如上方所述的主机数据结构的检查点。当主机宕机时,一个新的副本可以从上一个检查点的状态开始执行。然而,假定只有一个主机,那么它不太可能失效。因此我们的当前实现在主机失效时会中断MapReduce操作。客户在有需要时可以重设这个条件并重启MapReduce操作。

存在故障时的语义

当用户提供的map和reduce操作对于他们的输入是确定性的操作时,我们的分布式实现产生的输出将与无错地顺序执行整个程序时相同。

我们依赖自动提交map和reduce任务的输出来完成这个特性。每个运行中的任务将它地输出写入私有的临时文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每个对应一个reduce任务)。当一个map任务完成后,工作机向主机发送一个消息,消息中包括有这R个临时文件的名字。如果主机收到一个已经完成的map任务发送的完成消息,它会将其忽略。否则,它在主机数据结构中记录这R个文件的名字。

当一个reduce任务完成后,reduce工作机自动重命名它的临时输出文件到最终输出文件中。如果一个相同的reduce任务在多个计算机上执行,用一个最终输出文件会被多次重命名。我们依赖底层文件系统提供的自动重命名操作,去保证最终文件系统中对于每个reduce任务产生的数据恰好包含一份。

我们绝大多数的mapreduce操作是确定性的,在这种情况下我们的语义与顺序执行时相同,这就使得程序员很容易去解释程序的表现。当mapreduce操作是非确定性的时,我们提供更弱但依然可解释的语义。在这种非确定性的操作中,一个独特reduce任务 R 1 R_1 的输出与 R 1 R_1 的顺序执行非确定性操作相同。然而,一个不同的reduce任务 R 2 R_2 也许与另一个由非确定性程序顺序执行的输出相同。

考虑map任务 M M 和reduce任务 R 1 R_1 R 2 R_2 。设 e ( R i ) e(R_i) 表示 R i R_i 的执行(恰好只有一个这样的执行)。更弱的语义产生自 e ( R 1 ) e(R_1) 可能读取到 M M 的一个执行所产生的输出,而 e ( R 2 ) e(R_2) 可能读取到由不同的 M M 执行所产生的输出。

3.4 原地性

网络带宽在我们的计算环境中是相对缺乏的资源。由GFS[8] 管理的输入数据存储在集群计算机的本地磁盘上,我们利用这个优势来省下网络带宽。GFS把文件分成64MB的块,在不同的计算机上存储每个块的副本(通常会存储3个副本)。MapReduce主机提取输入文件的位置信息,然后尝试在包含输入数据副本的计算机上调度对应的map任务。当失败之后,会尝试在最近的包含输入数据副本的计算机上调度这个map任务(比如切换到同一网络中包含数据的工作机)。当在一个集群中大部分工作机上运行大型MapReduce操作时,大部分输入数据被会原地读取,不用花费网络带宽。

3.5 任务粒度

我们把map阶段和reduce阶段分别细分成如上所述的M部分和R部分。理想情况下,MR应该比工作机的数量大得多。让每个工作机执行许多不同的任务可以提升动态负载均衡,也可以加速工作机失效后的恢复工作:很多map任务已经被通过其它工作机延展。

在我们的实现中对于MR的大小有实际性的界限,因为如上所述,主机必须进行 O ( M + R ) O(M+R) 次调度并且在内存中保持 O ( M R ) O(M*R) 个状态。(然而内存占用常数因子很小: O ( M R ) O(M*R) 个状态每个只需要1byte数据,即每个map/reduce任务对)。

此外,R经常被用户约束因为每个reduce任务输出在一个独立输出文件中。在实践中,我们倾向于选择M使得每个独立任务大约有16MB到64MB的输入数据(这样上面描述的原地性优化效果最好),我们让R为我们期望使用工作机数量的一个小倍数。我们通常设定MapReduce计算的 M = 200000 M=200000 R = 5000 R=5000 ,使用 2000 2000 台工作机。

3.6 备份任务

一个普遍性的加长MapReduce总时间消耗的情况是“流浪者”:在计算中一个计算机花费异常长的时间去完成最后几个map或reduce工作。流浪者可以由一系列原因导致。比如说,一个硬盘质量很差的计算机可能经历频繁的可修正错误,将它的读取表现从30MB/s减缓到1MB/s。集群调度系统可能已经把其它的任务调度到这台机器上,导致它由于CPU,内存,本地硬盘,或者网络带宽的竞争使得MapReduce代码执行更慢。我们最近遇到的一个计算机初始化代码上的bug导致处理器缓存被关闭:在被影响的计算机上的运算变慢了一百倍。

我们有一个通用机制去减轻流浪者的问题。当MapReduce操作接近完成时,主机调度执行仍在正在运行中的任务的备份去执行。无论是基础任务完成还是副本任务完成都会标记任务完成。我们已经调整了这个机制,现在它通常只会增加百分之几的计算资源。我们已经发现它显著减少了完成大型地MapReduce操作的时间。比如说, 5.3 5.3 节描述的排序程序在备份任务机制关闭时需要花额外44%的时间。

4. 细化

即使基础由简单写作的MapReduce函数提供的功能可以适用于大部分的需求,我们已经发现一部分有用的扩展。将在这部分中描述这些扩展。

4.1 划分函数

MapReduce的用户指定他们需要的reduce任务数/输出文件数(即R)。这些任务的数据通过一个中间键上的划分函数进行划分。已经提供的一个默认划分函数使用哈希(如 h a s h ( k e y ) m o d    R hash(key)\mod R )。这趋向于结果公平匀称的划分。然而在某些情况下,使用一些其它的键划分函数更有用。比如,有时输出键是URL,我们希望每个主机的所有入口最后放入一个输出文件中。为了支持像这样的情况,MapReduce库的用户可以提供一个特殊的划分函数。比如说,使用 h a s h ( H o s t n a m e ( u r l k e y ) ) m o d    R hash(Hostname(urlkey))\mod R 作为划分函数可以使一个主机所有的URL放在同一个输出文件中。

4.2 顺序保证

我们保证在一个给定的划分内,中间键值对会按键的升序进行处理。顺序的保证使得每个划分可以很轻松地生成一个有序的输出文件,这在输出文件格式需要用键进行高效随机存取查找,或者输出文件用户发现拥有有序的数据是一件很方便的事情时非常有用。

4.3 联合函数

在一些情况下,由每个map任务产生的中间键中有很多重复,而用户指定的Reduce函数是可交换的且相关的。一个形象的例子是 2.1 2.1 节中的单词统计。因为单词频率趋向于服从Zipf分布,每个map任务会产生成百上千拥有类似于 < t h e , 1 > <the,1> 形式的记录。所有这些统计会被通过网络送到一个reduce任务上,然后被Reduce函数将它们联合起来计算出一个数字。我们允许用户指定一个可选的联合函数在数据被送到网络之前执行部分的合并工作。
【译注:Zipf分布是指单词出现频率(F)与出现频率排名®成反比,即 F R = C C F*R=C,C为常数 ,类似于二八法则。】

联合函数运行在每个执行map任务的计算机上。通常与reduce函数的代码相同。它们仅有的不同是MapReduce库如何抓取函数的输出。reduce函数的输出写入进最终的输出文件中。联合函数的输出写入将被送往reduce任务的中间文件中。

部分联合可以有效地加速某些MapReduce操作。 A 附录A 包含了使用联合的例子。

4.4 输入和输出类型

MapReduce库提供支持去读取一些不同格式的输入数据。比如说,“文本”模式的输入将每行处理为一个键值对:键是文件的偏移,值是行的内容。另一种常见的支持格式存储一系列按键排序的键值对。每种输入类型的实现都知道该如何把它们分割到可被当作独立map任务处理的有意义范围(比如,文本模式的的范围分割确保范围分割仅在每行出现一次)。用户可以通过实现一个简单的reader接口对新的输入类型添加支持,即使大部分用户只会使用少量预定义的输入类型中的一个。

一个reader不用非要从文件中读入数据。比如说,可以很轻松地定义一个reader从数据库或映射在内存中的数据结构中读入记录。

以类似地方式,我们支持一系列产生于不同方式中的输出数据格式,可以很轻松地使得用户代码对新的输出类型添加支持。

4.5 副作用

在某些例子中,MapReduce的用户已经发现可以从他们的map或reduce操作中很方便地去产生辅助文件作为附加输出。我们依赖于应用程序写入使得这样的副作用自动且幂等。通常情况下应用程序写入一个临时文件,并且当它被完全生成后就会自动地重命名。

我们不对由一个任务产生的多输出文件的自动两阶段提交提供支持。因此,产生具有跨文件一致性需求的多输出文件的任务应当是确定性的。这个限制在实践中从来不是一个问题。

4.6 跳过坏记录

有时候在用户代码中会有一些bug导致Map或Reduce函数在某些确定的记录上会崩溃。这些bug阻止一个MapReduce操作完成。通常的行动方案是去修复bug,但是有些时候

猜你喜欢

转载自blog.csdn.net/m0_37809890/article/details/87830686