Hadoop MapReduce Job Configuration Parameters: Concepts and Explanations
InputFormat:
Purpose: splits the input files into InputSplits, and breaks each split into <key, value> pairs that serve as the map input.
Usage: set with job.setInputFormatClass().
How it works:
InputFormat has only two abstract methods:
1. List<InputSplit> getSplits() computes the input splits over the job's files, answering how the data is divided into splits.
2. RecordReader<K,V> createRecordReader() reads from an InputSplit, answering how data is read out of a split: it parses what it reads into the <key, value> pairs fed to the map.
public abstract class InputFormat<K, V> {
/**
* Splits are logical, not physical; each split is essentially a tuple
* <input-file-path, start, length>.
*/
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
/**
* Create a record reader for a given split.
*/
public abstract RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
Example:
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
/**
* A Writable array of Text values. ArrayWritable has no no-arg constructor
* that fixes the element type, so Hadoop's reflection-based deserialization
* needs a concrete subclass like this one that pins the element class to
* Text. Used below as the value type of CSVInputFormat.
*/
public class TextArrayWritable extends ArrayWritable {
public TextArrayWritable() {
super(Text.class);
}
public TextArrayWritable(Text[] strings) {
super(Text.class, strings);
}
}
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* An input format for CSV files: the key is the byte offset of the line and
* the value holds the line's fields as a TextArrayWritable. The token
* separator is read from the csvinputformat.token.delimiter configuration
* key; the CSVRecordReader it returns is assumed to be defined elsewhere.
*/
public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override
protected boolean isSplitable(JobContext context, Path filename) {
CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
return codec == null;
}
@Override
public RecordReader<LongWritable, TextArrayWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
String csvDelimiter = context.getConfiguration().get(CSV_TOKEN_SEPARATOR_CONFIG);
Character separator = null;
if (csvDelimiter != null && csvDelimiter.length() == 1) {
separator = csvDelimiter.charAt(0);
}
return new CSVRecordReader(separator);
}
}
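A minimal driver sketch showing how this input format could be wired into a job (the separator and input path here are hypothetical):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

Configuration conf = new Configuration();
conf.set(CSVInputFormat.CSV_TOKEN_SEPARATOR_CONFIG, ";"); // must be a single character
Job job = Job.getInstance(conf, "csv job");
job.setInputFormatClass(CSVInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/input/data.csv")); // hypothetical path
A mapper consuming this format would be declared as Mapper<LongWritable, TextArrayWritable, ...> and can recover the fields of a line with value.toStrings().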
Mapper:
Purpose: implements the map function, producing intermediate results from the input <key, value> pairs.
Usage: set with job.setMapperClass().
Source:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN, VALUEIN> reader,
RecordWriter<KEYOUT, VALUEOUT> writer,
OutputCommitter committer, StatusReporter reporter,
InputSplit split) throws IOException, InterruptedException {
super(conf, taskid, reader, writer, committer, reporter, split);
}
}
/**
* Pre-processing hook; runs exactly once, when the map task starts.
*/
protected void setup(Context context) throws IOException,
InterruptedException {
}
/**
* Called once for every <key, value> pair in the InputSplit.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Teardown hook for work such as closing streams; runs once at the end.
*/
protected void cleanup(Context context) throws IOException,
InterruptedException {
}
/**
* The driver of the map task.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
How it works: each call consumes one key-value pair and may emit zero or more key-value pairs; in practice you write a class that extends Mapper and overrides its map() method.
Example:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key: "+key.toString());
System.out.println("value:"+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Reducer:
Purpose: implements the reduce function, merging the intermediate results into the final result.
Usage: set with job.setReducerClass().
Example:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
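Putting the pieces together: a classic WordCount-style driver that registers the mapper and reducer above (a sketch, assuming both classes are nested in this WordCount driver class and paths come from the command line):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}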
Combiner:
Purpose: merges intermediate key-value pairs that share the same key on the map side, before they are sent across the network; despite running next to the map, it is written as a Reducer subclass, not as a map function.
Usage: set with job.setCombinerClass(); the default is null (no combining). Its output becomes the reduce input, so its output types must match the map output types.
Example:
public static class Combine extends Reducer<Text,Text,Text,Text> {
// Reduce Method
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (Text value : values) {
String fields[] = value.toString().split(",");
sum += Double.parseDouble(fields[0]);
count += Integer.parseInt(fields[1]);
}
context.write(key, new Text(sum+","+count));
}
}
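Registration, and a caveat: the framework may run the combiner zero, one, or several times per key, so the combined operation must be associative and commutative. For WordCount the reducer itself can double as the combiner:
job.setCombinerClass(IntSumReducer.class);
The Combine class above, by contrast, exists because averaging is not associative: it forwards the partial sum and count as a pair so the real reducer can still compute an exact average.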
Partitioner:
Purpose: implements getPartition(), which during the shuffle divides the intermediate data by key into R partitions, each partition handled by one reducer.
Usage: set with job.setPartitionerClass(); the default is HashPartitioner.
Shuffle's role: copies the output of the map phase to the local disk of the reduce nodes.
Example:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author xadil
*
*/
public class StatementsPartioner extends Partitioner<StatementsKeyType, Text> {
@Override
public int getPartition(StatementsKeyType key, Text value, int numPartitions) {
    // Mask the sign bit rather than calling Math.abs(): Math.abs(Integer.MIN_VALUE)
    // is still negative, which would produce a negative partition number.
    return ((key.getAccountId().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
}
}
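A registration sketch: the number of partitions passed to getPartition() equals the number of reduce tasks, so the two are set together (the count of 4 here is arbitrary):
job.setPartitionerClass(StatementsPartioner.class);
job.setNumReduceTasks(4);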
OutputFormat:
Purpose: writes the final results.
Usage: set with job.setOutputFormatClass(); the default is TextOutputFormat, which writes the results to text files, one key-value pair per line with a tab between key and value.
Example:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author xadil
*
*/
public class StatementPDFOutputFormat<K,V> extends FileOutputFormat<K,V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
Path file = getDefaultWorkFile(job, "");
// PDFStatementWriter is assumed to be defined elsewhere.
return new PDFStatementWriter<K,V>(file, conf);
}
/*@Override
public RecordWriter<K, V> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
Path fullPath = new Path(file, "result.txt");
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(fullPath, false);
return new PDFWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(fullPath, false);
return new PDFWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}*/
}
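A registration sketch for the custom output format above (the output path is illustrative):
job.setOutputFormatClass(StatementPDFOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/output/statements")); // hypothetical path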
OutputKey:
Purpose: sets the type of the final output key.
Usage: job.setOutputKeyClass().
OutputValue:
Purpose: sets the type of the final output value.
Usage: job.setOutputValueClass().
MapOutputKey:
Purpose: sets the type of the intermediate (map output) key.
Usage: job.setMapOutputKeyClass().
MapOutputValue:
Purpose: sets the type of the intermediate (map output) value.
Usage: job.setMapOutputValueClass().
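These four settings matter when the map output types differ from the final output types; by default Hadoop assumes they are the same. A sketch for a job whose mapper emits <Text, IntWritable> while its reducer writes <Text, Text>:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);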
GroupingComparator:
Purpose: sets the grouping comparator class used on the reduce side; keys that this comparator considers equal are grouped together, and all of their values are delivered to the same reduce() call.
Usage: job.setGroupingComparatorClass().
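A sketch of a grouping comparator for the StatementsKeyType used in the Partitioner example, assuming that class is a WritableComparable and that getAccountId() returns a String; all records of one account then reach a single reduce() call regardless of the rest of the key:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class AccountGroupingComparator extends WritableComparator {
    protected AccountGroupingComparator() {
        super(StatementsKeyType.class, true); // true: instantiate keys for comparison
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group solely by account id; any other key fields are ignored.
        return ((StatementsKeyType) a).getAccountId()
                .compareTo(((StatementsKeyType) b).getAccountId());
    }
}
Registered with job.setGroupingComparatorClass(AccountGroupingComparator.class).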