Reducer.run()
The first stop is Reducer.run(). It calls setup(context) and then enters a loop whose condition is context.nextKey().
/** Reduces a set of intermediate values which share a key to a smaller set of values. */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context); // calls the reduce() method we overrode
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
      }
    }
  } finally {
    cleanup(context);
  }
}
// Context.nextKey() simply delegates to the underlying ReduceContextImpl
public boolean nextKey() throws IOException, InterruptedException {
  return reduceContext.nextKey();
}
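Without the Hadoop types, run() is a template method. It calls setup() once, then reduce() once per key while nextKey() returns true, and finally cleanup() inside a finally block. The sketch below models only that control flow. `ToyReducer` and `ToyContext` are hypothetical names, and the context just replays a list of keys with no values attached.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical reducer skeleton with the same control flow as Reducer.run().
class ToyReducer {
    final List<String> log = new ArrayList<>();

    void setup(ToyContext c) { log.add("setup"); }

    void reduce(String key, ToyContext c) { log.add("reduce " + key); }

    void cleanup(ToyContext c) { log.add("cleanup"); }

    void run(ToyContext c) {
        setup(c);
        try {
            while (c.nextKey()) {              // one iteration per distinct key
                reduce(c.getCurrentKey(), c);
            }
        } finally {
            cleanup(c);                        // runs even if reduce() throws
        }
    }

    public static void main(String[] args) {
        ToyReducer r = new ToyReducer();
        r.run(new ToyContext(List.of("Java", "PHP")));
        System.out.println(r.log);             // [setup, reduce Java, reduce PHP, cleanup]
    }
}

// Hypothetical stand-in for Reducer.Context: it only replays a list of keys.
final class ToyContext {
    private final Iterator<String> keys;
    private String current;

    ToyContext(List<String> keys) { this.keys = keys.iterator(); }

    boolean nextKey() {                        // plays the role of context.nextKey()
        if (!keys.hasNext()) {
            return false;
        }
        current = keys.next();
        return true;
    }

    String getCurrentKey() { return current; }
}
```

Because cleanup() sits in the finally block, it still runs when a reduce() call throws.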
ReduceContextImpl.nextKey()
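The nextKey() call above leads to ReduceContextImpl.nextKey() below.
nextKeyIsSame starts out as false, so the while loop is skipped on the first call. The loop's job is to skip pairs: while the next key equals the current key, it discards the key and value currently held by the ReduceContextImpl object and calls nextKeyValue() to advance to the following pair. If hasMore == true, execution enters the if, increments the input key counter, and then calls nextKeyValue().
On every later visit (after a reduce() call has finished), the nextKeyValue() inside the if replaces the old key and value with the next ones, so they are ready before the next reduce() call begins.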
/** Start processing next unique key. */
public boolean nextKey() throws IOException,InterruptedException {
  while (hasMore && nextKeyIsSame) {
    nextKeyValue();
  }
  if (hasMore) {
    if (inputKeyCounter != null) {
      inputKeyCounter.increment(1);
    }
    return nextKeyValue();
  } else {
    return false;
  }
}
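hasMore tells whether another KV pair remains. It is initialized when the ReduceContextImpl object is constructed, via hasMore = input.next();.
input is a RawKeyValueIterator, an iterator over all the KV pairs. The value iteration inside reduce() also goes through this iterator.
nextKeyIsSame records whether the current key equals the next key. It is initialized to false.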
public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private RawKeyValueIterator input; // iterates over all KV pairs
  private Counter inputValueCounter; // counts values
  private Counter inputKeyCounter; // counts keys
  private RawComparator<KEYIN> comparator;
  private KEYIN key; // current key, updated in nextKeyValue()
  private VALUEIN value; // current value, updated in nextKeyValue()
  private boolean firstValue = false; // marks the first value of the current key; set in nextKeyValue(), cleared by ValueIterator.next()
  private boolean nextKeyIsSame = false; // whether the current key equals the next key
  private boolean hasMore; // whether another KV pair remains
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer; // deserializes the key in nextKeyValue()
  private Deserializer<VALUEIN> valueDeserializer; // deserializes the value in nextKeyValue()
  private DataInputBuffer buffer = new DataInputBuffer();
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable(); // the Iterable that reduce() walks with for-each
  ...
  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input,
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next(); // primes the iterator: is there a first KV pair?
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }
  ...
}
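The constructor line hasMore = input.next(); primes a cursor-style iterator. RawKeyValueIterator.next() moves the cursor and reports whether it landed on a pair, and getKey()/getValue() read the pair under the cursor. From then on, hasMore always answers "is an unread pair waiting?". Below is a minimal sketch of that priming pattern. The classes `PrimedReader` and `CursorInput` are hypothetical stand-ins for the Hadoop types.

```java
import java.util.List;

// Hypothetical reader that primes its input in the constructor, as ReduceContextImpl does.
final class PrimedReader {
    private final CursorInput input;
    private boolean hasMore;

    PrimedReader(CursorInput input) {
        this.input = input;
        this.hasMore = input.next();           // prime: position on the first record, if any
    }

    boolean hasMore() { return hasMore; }

    String take() {
        String key = input.getKey();           // read the record under the cursor
        hasMore = input.next();                // look one step ahead for the caller
        return key;
    }

    public static void main(String[] args) {
        PrimedReader reader = new PrimedReader(new CursorInput(List.of("Java", "PHP")));
        while (reader.hasMore()) {
            System.out.println(reader.take()); // Java, then PHP
        }
        System.out.println(new PrimedReader(new CursorInput(List.of())).hasMore()); // false
    }
}

// Hypothetical cursor-style input modeled on RawKeyValueIterator's next()/getKey() contract.
final class CursorInput {
    private final List<String> records;
    private int pos = -1;                      // before the first record until next() is called

    CursorInput(List<String> records) { this.records = records; }

    boolean next() {
        pos++;
        return pos < records.size();
    }

    String getKey() { return records.get(pos); }
}
```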
ReduceContextImpl.nextKeyValue()
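nextKeyValue() advances the current key and value. When it returns true, the while loop in Reducer.run() above is entered and our overridden reduce() is called.
Notice that it advances the key and the value together, one pair at a time. reduce() does see a key plus an Iterable of values, i.e. <xxx, <1,1,1,1,1,1…>>, but those values are never materialized as a list. Under the hood they are pulled from the input one pair at a time: <xxx, 1>, <xxx, 1>, <xxx, 1>…
Note also that this method updates nextKeyIsSame.
For the last key/value pair, execution does not enter if (hasMore), so nextKeyIsSame ends up false.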
/**
 * Advance to the next key/value pair.
 */
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!hasMore) {
    key = null;
    value = null;
    return false;
  }
  firstValue = !nextKeyIsSame;
  DataInputBuffer nextKey = input.getKey(); // raw key of the pair the iterator is positioned on
  currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                    nextKey.getLength() - nextKey.getPosition());
  buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
  key = keyDeserializer.deserialize(key); // deserialize to update the current key
  DataInputBuffer nextVal = input.getValue();
  buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
      - nextVal.getPosition());
  value = valueDeserializer.deserialize(value); // deserialize to update the current value
  currentKeyLength = nextKey.getLength() - nextKey.getPosition();
  currentValueLength = nextVal.getLength() - nextVal.getPosition();
  if (isMarked) {
    backupStore.write(nextKey, nextVal); // while marked, also save this pair to the backup store
  }
  hasMore = input.next(); // move the iterator one step: is there another pair?
  if (hasMore) {
    // not reached for the last pair, whose nextKeyIsSame stays false
    nextKey = input.getKey();
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                       currentRawKey.getLength(),
                                       nextKey.getData(),
                                       nextKey.getPosition(),
                                       nextKey.getLength() - nextKey.getPosition()
                                       ) == 0; // does the current key equal the next key?
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true; // note: true whenever a pair was read
}
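The sketch below is a serialization-free model of nextKeyValue(), meant to show how firstValue and nextKeyIsSame evolve. It rests on simplifying assumptions: keys are Strings and values are Integers, the sorted input is a List instead of a RawKeyValueIterator, there is no backup store and no counter, and a Comparator<String> stands in for the RawComparator. `KeyValueCursor` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of ReduceContextImpl.nextKeyValue().
final class KeyValueCursor {
    private final List<Map.Entry<String, Integer>> input; // sorted (key, value) pairs
    private final Comparator<String> comparator;          // stands in for the RawComparator
    private int pos = 0;                                  // index of the pair the "iterator" is on
    private boolean hasMore;
    boolean nextKeyIsSame = false;
    boolean firstValue = false;
    String key;
    Integer value;

    KeyValueCursor(List<Map.Entry<String, Integer>> input, Comparator<String> comparator) {
        this.input = input;
        this.comparator = comparator;
        this.hasMore = !input.isEmpty();                  // like hasMore = input.next()
    }

    boolean nextKeyValue() {
        if (!hasMore) {
            key = null;
            value = null;
            return false;
        }
        firstValue = !nextKeyIsSame;                      // new group iff the last lookahead said "different"
        key = input.get(pos).getKey();
        value = input.get(pos).getValue();
        pos++;
        hasMore = pos < input.size();                     // like hasMore = input.next()
        nextKeyIsSame = hasMore && comparator.compare(key, input.get(pos).getKey()) == 0;
        return true;
    }

    // Records "key=value first=... sameNext=..." for every pair, in order.
    static List<String> trace(List<Map.Entry<String, Integer>> input) {
        KeyValueCursor c = new KeyValueCursor(input, Comparator.naturalOrder());
        List<String> steps = new ArrayList<>();
        while (c.nextKeyValue()) {
            steps.add(c.key + "=" + c.value + " first=" + c.firstValue + " sameNext=" + c.nextKeyIsSame);
        }
        return steps;
    }

    public static void main(String[] args) {
        trace(List.of(Map.entry("Java", 1), Map.entry("Java", 1), Map.entry("PHP", 1)))
                .forEach(System.out::println);
        // Java=1 first=true sameNext=true
        // Java=1 first=false sameNext=false
        // PHP=1 first=true sameNext=false
    }
}
```

Each call consumes exactly one pair. The lookahead comparison is what lets the caller know, without reading ahead itself, whether the next pair still belongs to the current key.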
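The comparator.compare() call in nextKeyIsSame = comparator.compare(...) above lands in the method below. key1 is the current key and key2 is the next key. The compare(key1, key2) at the end calls a.compareTo(b), i.e. the compareTo() we implemented in our bean (the bean implements the WritableComparable interface).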
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1);                   // parse key1
    key1.readFields(buffer);
    buffer.reset(b2, s2, l2);                   // parse key2
    key2.readFields(buffer);
    buffer.reset(null, 0, 0);                   // clean up reference
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2);                   // compare them
}
public int compare(WritableComparable a, WritableComparable b) {
  return a.compareTo(b);
}
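The byte-level compare() above follows a simple pattern: deserialize both byte ranges into key objects, then delegate to the object-level comparison. Below is a pure-JDK sketch of the same pattern. It assumes a hypothetical key serialized as a single `long` id (as a bean's write() might do with out.writeLong(id)), and it uses `DataInputStream` in place of Hadoop's `DataInputBuffer`. Comparing deserialized values also orders negative ids correctly, which a plain unsigned byte-by-byte comparison of big-endian longs would not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical raw comparator for keys serialized as one long (the id).
final class RawIdComparator {

    // Same parameter shape as WritableComparator.compare(byte[], int, int, byte[], int, int).
    int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        long id1 = readId(b1, s1, l1);          // parse key1
        long id2 = readId(b2, s2, l2);          // parse key2
        return compare(id1, id2);               // delegate to the object-level comparison
    }

    // Object-level comparison; in Hadoop this is where a.compareTo(b) runs.
    int compare(long a, long b) { return Long.compare(a, b); }

    private static long readId(byte[] b, int s, int l) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(b, s, l))) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] serialize(long id) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        RawIdComparator c = new RawIdComparator();
        byte[] a = serialize(5);
        byte[] b = serialize(7);
        System.out.println(c.compare(a, 0, a.length, b, 0, b.length) < 0); // true
    }
}
```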
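If a GroupingComparator is configured for grouping (registered with job.setGroupingComparatorClass()), the nextKeyIsSame = comparator.compare() call above goes into our own GroupingComparator instead. The byte-level compare() shown above calls compare(WritableComparable, WritableComparable), which our class overrides.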
public class GroupingComparator extends WritableComparator {
  protected GroupingComparator() {
    super(Order.class, true); // true: create Order instances so the byte-level compare() can deserialize keys
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    // group by order id only: beans with the same id count as the same key
    Order aOrder = (Order) a;
    Order bOrder = (Order) b;
    if (aOrder.getId() > bOrder.getId()) {
      return 1;
    } else if (aOrder.getId() < bOrder.getId()) {
      return -1;
    } else {
      return 0;
    }
  }
}
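A grouping comparator is usually paired with a secondary sort, for example to keep the most expensive order for each id. The sort comparator orders beans by id and then by price descending, while the grouping comparator looks only at id. As a result, each reduce() call receives one id's orders with the most expensive one first. The sketch below simulates only the grouping decision in plain Java. `Order` with `id`/`price` fields is a hypothetical stand-in for the article's bean. `Long.compare(a.id, b.id)` is a shorter equivalent of the if/else chain above, assuming the id is a long.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GroupingDemo {

    // Hypothetical stand-in for the article's Order bean.
    static final class Order {
        final long id;
        final double price;
        Order(long id, double price) { this.id = id; this.price = price; }
    }

    // Sort comparator: id ascending, then price descending (what the shuffle sort would apply).
    static final Comparator<Order> SORT =
            Comparator.comparingLong((Order o) -> o.id)
                      .thenComparing(Comparator.comparingDouble((Order o) -> o.price).reversed());

    // Grouping comparator: only the id decides whether two keys are "the same".
    static final Comparator<Order> GROUP = (a, b) -> Long.compare(a.id, b.id);

    // A new reduce() call starts whenever GROUP reports a different key; a reducer that
    // writes only the first record of each group keeps the most expensive order per id.
    static List<Order> topOnePerId(List<Order> orders) {
        List<Order> sorted = new ArrayList<>(orders);
        sorted.sort(SORT);
        List<Order> result = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            boolean firstOfGroup = i == 0 || GROUP.compare(sorted.get(i - 1), sorted.get(i)) != 0;
            if (firstOfGroup) {
                result.add(sorted.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> top = topOnePerId(List.of(
                new Order(1, 25.8), new Order(2, 522.8), new Order(1, 222.8), new Order(2, 33.8)));
        for (Order o : top) {
            System.out.println(o.id + " " + o.price); // 1 222.8, then 2 522.8
        }
    }
}
```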
Reducer.reduce()
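The key and values passed to reduce() are the key and value fields of ReduceContextImpl, which nextKeyValue() has already advanced. So each call to reduce() receives exactly one key.
The hasNext() and next() methods below are called repeatedly until the current key has no more values.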
/**
 * This method is called once for each key. Most applications will define
 * their reduce class by overriding this method. The default implementation
 * is an identity function.
 */
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                      ) throws IOException, InterruptedException {
  for(VALUEIN value: values) {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}
The for-each loop obtains the ValueIterator through ValueIterable.iterator().
protected class ValueIterable implements Iterable<VALUEIN> {
  private ValueIterator iterator = new ValueIterator();
  @Override
  public Iterator<VALUEIN> iterator() {
    return iterator;
  }
}
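iterator() returns the same ValueIterator instance on every call, and that iterator only moves forward through the shared input. So within one reduce() call, the values of a key can be walked only once; the inReset/backupStore branches in next() are the mark/reset escape hatch. Below is a toy model of that single-pass behaviour, using the hypothetical class `SharedIteratorIterable`.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model: iterator() hands out one shared Iterator, as ValueIterable does.
final class SharedIteratorIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    SharedIteratorIterable(List<T> values) { this.shared = values.iterator(); }

    @Override
    public Iterator<T> iterator() { return shared; }   // same object on every call

    static <T> int count(Iterable<T> values) {
        int n = 0;
        for (T ignored : values) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        SharedIteratorIterable<Integer> values = new SharedIteratorIterable<>(List.of(1, 1, 1));
        System.out.println(count(values)); // 3
        System.out.println(count(values)); // 0: the shared iterator is already exhausted
    }
}
```

A reducer that needs two passes has to copy the values during the first one. With Writable values, each element should be cloned, because deserialize(value) is handed the previous object to refill.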
ReduceContextImpl.ValueIterator.hasNext()
hasNext() reports whether the current key has another value. On the first call to hasNext(), firstValue == true, while nextKeyIsSame depends on the actual data.
public boolean hasNext() {
  try {
    if (inReset && backupStore.hasNext()) {
      return true;
    }
  } catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException("hasNext failed", e);
  }
  return firstValue || nextKeyIsSame;
}
ReduceContextImpl.ValueIterator.next()
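This method returns the value held by ReduceContextImpl.
On the first call, it returns value directly, because ReduceContextImpl.nextKey() has already called nextKeyValue(). Every later call first invokes nextKeyValue() to advance the key and value in ReduceContextImpl, and then returns value.
Besides advancing the key and value, this nextKeyValue() call also updates nextKeyIsSame. nextKeyIsSame serves as a loop condition in both ReduceContextImpl.ValueIterator.hasNext() and ReduceContextImpl.nextKey().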
public VALUEIN next() {
  if (inReset) {
    try {
      if (backupStore.hasNext()) {
        backupStore.next();
        DataInputBuffer next = backupStore.nextValue();
        buffer.reset(next.getData(), next.getPosition(), next.getLength()
            - next.getPosition());
        value = valueDeserializer.deserialize(value);
        return value;
      } else {
        inReset = false;
        backupStore.exitResetMode();
        if (clearMarkFlag) {
          clearMarkFlag = false;
          isMarked = false;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("next value iterator failed", e);
    }
  }
  // if this is the first record, we don't need to advance
  // (nextKey() has already loaded the key and value)
  if (firstValue) {
    firstValue = false;
    return value;
  }
  // if this isn't the first record and the next key is different, they
  // can't advance it here.
  if (!nextKeyIsSame) {
    throw new NoSuchElementException("iterate past last value");
  }
  // otherwise, go to the next key/value pair
  try {
    nextKeyValue(); // advance the current key and value (and nextKeyIsSame)
    return value;
  } catch (IOException ie) {
    throw new RuntimeException("next value iterator failed", ie);
  } catch (InterruptedException ie) {
    // this is bad, but we can't modify the exception list of java.util
    throw new RuntimeException("next value iterator interrupted", ie);
  }
}
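hasNext() and next() never look at the input directly. They only read firstValue and nextKeyIsSame, which nextKeyValue() maintains. The sketch below wires the three methods together so you can watch the iterator stop exactly at a key boundary. It is a hypothetical, serialization-free model: the class `MiniReduceContext` uses String keys and values and has no backup store and no counters.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, serialization-free model of ReduceContextImpl (no backup store, no counters).
final class MiniReduceContext {
    private final List<String[]> input;        // sorted {key, value} pairs
    private int pos = 0;
    private boolean hasMore;
    private boolean nextKeyIsSame = false;
    private boolean firstValue = false;
    private String key;
    private String value;

    MiniReduceContext(List<String[]> input) {
        this.input = input;
        this.hasMore = !input.isEmpty();       // like hasMore = input.next()
    }

    private boolean nextKeyValue() {
        if (!hasMore) {
            key = null;
            value = null;
            return false;
        }
        firstValue = !nextKeyIsSame;
        key = input.get(pos)[0];
        value = input.get(pos)[1];
        pos++;
        hasMore = pos < input.size();
        nextKeyIsSame = hasMore && key.equals(input.get(pos)[0]);
        return true;
    }

    boolean nextKey() {
        while (hasMore && nextKeyIsSame) {     // skip values reduce() left unread
            nextKeyValue();
        }
        return hasMore && nextKeyValue();
    }

    String getCurrentKey() { return key; }

    // Same logic as ValueIterator.hasNext()/next() without the backup-store branches.
    Iterator<String> values() {
        return new Iterator<String>() {
            @Override
            public boolean hasNext() { return firstValue || nextKeyIsSame; }

            @Override
            public String next() {
                if (firstValue) {              // already loaded by nextKey()
                    firstValue = false;
                    return value;
                }
                if (!nextKeyIsSame) {
                    throw new NoSuchElementException("iterate past last value");
                }
                nextKeyValue();                // advances key, value and nextKeyIsSame
                return value;
            }
        };
    }

    public static void main(String[] args) {
        MiniReduceContext ctx = new MiniReduceContext(List.of(
                new String[] {"Java", "1"}, new String[] {"Java", "2"}, new String[] {"PHP", "3"}));
        while (ctx.nextKey()) {                // Reducer.run()'s loop
            Iterator<String> it = ctx.values();
            while (it.hasNext()) {             // what a for-each inside reduce() does
                String v = it.next();
                System.out.println(ctx.getCurrentKey() + " " + v);
            }
        }
        // Java 1
        // Java 2
        // PHP 3
    }
}
```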
ReduceContextImpl.nextKey()
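Once reduce() returns, the while condition context.nextKey() is evaluated again, which leads into ReduceContextImpl.nextKey(). Normally the while loop in nextKey() is not entered. The nextKeyValue() calls that next() made during reduce()'s iteration kept nextKeyIsSame up to date, and the iteration ends only once nextKeyIsSame is false, meaning the current key differs from the next one. The loop is entered only when reduce() returns without consuming all of the current key's values, leaving nextKeyIsSame true. Grouping is the typical case. Suppose a bean is the key and beans are grouped by some field, say id. All beans with the same id then count as the same key. If reduce() writes only the first record of a group and returns, nextKey() must skip the rest of that group, because each reduce() call may only see one key. The loop is never entered on the very first call, since nextKeyIsSame is initialized to false.
Execution then enters the if and calls nextKeyValue(). At this point a reduce() call has completed and nextKeyIsSame == false, so the key loaded now is the next key. For example, if the first reduce() saw the key "Java", the key now changes to something else, say "PHP". If "PHP" has two or more values, nextKeyIsSame is set to true. Either way nextKeyValue() returns true, so context.nextKey() returns true and reduce() runs again with "PHP" as its key.
After every KV pair has been consumed, hasMore == false, and context.nextKey() returns false.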
/** Start processing next unique key. */
public boolean nextKey() throws IOException,InterruptedException {
  while (hasMore && nextKeyIsSame) {
    nextKeyValue();
  }
  if (hasMore) {
    if (inputKeyCounter != null) {
      inputKeyCounter.increment(1);
    }
    return nextKeyValue();
  } else {
    return false;
  }
}
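The skip loop at the top of nextKey() only matters when reduce() returns before draining its values. The hypothetical model below (`SkipDemo`, with String pairs and equality standing in for the comparator) lets each "reduce" read only the first value of its key, as a top-1-per-group reducer would. It also counts how many pairs the while loop has to skip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the outer nextKey() loop; only the fields nextKey() needs.
final class SkipDemo {
    private final List<String[]> input;        // sorted {key, value} pairs
    private int pos = 0;
    private boolean hasMore;
    private boolean nextKeyIsSame = false;
    private String key;
    private String value;
    int skipped = 0;                           // pairs consumed by nextKey()'s while loop

    SkipDemo(List<String[]> input) {
        this.input = input;
        this.hasMore = !input.isEmpty();       // like hasMore = input.next()
    }

    private boolean nextKeyValue() {
        if (!hasMore) {
            return false;
        }
        key = input.get(pos)[0];
        value = input.get(pos)[1];
        pos++;
        hasMore = pos < input.size();
        nextKeyIsSame = hasMore && key.equals(input.get(pos)[0]);
        return true;
    }

    boolean nextKey() {
        while (hasMore && nextKeyIsSame) {     // values the previous reduce() never read
            nextKeyValue();
            skipped++;
        }
        return hasMore && nextKeyValue();
    }

    // Each "reduce" call reads only the first value of its key, then returns.
    List<String> reduceFirstOnly() {
        List<String> out = new ArrayList<>();
        while (nextKey()) {
            out.add(key + "=" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        SkipDemo demo = new SkipDemo(List.of(
                new String[] {"Java", "1"}, new String[] {"Java", "2"}, new String[] {"Java", "3"},
                new String[] {"PHP", "4"}, new String[] {"PHP", "5"}));
        System.out.println(demo.reduceFirstOnly()); // [Java=1, PHP=4]
        System.out.println(demo.skipped);           // 3
    }
}
```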