【Hadoop】25-MapReduce的输入格式

1.输入分片与记录

一个输入分片(split)就是一个由单个map操作来处理的输人块。每一个map操作只处理一个输人分片。每个分片被划分为若干个记录,每条记录就是一个键·值对,map一个接一个地处理记录。输人分片和记录都是逻辑溉念,不必将它们对应到文件,尽管其常见形式都是文件。在数据库的场景中,一个输人分片可以对应于一个表上的若干行,而一条记录对应到一行(如同DBInputFormat,这种输人格式用于从关系型数据库读取数据)。
输入分片在Java中表示为InputSplit接口(和本章提到的所有类一样,它也在org.apache.hadoop.mapreduce包中)。

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException, InterruptedException;
    @Evolving
    public SplitLocationInfo[] getLocationInfo() throws IOException {
        return null;
    }
}

lnputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名)。注意,分片并不包含数据本身,而是指向数据的引用(reference)。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(这也是贪婪近似算法的一个实例)。
MapReduce应用开发人员不必直接处理lnputSplit,因为它是由InputFormat创建的(1nputFormat负责创建输人分片并将它们分割成记录)。在我们探讨InputFormat的具体例子之前,先简单看一下它在MapReduce中的用法。接口如下:

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public RecordReader() {
    }

    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException, InterruptedException;

    public abstract void close() throws IOException;
}

运行作业的客户端通过调用getSplits()计算分片,然后将它们发送到application master,application master使用其存储位置信息来调度map任务从而在集群上处理这些分片数据。map任务把输人分片传给InputFormat的createRecordReader()方法来获得这个分片RecordReader。RecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键·值对,然后再传递给map函数。查看Mapper的run()方法可以看到这些情况:

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

运行setup()之后,再重复调用context上的nextKeyvalue()(委托给RecordRader的同名方法)为mapper产生键·值对象。通过Context,键/值从RecordReader中被检索出并传递给map()方法。当reader读到stream的结尾时,nextKeyValue()方法返回false,map任务运行其cleanup()方法,然后结束。
 

尽管这段代码没有显示,由于效率的原因,RecordReader程序每次调用gerCurrentKey()和getCurrentValue()时将返回相同的键.值对象。只是这些对象的内容被reader的nextKeyValue()方法改变。用户对此可能有些惊讶,他们可能希望键/值是不可变的且不会坡重用。在map()方法之外有对键/值的引用时,这可能引起问题,因为它的值会在没有警告的情况下被改变。如果确实需要这样的引用,那么请保存你想保留的对象的一个副本,例如,对于Text对象,可以使用他的复制构造函数:new Text(value)。
这样的情况在reducer中也会发生。reducer迭代器中的值对象被反复使用,所以,在调用迭代器之间,一定要复制任何需要保留的任何对象惨见范例9·1。

最后,注意Mapper的run()方法是公共的,可以由用户定制。MultithreadMapRunner是另一个MapRunnable接口的实现,它可以使用可配置个数的线程来并发运行多个用mapreduce.mapper.multithreadedmapper.threads设置)。对于大多数数据处理任务来说,默认的执行机制没有优势。但是,对于因为需要连接外部服务器而造成单个记录处理时间比较长的mapper来说,它允许多个mapper在同一个JVM下以尽量避免竞争的方式执行。
1.1.FilelnputFormat类
FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类(参见图8.2)。它提供两个功能:一个用于指出作业的输人文件位置;一个是为输人文件生成分片的代码实现。把分片分割成记录的作业由其子类来完成。

****** 识别结果 1******

1.2.FilelnputFormat类的输入路径
作业的输人被设定为一组路径,这对限定输人提供了很强的灵活性。
FileInputFormat提供四种静态方法来设定Job的输人路径,其中,addInputPath()和addInputPaths()方法可以将一个或多个路径加人路径列表。可以分别调用这两种方法来建立路径列表。setInputPaths()方法一次设定完整的路径列表(替换前面调用中在Job上所设置的所有路径)。
一条路径可以表示一个文件、一个目录或是一个glob,即一个文件和目录的集合。路径是目录的话,表示要包含这个目录下所有的文件,这些文件都作为作业的输人。一个被指定为输人路径的目录,其内容不会被递归处理。事实上,这些目录只包含文件:如果包含子目录,也会被解释为文件(从而产生错误)。处理这个问题的方法是:使用一个文件glob或一个过滤器根据命名模式(name pattern)限定选择目录中的文件。另一种方法是将mapreduce.input.fileinputformat.input.dir.recursive设置为true从而强制对输人目录进行递归地读取。

