Hadoop中的MapReduce库支持集中不同的格式的输入数据。例如,文本模式的输入数据的每一行被视为一个key/value键值对。key是文件的偏移量,value是那一行的内容。另一种常见的格式是以key进行排序来存储key/value键值对的序列。每种输入类型的实现都必须能够把数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理。
1. 输入格式-InputFormat
当运行一个MapReduce作业的时候,我们需要为作业制定它的输入格式。InputFormat作为Hadoop作业的所有数入格式的抽象基类,它描述了作业的输入需要满足的规范细节。
1.1 InputFormate抽象类
InputFormat所在的包为org.apache.hadoop.mapredce,在该抽象类中定义了连个抽象类:
/**
* 该方法的主要作用是将所有的输入文件分割成逻辑上的多个分片InputSplit,每个InputSplit通过输入文件路 * 径、开始位置和偏移量三个信息进行来唯一标识。
*/
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
/**
* 该方法的主要作用是为制定的InputSplit创建记录读取器
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, nterruptedException;
InputFormat有三个直接子类:FIleInputFormat、DBInputFormat、DelegatingInputFormat。而FileInputFormat又包括TextInputFormat、KeyValueInputFormat、CombineFileInputFormat、NineInputFormat和SequenceFIleInputFormat五个子类。
1.2 FIleInputFormat文件输入格式
文件是Hadoop作业最为常用的格式,FIleInputFormat可以根据文件的总大小来将输入文件分割成若干个输入切片。为了保证整个记录不被截断,我们必须采取相应的处理策略,如通过创建RecordReader来保证记录的完整性,从而为Map提供一个面向记录的逻辑分块的试图。
1.2.1 成员属性
public static final String INPUT_DIR =
"mapreduce.input.fileinputformat.inputdir";//逗号分隔输入路径列表
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";//输入切片的最大size
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";//输入切片的最小size
public static final String PATHFILTER_CLASS =
"mapreduce.input.pathFilter.class";//输入文件的过滤器类,只有通过过滤器的文件才会加入
public static final String NUM_INPUT_FILES =
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
private static final double SPLIT_SLOP = 1.1; // 10% slop,切片容量溢出阈值
1.2.2 成员方法
/**
*获取输入路径上所有文件的操作,每个文件使用FileStatus对象代替,如果输入路径为空或未满足过滤条件将报错
*/
protected List<FileStatus> listStatus(JobContext job) throws IOException
/**
* 生成输入文件的输入切片的方法,生成的输入切片是FileSplit格式的。
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
//首先取得输入切片的上界和下界
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits,初始化用于保存生成的输入切片的列表对象
List<InputSplit> splits = new ArrayList<InputSplit>();
//取得所有的输入文件列表
List<FileStatus> files = listStatus(job);
//对于文件列表中的每一个文件进行相应的分割处理,然后生成文件的输入切片列表
for (FileStatus file: files) {
Path path = file.getPath();//取得文件的Path对象
long length = file.getLen();//取得文件的长度
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {//判断是否为本地的文
//获取本地文件的所有块信息
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
//取得文件所在的文件系统
FileSystem fs = path.getFileSystem(job.getConfiguration());
//获取文件所在的文件系统的相应块信息
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {//判断文件是否可以被切割
long blockSize = file.getBlockSize();//取出文件块的大小
//将文件系统数据块的大小、输入切片的上下界作为参数传递给computeSplitSize方法来计算出真正的输入切边的大小,计算策略为:首先取出块大小和设置的切片大小上界中的较小值,然后在取出上一步计算出的较小值和设置的切片大小的下界的较大值,最终第二部取出的较大值作为实际切片的大小。
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//剩余文件大小的初始值为整个切片的大小
long bytesRemaining = length;
//如果文件未切割部分大小比切片daxiao的1.1倍还要大,那么就创建一个FileSplit切片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
//首先取得当前切片所在数据块的索引
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
//新建FileSplit,并添加到输入切片列表
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
//将剩余文件的大小剪去切片大小,并将返回值作为新的剩余文件的大小
bytesRemaining -= splitSize;
}
//当剩余文件大小比切片大小1.1倍小时,将剩余部分作为整个FileSplit切片处理
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable,如果文件是不可切割的,那么将整个文件作为一个FileSplit
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
//如果输入文件是可分割的,但是文件大小为0,那么创建一个默认的FileSplit切片
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
1.3 TextInputFormat文本文件输入格式
TextInputFormat是FIleInputFormat抽象类的默认实现。该输入格式主要针对的是文本类型的文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。该类主要重写了createRecordReader和isSplitable方法:
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
//创建LineRecordReader行记录读取器,该读取器用于从文件中读取一行
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
//根据是否能够找到与压缩文件对于的编码器来决定是否对输入文件进行分割处理
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
1.4 KeyValueTextInputFormat键值对文本输入格式
该类是与TextInputFormat具有类似性质的针对文本类型文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。只不过KeyValueTextInputFormat所对应的文件中的每一行是由一个特殊的分割符所割成的键值对。
1.5 CombineFileInputFormat组合文件输入格式
1.6 SequenceFileInputFormat序列文件输入格式
2. 输入切片-InputSplit
输入切片InputSplit是一个单独的Map要处理的数据单元。输入切片的数据类型一般都是字节类型。输入切片经过相应的RecordReader处理之后,转化成记录视图的形式,然后交给Map处理。一般一条记录一一个键值对的形式来展现。
InputSplitt所在的包为org.apache.hadoop.mapredce,该抽象类定义了两个抽象方法:
//取得输入切片的大小
public abstract long getLength() throws IOException, InterruptedException;
//取得保存该输入切片的数据节点的列表
public abstract String[] getLocations() throws IOException, InterruptedException;
InputSplit的实现类包括:FileSplit、CombineFielSplit和DBInputFormat.DBInputSplit。
2.1 FileSplit文件输入切片
FileSplit是默认的InputSplit,这一点可以从FileInputFormat的创建输入切片的方法中体现出来。
2.1.1 成员变量
private Path file;//该输入切片所在的文件
private long start;//该输入切片在文件中的起始位置
private long length;//该输入切片的大小
private String[] hosts;//保存该输入切片的主机列表
private SplitLocationInfo[] hostInfos;//保存该输入切片的数据节点列表
2.1.2 成员方法
该类主要的成员方法就是上面成员变量所对应的get方法,以及Wrtiable接口的实现方法。FileSplit只对它所在的文件、起始位置、和切片大小属性进行序列化:
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, file.toString());
out.writeLong(start);
out.writeLong(length);
}
同理,FileSplit也只对它所在的文件、起始位置、和切片大小属性进行反序列化,主机列表属性会被默认为0:
@Override
public void readFields(DataInput in) throws IOException {
file = new Path(Text.readString(in));
start = in.readLong();
length = in.readLong();
hosts = null;
}
2.2 CombineFileSplit多文件输入切片
CombineFielSplit是与前面介绍的CombineFielInputFormat输入格式相对应的输入切片类型。FileSpilt代表一个文件的一个输入切片,而CombinFileSplit切片代表将来自多个文件的输入切片的一个输入切片。虽然每个CombineFIleSplit切片一般会包含来自不同文件的数据块,但是在同一个切片中的所有数据块一般都是在同一个机架上的。
2.2.1 成员变量
private Path[] paths;//该输入切片所在的文件
private long[] startoffset;//每个子切片在对应得文件中起始位置
private long[] lengths;//该输入切片的大小
private String[] locations;//每个子切片所在的机器名
private long totLength;//所有子切片的长度纸盒
2.2.2 成员方法
该类主要的成员方法就是上面成员变量所对应的get方法,以及Writable接口的实现方法。
3. 记录读取器-RecordReader
FileInputFormat的一些子类中都实现了createRecordReader方法,并返回了用于处理输入切片的RecordReader。该类包含6个抽象方法:
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
//初始化方法,该方法只会在初始化执行一次,输入参数包括输入切片和任务尝试上下文。
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//取得输入切片的下一个键值对,如果读取到键值对该方法返回ture,否则返回false。
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//取得当前读取到的键值对的键对象。
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//取得当前读取到的键值对的值对象。
public abstract float getProgress() throws IOException, InterruptedException;
//取得当前数据读取的进度,该方法会返回0.0到1.0之间的浮点数。
public abstract void close() throws IOException;
//该方法是java.io的Closeable接口中的方法,用于关闭RecordReader,节约资源。
4. 输出格式-OutputFormat
4.1 OutputFormat抽象类
OutputFormat抽象类描述了MapReduce作业的输出规范,它决定了MapReduce的作业输出结果保存到哪里,以及如何对输出结果进行持久化操作。主要工作有:
- 检查作业的输出是否有效,比如检查输出目录是否存在;
- 提供一个具体的RecordWriter实现类。Hadoop依靠该实现类将MapReduce作业的处理结果保存到制定文件系统的文件中,一般写到HDFS文件系统中。
public abstract void checkOutputSpecs(JobContext context
) throws IOException,
InterruptedException;
//检查作业输出目录是否存在。
public abstract RecordWriter<K, V>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException;
//返回RecordWriter,用来将产生的键值进行输出。
public abstract
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException, InterruptedException;
//返回OutputCommitter,用来将提交输出结果。
4.2 FileOutputFormat 文件输出格式
该类在工作的过程中利用大量配置对象中所包含的配置项,如是否压缩等。
4.2.1 枚举类
public static enum Counter {
BYTES_WRITTEN
}
//该枚举类型定义了一个定时器,它记录了输出的字节数。
4.2.2 成员变量
//用于输出文件名的数字部分,对处理结果进行排序输出
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
//输出文件的基名称相对应得配置项,如类似名称part-00xx中的part部分
protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
//默认输出文件的基名称为“part”
protected static final String PART = "part";
//对文件类型的输出进行提交的提交器
private FileOutputCommitter committer = null;
4.2.3 静态代码块
//用于设置上面定义的NUMBER_FORMAT变量的属性,如设置数值的整数部分最少为5位吗,不够5位的用0填充; //setGroupingUsed(false)设置数字部分不进行分组操作,如12345不会被格式化为12,345
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
4.2.4 成员方法
//通过配置项决定输出是否进行压缩操作
public static void setCompressOutput(Job job, boolean compress) {
job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
}
//通过配置项决定采用的压缩器
public static boolean getCompressOutput(JobContext job) {
return job.getConfiguration().getBoolean(
FileOutputFormat.COMPRESS, false);
}
//通过配置项决定输出目录
public static void setOutputPath(Job job, Path outputDir) {
...
}
//生成一个唯一的输出文件名,输出格式为$name-[mr]-$taskid$extension,其中那么就是BASE_OUPUT_NAME
public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
String extension) {
...
}
4.3 TextOutFormat 文本格式的文件输出格式
4.4 SequenceFileOutputFormat 普通序列文件输出格式
4.5 SequenceFileAsBinaryOutputFormat 二进制序列文件输出格式
4.6 FilterOutputFormat 过滤器输出格式
4.6 DBOutputFormat 数据库输出格式
4.8 MultipleOutputs 多种输出格式
5. 记录写入器-RecordWriter
RecordWriter用于将MapReduce作业的键值对结果写入到制定的输出中。
//将键值对以制定格式写入到输出目录
public abstract void write(K key, V value
) throws IOException, InterruptedException;
//关闭输出,节约资源
public abstract void close(TaskAttemptContext context
) throws IOException, InterruptedException;
6. 输出提交器-OutputCommitter
OutputCommitter主要用于控制MapReduce作业的输出环境。主要一下工作:
- 在OutputCommitter初始化时启动jon。比如会创建job的临时输出目录;
- 在作业完成之后清除job申请的资源。比如会删除job的临时目录;
- 为Mapper或者Reducer任务创建临时的输出目录;
- 检查Mapper或者Reducer任务是否需要提交;
- 提交或者丢弃MapReduce任务的输出。