一、MR的宏观流程
1.两个阶段 Map阶段和Reduce阶段
2.一个MapReduce任务为一个Job,一个Job在执行不同的阶段时,启动若干Task
Map阶段启动的进程称为MapTask
MapTask启动的数量取决于切片数,切N片,启动N个MapTask
Reduce阶段启动的进程称为ReduceTask
ReduceTask启动的进程数量由开发人员自己设置
Job.setNumReduceTask(int n);
3.在学习MR时,在Map和Reduce之间,有一个讲Map输出的数据进行分区和排序和传输的过程
这个过程很重要,因此将这个阶段,单独命名为shuffle!
在两个阶段的划分的基础上,再细分为 Map----------shuffle------------Reduce
shuffle阶段不会启动单独的进程来完成,shuffle横跨了MapTask和ReduceTask!
4.官方的阶段划分
Map阶段: map,sort
Reduce阶段: copy,sort,reduce
map(map阶段)---sort|copy|sort(shuffle阶段)---reduce(reduce阶段)
二、MapReduce有两种运行模式
1.local(本地模式): 使用LocalJobRunner提交时,Job就在本地运行!
在本地以多线程模拟MapTask和ReduceTask
2.YARN(在YARN上运行): 使用YARNRunner提交时,此时Job在运行之前,会初始化
MRAppMaster进程,由这个进程向RM申请运行所有Task的资源!
RM将申请分配给NM,NM提供的资源会封装到Container中,此时启动Task进程!
三、MR运行的宏观流程
1.提交Job的阶段
a)job.waitForCompletion()
b)确定job的状态不是running,构建Cluster(代表集群)
c)确定Job的运行模式,如果是本地模型,使用LocalJobRunner提交运行
否则使用YARNRunner提交运行
b)使用提交器,提交Job
2.Job在提交之前的预处理阶段 submitJobInternal()
a)获取输出格式,调用输出格式的checkOutputSpecs()检查输出目录
不能为空,也不能已经存在
b)确定当前Job存放临时文件的目录submitJobDir
如果是本地文件系统,默认在idea所在的磁盘的tmp目录下
如果是HDFS,默认在hdfs的/tmp下
c) int maps = writeSplits(job, submitJobDir);
切片,生成切片信息,设置MapTask的数量
d) 在Job的作业目录上生成Job的配置信息文件
job.xml: 记录了Job所有的配置,这些配置来自于xxx-default.xml和xxx-site.xml
job.split:切片对象
job.splitmetainfo:切片对象的属性说明
如果此方法结束,相当于在YARN上运行时,此时准备申请MRAppMaster进程!
3.构建可以运行的Job
LocalJobRunner.Job: 功能类似于MRAppMaster,负责整个Job的运行的申请,提交等操作!
a)new Job()
b)执行Job.start() 相当于启动了MRAppMaster
c) 进入Job.run() 相当于开始让MRAppMaster干活
4.开始运行Map阶段
MRAppMaster是Job的管理者,一个Job启动一个MRAppMaster,负责JobMap和Reduce阶段Task的资源申请,运行,监控等。
LocalJobRunner.Job.run()
public void run() {
try {
//根据之前生成的job.split和job.splitinfo文件创建TaskSplitMetaInfo[]
// 之前切几片,TaskSplitMetaInfo[]中就有几个切片的元数据对象
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
// Map用来保存所有MapTask生成的结果文件的影响信息
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
// 根据有几个切片,就创建几个MapTask的线程
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
taskSplitMetaInfos, jobId, mapOutputFiles);
initCounters(mapRunnables.size(), numReduceTasks);
ExecutorService mapService = createMapExecutor();
// 运行MapTask
runTasks(mapRunnables, mapService, "map");
// 如果有reduce阶段,运行ReduceTasks
try {
if (numReduceTasks > 0) {
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
ExecutorService reduceService = createReduceExecutor();
//运行ReduceTask
runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
output.removeAll();
}
}
}
四、核心配置
1.如何确定当前Job使用什么样的输出格式?
由job.getOutputFormatClass(),此方法从参数中默认读mapreduce.job.outputformat.class
,如果没有配置,默认使用TextOutputFormat
public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
throws ClassNotFoundException {
return (Class<? extends OutputFormat<?,?>>)
conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}
默认在自己不设置的的情况下,默认使用TextOutputFormat.class!
2.如何确定有没有reduce阶段
Job.getNumReduceTask()获取当前Job要启动多少个ReduceTask!
//mapreduce.job.reduces
public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
3.如何获取Mapper
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
throws ClassNotFoundException {
return (Class<? extends Mapper<?,?,?,?>>)
//mapreduce.job.map.class
conf.getClass(MAP_CLASS_ATTR, Mapper.class);
}
4.如何获取输入格式
尝试获取配置中设置的mapreduce.job.inputformat.class的值,如果没有设置默认使用TextInputFormat
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
默认TextInputFormat使用LineRecordReader!
5.切片的说明
通常情况下,如果是从文件中获取切片,那么使用FileSplit作为切片对象!
FileSplit有三个关键的属性:
Path p: 当前切片属于哪个文件
int start: 当前切片的起始位置是从这个文件的哪个位置开始
int length: 当前切片的长度
从指定文件的start位置开始,读取length的长度,就属于当前切片!
切片和块没有任何关系!只不过每次从指定文件中读取每片数据时,数据实际上以块的形式存储在HDFS,
读取数据时,会访问到指定的块!具体哪个切片对应哪些块,取决于切片如何切,没有直接的对应关系!
注意:默认情况下,使用FileInputFormat,在切片时,默认以块大小作为片大小!
每片刚好是读取一个块!
五、Map阶段的运行
1.MapTaskRunnable.run()
查看LocalJobRunner.Job.MapTaskRunnable,类似于封装了Container,在其中运行了MapTask!
public void run() {
try {
TaskAttemptID mapId = new TaskAttemptID(new TaskID(
jobId, TaskType.MAP, taskId), 0);
LOG.info("Starting task: " + mapId);
mapIds.add(mapId);
//创建一个 MapTask对象,这个对象代表当前的MapTask,负责这个Task的总的运行流程
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
info.getSplitIndex(), 1);
......
//运行MapTask
map.run(localConf, Job.this);
}
2.MapTask.run()
注意: 如果有reduce,那么Map阶段分为两个阶段
map------67%
sort-------33%
如果没有reduce,Map阶段只有map一个阶段
只有有reduce阶段时,数据才会排序!如果没有reduce阶段数据是按照读入的顺序处理后,输出!
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
//定义了整个 MapTask的阶段划分
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
.......
if (useNewApi) {
// 开始运行Mapper
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
3.运行Mapper
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper 一个MapTask只会创建一个Mapper对象
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format 创建输入格式对象
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split 重建当前MapTask的切片
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
//构建MapTask的输入对象,负责整个MapTask的输入工作,RecordReader由input负责进行调用读取数据
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
//构建MapTask的输出对象
// get an output object
if (job.getNumReduceTasks() == 0) {
//如果没有reduce阶段,由Map收集输出的数据,直接输出
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// 创建记录收集器
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
//构建Mapper中使用的context对象,代表MapTask的上下文(来龙去脉),
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
// 会执行输入过程中所需要组件的一系列初始化
// 调用RecordReader.initialize()
input.initialize(split, mapperContext);
// 调用自己编写的Mapper的run()
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
4.Mapper.run()
Mapper在运行时:
KEYIN-VALUEIN: 取决于输入格式的RR的设置!
默认用LongWritable-Text
public void run(Context context) throws IOException, InterruptedException {
//在map()之前只被调用一次
setup(context);
try {
//调用RecoredReader的nextKeyValue()
while (context.nextKeyValue()) {
//每读取一对输入的KEYIN-VALUEIN,执行一次map记录
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
//在map()之后被调用1次
cleanup(context);
}
}
六、MapTask的shuffle
1.缓冲区对象的构建
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
//创建输出中的缓冲区对象,这个缓冲区不仅可以用来收集数据还会对数据进行排序
collector = createSortingCollector(job, reporter);
}
}
2.确定Partitioner
// 根据reduceTask的数量,确定Map阶段总的分区数(不等于实际上数据的分区)
partitions = jobContext.getNumReduceTasks();
// reduceTask个数>1,就使用用户配置的Partitioner
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
从配置中获取用户定义的Partitioner
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
//获取mapreduce.job.partitioner.class值,如果没有设置,使用HashPartitioner作为默认
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
HashPartitioner的工作原理: key相同的会分到同一个区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
3.缓冲区的初始化
默认使用MapOutPutCollector作为缓冲区对象!
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// 从配置中获取溢写的百分比,默认读取mapreduce.map.sort.spill.percent,如果没有配置,使用
// 0.8作为百分比
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
// 缓冲区的初始化大小,默认读取mapreduce.task.io.sort.mb,如果没有配置,默认使用100
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
// 默认使用快排
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// k/v serialization
// 确定key的比较器
comparator = job.getOutputKeyComparator();
// 获取Mapper输出的key-value的类型
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
//根据key的类型返回指定的序列化器
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// compression 在mapper的输出阶段使用压缩
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// combiner 设置combiner
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
}
4.确定key的比较器
public RawComparator getOutputKeyComparator() {
//尝试获取参数中配置的mapreduce.job.output.key.comparator.class,作为比较器,
//如果没有定义,默认为null,定义的话必须是RawComparator类型
Class<? extends RawComparator> theClass = getClass(
JobContext.KEY_COMPARATOR, null, RawComparator.class);
//如果用户配置,就实例化此类型的一个对象
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
// 判断Mapper输出的key是否是writableComparable类型的子类,
//如果是,就默认由系统提供比较器,如果不是就抛异常!
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
如何在Map阶段定义key的排序:
①要比较的字段,设置为key
②提供一个RawComparator的key的比较器!
或让key实现WritableComparable接口!
根据比较器的compareTo()对key进行比较!
如果比较的条件是多个,称为二次排序!
5.序列化
①什么时候需要序列化
有reduce阶段时,需要Map输出的key-value实现序列化
②怎么实现
实现Writable接口!
③是否必须实现?
不是!
④什么情况下,可以不实现Writable接口,为什么一定要实现Writable接口
只有实现了Writable接口,hadoop才会自动提供基于Writable接口的序列化器!
如果自己提供序列化器,就可以不是先Writable接口
6.总结
①先分区,每个输出的key-value在写出时,先调用partitioner计算分区号,再收集到缓冲区中
②数据被收集进入缓冲区中,当缓冲区达到溢写的条件时,会调用 Sorter对当前缓冲区中的所有的数据进行排序
只排索引(记录排好序的索引的信息)
③按照分区号,从0号开始依次溢写,每次溢写之前,如果设置了Combiner,先Conbine再溢写。
每次溢写会产生一个spillx.out的文件
④所有的数据全部收集到缓冲区后,会执行最后一次flush(),将不满足溢写条件的缓存中的残余数据再次溢写
⑤flush()之后,会调用mergeParts()进行合并
在合并时,先将多个文件的同一个分区的数据进行合并,合并后再排序!
之后讲所有分区的数据溢写为一个final.out文件!
在溢写之前,如果设置了Combiner,并且之前溢写的片段个数>=3,此时会再次调用Combiner,Combine后再溢写
一句话: 分区—排序—按照分区号溢写(combine)… 合并—溢写[combine]
七、ReduceTask的运行
1.ReduceTaskRunable.run()
public void run() {
try {
TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
jobId, TaskType.REDUCE, taskId), 0);
//创建reduceTask对象
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, taskId, mapIds.size(), 1);
......
try {
// 运行reduceTask.run()
reduce.run(localConf, Job.this);
}
2.ReduceTask.run()
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
//阶段定义划分
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
.....
// Initialize the codec 如果MapTask写出有使用压缩,在此时获取压缩格式的编解码器进行解压缩
codec = initCodec();
RawKeyValueIterator rIter = null;
//定义shuffle阶段的消费者线程,从MapTask输出的结果中将指定分区的数据拷贝到ReduceTask
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
// 定义combiner,在reduceTask端合并多个MapTask同一分区的数据时,如果reduceTask内存不够
//会发生溢写,在每次溢写前,还会调用combiner!
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//初始化shuffle线程,调用其run()
rIter = shuffleConsumerPlugin.run();
// free up the data structures shuffle阶段已经完成了copy过程
mapOutputFilesOnDisk.clear();
// 在shuffle中,sort也已经完成。已经按照Mapper的输出,对所有的数据进行了整体的排序
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
//获取Mapper输出的key-value的类型
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
//定义分组比较器
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
//运行Reducer
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
3.获取分组比较器
public RawComparator getOutputValueGroupingComparator() {
//先获取用户定义的比较器,从配置中获取mapreduce.job.output.group.comparator.class
//参数,必须是RawComparator类型
Class<? extends RawComparator> theClass = getClass(
JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
//如果用户没有定义,默认使用Map阶段key的比较器
if (theClass == null) {
return getOutputKeyComparator();
}
//如果用户定义了,就使用用户的比较器
return ReflectionUtils.newInstance(theClass, this);
}
4.runNewReducer
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
//封装key-value的迭代器,让迭代器可以报告进度
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
//每次迭代key-value时,并不是将数据读取后直接封装为key-value
//而是获取当前key-value的byte[]的内容,再使用反序列化
//把这部分内容的属性设置到key-value的实例中
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
// 创建上下文
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
// 实例化reducer对象,一个reduceTask只会创建一个reducer对象
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
//reduce中的context
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,
valueClass);
try {
//运行reducer.run()
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
5.Reducer.run()
public void run(Context context) throws IOException, InterruptedException {
//在reduce()之前,调用一次setUp()
setup(context);
try {
//判断数据中是否有和当前读取的key相同的key-value,如果读到了相同的key-value,进入一次reduce()
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
//在reduce()之前,调用一次cleanUp()
cleanup(context);
}
}
八、自定义组件
1.自定义输入格式
核心提供一个RecordReader
initialize(): 不需要手动调用,框架在运行时,在进入 Mapper处理之前会自动调用!
boolean nextKeyValue(): 读取输入的记录,封装为key-value对,如果封装完成,返回true,否则返回false
只有返回true,才会调用map()
2.自定义Partitioner
继承Partitioner,实现public int getPartition(Text key, FlowBean value, int numPartitions)!
注意: 分区号必须为int型的值,且必须符合 0<= partitionNum < numPartitions
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
3.实现RawComparator
对Flowbean的sumFlow不使用Key实现WriableComparable接口的方式!进行降序的全排序!
/**
* @author: WangMC
* @date: 2020/3/2 21:56
* @description:
*/
public class RawComparatorImpl implements RawComparator<FlowBean> {
private FlowBean key1 = new FlowBean();
private FlowBean key2 = new FlowBean();
private DataInputBuffer buffer = new DataInputBuffer();
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
buffer.reset(null, 0, 0); // clean up reference
} catch (IOException e) {
throw new RuntimeException(e);
}
return compare(key1, key2); // compare them
}
@Override
public int compare(FlowBean o1, FlowBean o2) {
int result;
// 按照总流量大小,倒序排列
if (o1.getSumFlow() > o2.getSumFlow()) {
result = -1;
} else if (o1.getSumFlow() < o2.getSumFlow()) {
result = 1;
} else {
// 二次排序 按下行流量倒序排序
result = o1.getDownFlow() > o2.getDownFlow() ? -1 : 1;
}
return result;
}
}