add方法和set方法允许指定包含的文件。如果需要排除特定文件,可以使用FiLeInputFormat的setInputPathFilter()方法设置一个过滤器。过滤器的详细讨论参见3巧巧节中对PathFilter的讨论。即使不设置过滤器,FileInputFormat也会使用一个默认的过滤器来排除隐藏文件(名称中以“.”和“_”开头的文件)。如果通过调用setInputPathFilter()设置了过滤器,它会在默认过滤器的基础上进行过滤。换句话说,自定义的过滤器只能看到非隐藏文件。
路径和过滤器也可以通过配置属性来设置(参见表8.4),这对于Streaming作业来说很方便。Streaming接口使用-input选项来设置路径,所以通常不需要直接进行手动设置。

输入路径和过滤器属性
属性名称 类型 默认值 描述
mapreduce.input.fileinputformat.inputdir 逗号分隔的路径

作业的输人文件。包含逗号的路径中的逗号由“\”符号转义。

例如,glob{a,b}变成了{a\,b}

mapreduce.input.pathFilter.class PathFilter类名 应用于作业输人文件的过滤器

1.3.FiIeInputFormat类的输入分片
假设有一组文件,FileInputFormat如何把它们转换为输人分片呢?
FileInputFormat只分割大文件。这里的“大”指的是文件超过HDFS块的大小。分片通常与HDFS块大小一样,这在大多应用中是合理的;然而,这个值也可以通过设置不同的Hadoop属性来改变,如表8巧所示。

控制分片大小的属性
属性名称 类型 默认值 描述
mapreduce.input.fileinputformat.split.minsize int 1 一个文件分片最小的有效字节数
mapreduce.input.fileinputformat.split.maxsize long Long.MAX_VALUE 一个文件分片中最大的有效字节数(以字节算)
dfs.blocksize long 128MB,即134217728 HDFS中块的大小(按字节)

最小的分片大小通常是1个字节,不过某些格式可以使分片大小有一个更低的下界。例如,顺序文件在流中每次插人一个同步人口,所以,最小的分片大小不得不足够大以确保每个分片有一个同步点,以便reader根据记录边界进行重新同步。详见5.4,1节。
应用程序可以强制设置一个最小的输人分片大小:通过设置一个比HDFS块更大一些的值,强制分片比文件块大。如果数据存储在HDFS上,那么这样做是没有好处的,因为这样做会增加对map任务来说不是本地文件的文件块数。

最大的分片大小默认是由Java的long类型表示的最大值。只有把它的值被设置成小于块大小才有效果,这将强制分片比块小。
分片大小由以下公式十算,参见FileInputFormat的computeSplitSize()方法:
max(minimumSize,min(maximumSize,blockSize))
在默认情况下:
minimumSize<blockSize<maximumSize
所以分片的大小就是blocksizeo这些参数的不同设置及其如何影响最终分片大小,请参见表8·6的详细说明。

表8.6.举例说明如何控制分片的大小
最小份片大小 最大分片大小 块的大小 分片大小 说明
1(默认值) Long.MAX_VALUE(默认值) 128MB(默认值) 128MB 默认情况下,分片大小与块大小相同
1(默认值) Long.MAX_VALUE(默认值) 256MB 256MB 增加分片大小最自然的方法是提供更大的HDFS
块,通过dfs.blocksize或在构建文件时以单个文
件为基础进行设置
256MB Long.MAX_VALUE(默认值) 128MB(默认值) 256MB 通过使最小分片大小的值大于块大小的方法来增大
分片大小,但代价是增加了本地操作
1(默认值) 64MB 128MB(默认值) 64MB

通过使最大分片大小的值大于块大小的方法来减少

分片大小

