MapReduce的类型
Context类对象用于输出键-值对
map: (k1, v1) -> list(k2, v2)
combiner: (k2, list(v2)) -> list(k2, v2)
reduce: (k2, list(v2)) -> list(k3, v3)
partition函数对中间结果的键值对 (k2 , v2)进行处理,并返回一个分区索引 partition index
partition: (k2 , v2) - integer
输入数据类型由输入格式进行设置:
TextInputFormat 的键类型是 LongWritable(行偏移量) ,值是 Text
其他类型通过job类的方法显式进行设置,如果 k2,k3相同,则不需要调用setMapOutputKeyClass(),
如果v2,v3相同,只需要使用setOutPutValueClass()
默认的partitioner 是 HashPartitioner,它对每条记录的键进行哈希操作以决定记录应该属于哪个分区,
每个分区有一个reduce任务处理,所以分区数等于作业的reduce任务个数
map的任务数量等于输入文件被划分的分块数
reduce 增加数量可缩短reduce过程,一条法则:目标reducer保持在每个运行5分钟左右,且产生至少一个HDFS块的输出比较合适
public class StationPartitioner extends Partitioner<LongWritable, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public int getPartition(LongWritable key, Text value, int numPartitions) {
parser.parse(value);
return getPartition(parser.getStationId());
}
private int getPartition(String stationId) {
/*...*/
// ^^ StationPartitioner
return 0;
// vv StationPartitioner
}
输入格式
输入分片与记录
一个输入分片就是由单个map操作来处理的输入块,分片不包含数据本身,而是指向数据的引用
MapReduce不必直接处理InputSplit,它是由InputFormat创建的(InputFormat负责创建输入分片并将他们分隔成记录)
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext paramJobContext) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit paramInputSplit,
TaskAttemptContext paramTaskAttemptContext) throws IOException, InterruptedException;
}
FileInputFormat类
FileInputFormat是所有文件作为其数据源的InputFormat实现的基类
有两个功能:
- 用于支出作业的输入文件位置
- 为输入文件生成分片的代码实现
FileInputFormat输入路径:
四中静态方法设定job输入路径:
- addInputPath
- addInputPaths
- setInputPaths
- setInputPaths
FileInputFormat输入分片
小文件与CombineFileInputFormat
- 相对于大批量的小文件,Hadoop更适合处理少量的大文件
- 如果一个把1GB文件按100KB划分,10000个文件
- CombineFileInputFormat把多个文件打包到一个分片中,以便mapper可以处理更多数据,但是小文件会增加namenode寻址次数,可以使用顺序文件将,这些小文件合并为一个大文件
避免切分
如果不希望文件被切分:
- 增加最小分片大小
- 使用FileInputFormat具体子类,重写isSplitable(),把返回值设置为false
public class NonSplittableTextInputFormat extends TextInputFormat {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}
mapper中的文件信息
调用Mapper的Context对象上的 getInputSplit()
把整个文件作为一条记录处理
public class WholeFileInputFormat
extends FileInputFormat<NullWritable, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}