Hadoop MapReduce的类型和格式

1.MapReduce的类型

Hadoop的MapReduce函数遵循如下常规格式:

--map:(K1, V1) -> list(K2, V2)

--combine:(K2, list(V2)) -> list(K2, V2)

--partition:(K2, V2) -> integer

--reduce:(K2, list(V2)) -> list(K3, V3)

输入数据(K1,K2)的类型由输入格式进行设置。

由于JAVA的泛型存在类型擦除,其它的类型不能从mapper,reducer等直接导出,而需要手动设置。

其余设置如下表所示:


1)默认的MapReduce作业

job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(Mapper.class);

job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(HashPartitioner.class);

job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);

job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
 HashPartitioner:对每条记录的键进行哈希操作以决定该记录属于那个分区让reduce处理。每个分区对应一个reducer任务,所以分区数等于作业的reduce的个数

2.输入格式

1)输入分片和记录

一个输入分片(split)是由单个map处理的输入块。每一个map只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个key/value对,map一个接一个的处理每条记录。输入分片和记录都是逻辑的,不必将他们对应到文件上。

public abstract class InputSplit {
	//以字节为单位的长度
	public abstract long getLength() throws IOException, InterruptedException;
	//分片的存储位置
	public abstract String[] getLocations() throws IOException, InterruptedException;
}
注:一个分片并不包含数据本身,而是指向数据的引用。

长度:用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(近似贪婪算法)。

存储位置:供MapReduce系统使用以便将Map任务尽量放在分片数据附近。 

2)InputFormat

InputFormat负责创建输入分片并将它们分割成记录

public abstract class InputFormat<K, V> {

  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
                               
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

客户端通过调用getSpilts()方法获得分片列表。

JobTracker使用分片的存储位置来调度map任务。

Map任务把分片传给InputFormat的createRecordReader方法来获得这个分片的RecordReader。

RecordReader就是记录上的迭代器,map任务用一个RecordReader来生成记录的key/value对,然后再传递给map函数。

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	......
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

--FileInputFormat类

FileInputFormat是所有使用文件为数据源的InputFormat实现的基类.

它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。把分片割成记录的作业由其子类来完成。 

设置JobConf的输入路径:

public static void addInputPath(Job job, Path path);
public static void addInputPaths(Job job, String commaSeparatedPaths);
public static void setInputPaths(Job job, Path... inputPaths);
public static void setInputPaths(Job job, String commaSeparatedPaths);
一个路径可以表示一个文件,一个目录或是一个glob(文件模式)。

注:目录中的内容不会被递归处理。如果包含子目录,也会被当做文件,从而产生错误。

设置PathFilter排除特定文件。无论设置与否,FileInputFormat会使用默认过滤器来排除隐藏文件(以.和_开头的文件)。

public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);
也可以通过配置属性来设置:
mapred.input.dir:逗号分隔的路径
mapred.input.path.filter.class:PathFilter类名
----FileInputFormat类的输入分片


分片大小的计算公式为

max(minimumSize, min(maximumSize, blockSize))

注:FileInputFormat定义的逻辑记录有时并不能很好地匹配HDFS的文件块,即逻辑记录的边界和HDFS的块的边界没有对齐。因此那些“本地”map可能会执行一些远程的读操作。


----CombineFileInputFormat

FileInputFormat生成的InputSplit是一个文件或该文件的一部分。

CombineFileInputFormat把多个文件打包到一个分片中以便每个mapper可以处理更多地数据。并且在决定哪些块放入同一个分片时,它会考虑节点和机架的因素,所以在典型的MapReduce作业中处理输入的速度并不会下降。

CombineFileInputFormat是一个抽象类,没有提供实体类,所以使用的时候要创建一个CombineFileInputFormat的具体类和实现createRecordReader()方法。

----避免切分

方法一:增加最小分片大小

方法二:覆盖FileInputFormat的isSplitable方法

----mapper中文件分片信息的相关属性

----TextInputFormat

TextInputFormat是默认的InputFormat。每条记录是一行输入。key是LongWritable类型,存储该行在整个文件中的字节偏移量,value是这行的内容,不包括任何终止符(换行符和回车符),它是Text类型

----KeyValueTextInputFormat 

和TextInputFormat类似,KeyValueTextInputFormat也是将文件中的每行解析成一条记录,不同的是,它将每行的文本根据分隔符来拆分成键值对。默认的分隔符为制表符。可通过mapreduce.input.keyvaluelinerecordreader.key.value.separator属性设置。

----NLineInputFormat

如果希望mapper收到固定行数的输入,需要使用NLineInputFormat作为InputFormat。与TextInputFormat一样,key是文件中行的字节偏移量,值是行本身。

N是每个mapper收到的输入行数,默认时N=1,每个mapper会正好收到一行输入,mapreduce.input.lineinputformat.linespermap属性控制N的值。

----SequenceFileInputFormat

如果要用顺序文件数据作为MapReduce的输入,应用SequenceFileInputFormat。key和value由顺序文件决定,所以只需要保证map输入的类型匹配。

SequenceFileInputFormat可以读MapFile和SequenceFile。如果在处理顺序文件时遇到目录,SequenceFileInputFormat类会认为正在读MapFile,使用的是其数据文件,因此没有MapFileInputFormat类也是可以理解的。

----SequenceFileAsTextInputFormat和SequenceFileAsBinaryInputFormat

两者均是SequenceFileInputFormat的变体,前者将顺序文件(其实就是SequenceFile)的key和value转成Text对象,后者获取顺序文件的key和value作为二进制对象。

----多种输入MultipleInputs

MultipleInputs允许为每条输入路径指定InputFormat和Mapper,不同的Mapper输出的KEY/VALUE类型必须一致。

----数据库输入和输出

DBInputFormat和DBOutputFormat,HBase的TableInputFormat和TableOutputFormat。

运行太多的mapper数据库中读取数据可能会使数据库受不了。

另一种方法是使用Sqoop。

3.输出格式

1)TextOutputFormat

默认的输出格式,它本每条记录写成文本行。它的key/value可以是任意类型,因为TextOutputFormat调用toString()方法把它们转成字符串。

默认情况下key和value用制表符分割,可以通过mapreduce.output.textoutputformat.separator进行设置,与TextOutputFormat对应的输入格式是KeyValueTextInputFormat。

可以使用NullWritable来省略输出的key和value,这也会导致无分隔符输出。

2)SequenceFileOutputFormat & SequenceFileAsBinaryOutputFormat

SequenceFileOutputFormat将它的输出写为一个顺序文件。

SequenceFileAsBinaryOutputFormat把key/value对作为二进制格式写到一个SequenceFile容器中。

3)MapFileOutputFormat

把MapFile作为输出,MapFile中的key必需顺序添加,所以必须确保reducer输出的key已经排好序。

4)多个输出MultipleOutputs

5)延迟输出

FileOutputFormat的子类会产生输出文件,即使文件时空的。但是有些应用倾向于不创建空文件,此时就可以使用LazyOutputFormat了,它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件。要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法即可。

猜你喜欢

转载自siyuan-zhu.iteye.com/blog/2039594