1.4.小文件与CombineFilelnputFormat
相对于大批量的小文件,Hadoop更合适处理少量的大文件。一个原因是FileInputFormat生成的分块是一个文件或该文件的一部分。如果文件很小(“小”意味着比HDFS的块要小很多),并且文件数量很多,那么每次map任务只处理很少的输人数据,(一个文件)就会有很多map任务,每次map操作都会造成额外的开销。请比较一下把1GB的文件分割成8个128MB块与分成1000个左右100KB的文件。1000个文件每个都需要使用一个map任务,作业时间比一个输人文件上用8个map任务慢几十倍甚至几百倍。

CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。FileInputFomnat为每个文件产生1个分片,而CmbineFileInputFomat把多个文件打包到一个分片中以便每个mapper可以处理更多的数据。关键是,决定哪些块放人同一个分片时,CombineFileInputFormat会考虑到节点和机架的因素,所以在典型MapReduce作业中处理输人的速度并不会下降。
当然,如果可能的话应该尽量避免许多小文件的情况,因为MapReduce处理数据的最佳速度最好与数据在集群中的传输速度相同,而处理小文件将增加运行作业而必需的寻址次数。还有,在HDFS集群中存储大量的小文件会浪费namenode的内存。一个可以减少大量小文件的方法是使用顺序文件(sequence file)将这些小文件合并成一个或多个大文件(参见范例8.4):可以将文件名作为键(如果不需要键,可以用NullWritable等常量代替),文件的内容作为值。但如果HDFS中已经有大批小文件,CombineFileInputFormat方法值得一试。
CombineFi1eInputFormat不仅可以很好地处理小文件,在处理大文件的时候也有好处。这是因为,它在每个节点生成一个分片,分片可能由多个块组成。本质上,combineFileInputFormat使map操作中处理的数据量与HDFS中文件的块大小之间的耦合度降低了。

1.5.避免切分
有些应用程序可能不希望文件被切分,而是用一个mapper完整处理每一个输人文件。例如,检查一个文件中所有记录是否有序,一个简单的方法是顺序扫描每一条记录并且比较后一条记录是否比前一条要小。如果将它实现为一个map任务,那么只有一个map操作整个文件时,这个算法才可行(SortValidator.RecordStatsChecker中的mapper就是这样实现的)。
有两种方法可以保证输人文件不被切分。第一种(最简单但不怎么漂亮)方法就是增加最小分片大小,将它设置成大于要处理的最大文件大小。把它设置为最大值Long.MAX_VALUE即可。第二种方法就是使用FileInputFormat具体子类,并且重写isSplitable()方法。把返回值设置为false。例如,以下就是一个不可分割的TextInputFormat:

public class NonSplittableTextInputFormat extends TextInputFormat{
      @override
       protected boolean isSplitable(JobContext context,Path file){
                return false;
       }
}

****** 识别结果 1******

1.6.mapper中的文件信息
处理文件输人分片的mapper可以从作业配置对象的某些特定属性中读取输人分片的有关信息,这可以通过调用在Mapper的Context对象上的getInputSplit()方法来实现。当输人的格式源自于FileInputFormat时,该方法返回的InputSplit可以被强制转换为一个FileSplit,以此来访问表8·7列出的文件在旧版本的MapReduceAPI和Streaming接口中,同一个文件分片的信息可通过从mapper配置的可读属性获取。(在旧版本的MapReduceAPI中,可以通过在Mappe类中写configure()方法访问JobConf对象来实现。)
除了表8.7中的属性,所有mapper和reducer都可以访问7,4,1节中列出的属性。

表8.7.文件输入分片的属性
FileSplit方法 属性名称 类型 说明
getPath() mapreduce.map.input.file Path/String 正在处理的输人文件的路径
getStart() mapreducenp.input.start long 分片开始处的字节偏移量
getLength() mapreducenp.input.length long 分片的长度(按字节)

1.7.把整个文件作为一条记录处理
有时,mapper需要访问一个文件中的全部内容。即使不分割文件,仍然需要一个RecordReader来读取文件内容作为record的值。范例的WhoIeFiIeInputFormat展示了实现的方法。
 

public class WholeFileInputFormat extends FileInputFormat<NullWritable,ByteWritable>{
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
        @Override
        public RecordReader<NullWritable, ByteWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initalize(inputSplit,taskAttemptContext);
            return reader;
        }
    }

WholeFileInputFormat中没有使用键,此处表示为NullWritable,值是文件内容,表示成BytesWritab1e实例。它定义了两个方法:一个是将isSplitable()方法重写返回false值,以此来指定输人文件不被分片;另一个是实现了createRecordReader()方法,以此来返回一个定制的RecordReader实现,如范例8.3所示。
范例8·3.WholeFilelnputFormat使用RecordReader将整个文件读为一条记录
 

class WholeFileRecordReader extends RecordReader<NullWritable,BytesWritable>{
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value=new BytesWritable();
        private boolean processed=false;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.fileSplit=(FileSplit) inputSplit;
            this.conf=taskAttemptContext.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(!processed){
                byte[] contents=new byte[(int)fileSplit.getLength()];
                Path file=fileSplit.getPath();
                FileSystem fs=file.getFileSystem(conf);
                FSDataInputStream in=null;
                try{
                    in=fs.open(file);
                    IOUtils.readFully(in,contents,0,contents.length);
                    value.set(contents,0,contents.length);
                }finally{
                    IOUtils.closeStream(in);
                }
                processed=true;
                return true;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();;
        }

        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed?1.0f:0.0f;
        }

        @Override
        public void close() throws IOException {
            //do nothing
        }
    }

WholeFileRecordReader负责将FileSplit转换成一条记录,该记录的键是null,值是这个文件的内容。因为只有一条记录,WholeFileRecordReader要么处理这条记录,要么不处理,所以它维护一个名称为processed的布尔变量来表示记录是否被处理过。如果当nextKeyva1ue()方法被调用时,文件没有被处理过,就打开文件,产生一个长度是文件长度的字节数组,并用Hadoop的IOUtils类把文件的内容放人字节数组。然后再被传递到next()方法的BytesWritable实例上设置数组,返回值为true则表示成功读取记录。
其他一些方法都是一些直接的用来访问当前的键和值类型、获取reader进度的方法,还有一个close()方法,该方法由MapReduce框架在reader完成后调用。
现在演示如fi可使用WholeFileInputFormat。假设有一个将若干个小文件打包成顺序文件的MapReduce作业,键是原来的文件名,值是文件的内容。
 

 public class SmallFilesToSequenceFileConverter extends Configured implements Tool{
        public static void main(String[] args) {
            int exitCode=ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);
            System.exit(exitCode);
        }
        
        @Override
        public int run(String[] args) throws Exception {
            Job job=JobBuilder.parseInputAndOutput(this,getConf(),args);
            if(conf==null){
                return -1;
            }
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            job.setMapperClass(SequenceFileMapper.class);
            return job.waitForCompletion(true)?0:1;
        }

        static class SequenceFileMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable>{
            private Text filenameKey;
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                InputSplit split=context.getInputSplit();
                Path path=((FileSplit)split).getPath();
                filenameKey=new Text(path.toString());
            }

            @Override
            protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
                context.write(filenameKey,value);
            }
        }
    }

由于输人格式是wholeFileInputFormat,所以mapper只需要找到文件输人分片的文件名。通过将InputSp1it从context强制转换为FileSplit来实现这点,后者包含一个方法可以获取文件路径。路径存储在键对应的的一个Text对象中。reducer的类型是相同的(没有明确设置),输出格式是SequenceFile0utputFormat。
以下是在一些小文件上运行样例。此处使用了两个reducer,所以生成两个输出顺序文件:

由此产生两部分文件,每一个对应一个顺序文件,可以通过文件系统shell的-text选项来进行检查:

输人文件的文件名分别是a、b、c、d、e和 f,每个文件分别包含10个相应字母(比如,a文件中包含10个"a”字母),e文件例外,它的内容为空。我们可以看到这些顺序文件的文本表示,文件名后跟着文件的十六进制的表示。
至少有一种方法可以改进我们的程序。前面提到,一个mapper处理一个文件的方法是低效的,所以较好的方法是继承CombineFileInputFormat而不是FileInputFormat。

2.文本输入

Hadoop非常擅长处理非结构化文本数据。本节讨论Hadoop提供的用于处理文本的不同InputFormat类。
2.1.TextInputFormat
TextInputFormat是默认的InputFormat。每条记录是一行输人。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符),它被打包成一个Text对象。所以,包含如下文本的文件被切分为包含4条记录的一个分片:

On the top of the Crumpetty Tree 
The Quangle Wangle sat, 
But his face you could not see, 
On account of his Beaver Hat. 

每条记录表示为以下键值:
(0, On the top of the Crumpetty Tree) 

(33, The Quangle Wangle sat,) 
(57, But his face you could not see,) 
(89, On account of his Beaver Hat. ) 

很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。每个分片单独处理。行号实际上是一个顺序的标记,即每次读取一行的时候需要对行号进行计数。因此,在分片内知道行号是可能的,但在文件中是不可能的。
然而,每一行在文件中的偏移量是可以在分片内单独确定的,而不需要知道分片的信息,因为每个分片都知道上一个分片的大小,只需要加到分片内的偏移量上,就可以获得每行在整个文件中的偏移量了。通常,对于每行需要唯一标识的应用来说,有偏移量就足够了。如果再加上文件名,那么它在整个文件系统内就是唯一的。当然,如果每一行都是定长的,那么这个偏移量除以每一行的长度即可算出行号。

输入分片与HDFS块之间的关系
FileInputFormat定义的逻辑记录有时并不能很好地匹配HDFS的文件块。例如,TextInputFormat的逻辑记录是以行为单位的,那么很有可能某一行会跨文件块存放。虽然这对程序的功能没有什么影响,如行不会丢失或出错,但这种现象应该引起注意,因为这意味着那些“本地的”map(即map运行在输入数据所在的主机上)会执行一些远程的读操作·由此而来的额外开销一般不是特别明显。图8·3展示了一个例子。一个文件分成几行,行的边界与HDFS块的边界没有对齐·分片的边界与逻辑记录的边界对齐(这里是行边界),所以第一个分片包含第5行,即使第5行跨第一块和第二块·第二个分片从第6行开始。

2.2.控制一行最大的长度
如果你正在使用这里讨论的文本输人格式中的一种,可以为预期的行长设一个最大值,对付被损坏的文件。文件的损坏可以表现为一个超长行,这会导致内存溢出错误,进而任务失败。通过将mapreduce.input.linerecordreader.line.maxlength设置为用字节数表示的、在内存范围内的值(适当超过输人数据中的行长),可以确保记录reader跳过(长的)损坏的行,不会导致任务失败。

2.3.关于KeyVaIueTextlnputFormat
TextInputFormat的键,即每一行在文件中的字节偏移量,通常并不是特别有用。通常情况下,文件中的每一行是一个键·值对,使用某个分界符进行分隔,比如制表符。例如由TextOutputFormat(即Hadoop默认OutputFormat)产生的输出就是这种。如果要正确处理这类文件,KeyvalueTextInputFormat比较合适。
可以通过mapreduce.input.keyvaluelinerecordreader.key.value.separator属性来指定分隔符。它的默认是一个制表符。以下是一个范例,其中一表示一个(水平方向的)制表符:
line1->On the top of the Crumpetty Tree
line2->The QuangIe wangle sat,
line3->But his face you could not see,
line4->On account of his Beaver Hat.
与TextInputFormat类似,输人是一个包含4条记录的分片,不过此时的键是每行排在制表符之前的Text序列:
(line1,On the top of the Crumpetty Tree)
(line2,The QuangIe wangle sat,)
(line3,But his face you could not see,)
(line4,On account of his Beaver Hat.)

2.4.关于NLineInputFormat
通过TextInputFormat和KeyValueTextInputFormat,每个mapper收到的输人行数不同。行数取决于输人分片的大小和行的长度。如果希望mapper收到固定行数的输人,需要将NLineInputFormat作为lnputFormat使用。与TextInputFormat一样,键是文件中行的字节偏移量,值是行本身。
N是每个mapper收到的输人行数。设置为1(默认值)时,每个mapper正好收到一行输人。mapreduce.input.lineinputformat.linespermap属性控制N值的设定。仍然以刚才的4行输人为例:

On the top of the Crumpetty Tree
The QuangIe Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

例如,如果N是2,则每个输人分片包含两行。一个mapper收到前两行键.值对:
(0,On the top of the Crumpetty Tree)
〈33,The QuangIe Wangle sat,)
另一个mapper则收到后两行:
(57,But his face you could not see,)
(89,On account of his Beaver Hat.)
键和值与TextInputFormat生成的一样。不同的是输人分片的构造方法。
通常来说,对少量输人行执行map任务是比较低效的(任务初始化的额外开销造成的),但有些应用程序会对少量数据做一些扩展的(也就是CPU密集型的)计算任务,然后产生输出。仿真是一个不错的例子。通过生成一个指定输人参数的输人文件,每行一个参数,便可以执行一个参数扫描分析(parameter sweep):并发运行一组仿真试验,看模型是如何随参数不同而变化的。

在一些长时间运行的仿真实验中,可能会出现任务超时的情况。一个任务在10分钟内没有报告状态,applicationmaster就会认为任务失败,进而中止进程(参见7.2.1节的详细讨论)。
这个问题最佳解决方案是定期报告状态,如写一段状态信息,或增加计数器的值。详情可以参见7.1.5节的补允材料“MapReduce中进度的组成”。
另一个例子是用Hadoop引导从多个数据源(如数据库)加载数据。创建一个“种子”输人文件,记录所有的数据源,一行一个数据源。然后每个mapper分到一个数据源,并从这些数据源中加载数据到HDFS中。这个作业不需要reduce阶段,所以reducer的数量应该被设成0(通过调用Job的setNumReduceTasks()来设置)。进而可以运行MapReduce作业处理加载到HDFS中的数据。范例参见附录C。

2.5.关于XML
大多数XML解析器会处理整个XML文档,所以如果一个大型XML文档由多个输人分片组成,那么单独解析每个分片就相当有挑战。当然,可以在一个mapper上(如果这个文件不是很大),可以用8.2.1节介绍的方法来处理整个XML文档。

由很多“记录”(此处是XML文档片断)组成的XML文档,可以使用简单的字符串匹配或正则表达式匹配的方法来查找记录的开始标签和结束标签,而得到很多记录。这可以解决由MapReduce框架进行分割的问题,因为一条记录的下一个开始标签可以通过简单地从分片开始处进行扫描轻松找到,就像TextInputFormat确定新行的边界一样。
Hadoop提供了StreamXmIRecordReader类(在org.apache.hadoop.streaming.mapreduce包中,还可以在Streaming之外使用)。通过把输人格式设为StreamInputFormat,把stream.recordreader.class属性设为org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader来用StreamXmlRecordReader类。reader的配置方法是通过作业配置属性来设reader开始标签和结束标签(详情参见这个类的帮助文档)。
例如,维基百科用XML格式来提供大量数据内容,非常适合用MapReduce来并行处理。数据包含在一个大型的打包文档中,文档中有一些元素,例如包含每页内容和相关元数据的page元素。使用StreamXmlRecordReader后,这些page元素便可解释为一系列的记录,交由一个mapper来处理。

3.二进制输入

Hadoop的MapReduce不只是可以处理文本信息,它还可以处二进制格式的数据。
3.1.关于SequenceFiIeInputFormat类
Hadoop的顺序文件格式存储二进制的键.值对的序列。由于它们是可分割的(它们有同步点,所以reader可以从文件中的任意一点与记录边界进行同步,例如分片的起点),所以它们很符合MapReduce数据的格式要求,并且它们还支持压缩,可以使用一些序列化技术来存储任意类型。详情参见5.4.1节。
如果要用顺序文件数据作为MapReduce的输人,可以使用SequenceFileInputFormat0键和值是由顺序文件决定,所以只需要保证map输人的类型匹配。例如,如果顺序文件中键的格式是lntwritable,值是Text,就像第5章中生成的那样,那么mapper的格式应该是Mapper<Intwritable,Text,K,V>,其中K和V是这个mapper输出的键和值的类型。
虽然从名称上看不出来,但SequenceFileInputFormat可以读map文件和顺序文件。如果在处理顺序文件时遇到目录,SequenceFileInputFormat会认为自己正在读map文件,使用的是其数据文件。因此,如果没有MapFileInputFormat类,也是可以理解的。
3.2.关于SequenceFileAsTextlnputFormat类
SequenceFileAsTextInputFormat是SequenceFileInputFormat的变体,它将顺序文件的键和值转换为Text对象。这个转换通过在键和值上调用toString()方法实现。这个格式使顺序文件作为Streaming的合适的输人类型。
3.3.关于SequenceFiIeAsBinaryInputFormat类
SequenceFi1eAsBinaryInputFormat是SequenceFileInputFormat的一种变体,它获取顺序文件的键和值作为二进制对象。它们被封装为BytesWritable对象,因而应用程序可以任意解释这些字节数组。与使用SequenceFile.Reader的appendRaw()方法或SequenceFileAsBinaryOutputFormat创建顺序文件的过程相配合,可以提供在MapReduce中可以使用任意二进制数据类型的方法(作为顺序文件打包),不过呢,插人Hadoop序列化机制通常更简洁,详情参见5,3.4节。

3.4.关于FixedLengthInputFormat类
FixedLengthInputFormat用于从文件中读取固定宽度的二进制记录,当然这些记录没有用分隔符分开。必须通过fixedlengthinputformat.record.length设置每个记录的大小。

4.多个输入

虽然一个MapReduce作业的输人可能包含多个输人文件(由文件glob、过滤器和路径组成),但所有文件都由同一个InputFormat和同一个Mapper来解释。然而,数据格式往往会随时间演变,所以必须写自己的mapper来处理应用中的遗留数据格式问题。或者,有些数据源会提供相同的数据,但是格式不同。对不同的数据集进行连接(join,也称“联接”)操作时,便会产生这样的问题。详情参见9.3.2节。例如,有些数据可能是使用制表符分隔的文本文件,另一些可能是二进制的顺序文件。即使它们格式相同,它们的表示也可能不同,因此需要分别进行解析。
这些问题可以用MultipleInputs类来妥善处理,它允许为每条输人路径指定InputFormat和Mapper。例如,我们想把英国Met Office的气象数据和NCDC的气象数据放在一起来分析最高气温,则可以按照下面的方式来设置输人路径:
MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,maxTemperatureMapper.class);
MultipleInputs.addInputPath(job,metOfficeInputPath,TextInputFormat.class,MetofficeMaxTemperatureMapper.class);
这段代取代了对FileInputFomat.addInputPath()和job.setMapperClass()的常规调用。MetOfflce和NCDC的数据都是文本文件,所以对两者都使用TextInputFormat数据类型。但这两个数据源的行格式不同,所以我们使用了两个不一样的mapper。MaxTemperatureMapper读取NCDC的输人数据并抽取年份和气温字段的值。MetOfficeMaxTemperatureMapper读取Met Office的输人数据,抽取年份和气温字段的值。重要的是两个mapper的输出类型一样,因此,reducer看到的是聚集后的map输出,并不知道这些输人是由不同的mapper产生的。
MultipleInputs类有一个重载版本的addInputPath()方法,它没有mapper参数:

public static void addInputPath(Job job,Path path,class<? extends InputFormat> inputFormatClass)
如果有多种输人格式而只有一个mapper(通过Job的setMapperClass()方法设定),这种方法很有用。

5.数据库输入(和输出)

DBInputFormat这种输人格式用于使用JDBC从关系型数据库中读取数据。因为它没有任何共享能力,所以在访问数据库的时候必须非常小心,在数据库中运行太多的mapper读数据可能会使数据库受不了。正是由于这个原因,DBInputFormat最好用于加载小量的数据集,如果需要与来自HDFS的大数据集连接,要使用MultipleInputs。与之相对应的输出格式是DB0utputFormat,它适用于将作业输出数据(中等规模的数据)转储到数据库。
在关系型数据库和HDFS之间移动数据的另一个方法是:使用Sqoop,具体描述可以参见第巧章。
HBase的TableInputFormat的设计初衷是让MapReduce程序操作存放在HBase表中的数据。而TableOutputFormat则是把MapReduce的输出写到HBase表。


 

猜你喜欢

转载自blog.csdn.net/shenchaohao12321/article/details/81192713