Hadoop MapReduce: Source-Code Analysis of How the Reduce Phase Calls reduce()

Reducer.run()

    The first stop is Reducer.run(). It first runs setup(context) (the per-task initialization hook), then enters the while loop, whose condition is context.nextKey().

/** Reduces a set of intermediate values which share a key to a smaller set of values. **/
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);   // this is where our overridden reduce() is called
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
}

// context.nextKey() simply delegates to the underlying ReduceContextImpl
public boolean nextKey() throws IOException, InterruptedException {
    return reduceContext.nextKey();
}
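Stripped of Hadoop's types, Reducer.run() is a template method: setup() once, reduce() once per key group, and cleanup() in a finally block. The sketch below mirrors only that control flow under simplifying assumptions: MiniContext and MiniReducer are hypothetical names rather than Hadoop APIs, keys are strings, values are integers, and the backup-store reset is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Reducer.Context: only the three calls run() uses.
interface MiniContext {
    boolean nextKey();
    String getCurrentKey();
    Iterable<Integer> getValues();
}

// Hypothetical reducer with the same control flow as Reducer.run().
abstract class MiniReducer {
    final List<String> log = new ArrayList<>();   // records the call order

    void setup(MiniContext ctx) { log.add("setup"); }

    void cleanup(MiniContext ctx) { log.add("cleanup"); }

    abstract void reduce(String key, Iterable<Integer> values, MiniContext ctx);

    void run(MiniContext ctx) {
        setup(ctx);
        try {
            while (ctx.nextKey()) {                 // one iteration per key group
                reduce(ctx.getCurrentKey(), ctx.getValues(), ctx);
            }
        } finally {
            cleanup(ctx);                           // runs even if reduce() throws
        }
    }
}

class RunLoopDemo {
    public static void main(String[] args) {
        String[] keys = {"Java", "PHP"};
        MiniContext ctx = new MiniContext() {
            int i = -1;                             // index of the current key
            public boolean nextKey() { return ++i < keys.length; }
            public String getCurrentKey() { return keys[i]; }
            public Iterable<Integer> getValues() { return List.of(1); }
        };
        MiniReducer r = new MiniReducer() {
            void reduce(String key, Iterable<Integer> values, MiniContext c) {
                log.add("reduce " + key);
            }
        };
        r.run(ctx);
        System.out.println(r.log);   // [setup, reduce Java, reduce PHP, cleanup]
    }
}
```

Putting cleanup() in finally means it still runs when reduce() throws, which the exception case in a test can confirm.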

ReduceContextImpl.nextKey()

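    Calling nextKey() above leads to ReduceContextImpl.nextKey() below.
    Because nextKeyIsSame starts out false, the while loop is not entered on the first call. The loop's job is skipping: while the next key equals the current key, it keeps calling nextKeyValue(), which advances (and thereby discards) the key and value held by the ReduceContextImpl object. Then, if hasMore == true, the method enters the if branch, increments the input key counter (inputKeyCounter), and calls nextKeyValue().
    When execution comes back here after a reduce() call has finished, this nextKeyValue() call advances the previous key and value to the first pair of the next key.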

/** Start processing next unique key. */
public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
}

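    hasMore tells whether another KV pair remains. It is initialized when the ReduceContextImpl object is created: hasMore = input.next();
    input is a RawKeyValueIterator, an iterator over all the KV pairs; iterating the values in reduce() ultimately goes through this iterator.
    nextKeyIsSame marks whether the current key equals the next key; it is initialized to false.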

public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private RawKeyValueIterator input;                      // iterates over all KV pairs
  private Counter inputValueCounter;                      // counts values
  private Counter inputKeyCounter;                        // counts keys
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                      // current key, updated in nextKeyValue()
  private VALUEIN value;                                  // current value, updated in nextKeyValue()
  private boolean firstValue = false;                     // is the current value the first of its key? set in nextKeyValue()
  private boolean nextKeyIsSame = false;                  // does the next key equal the current key?
  private boolean hasMore;                                // is there another KV pair?
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer;            // deserializes the key in nextKeyValue()
  private Deserializer<VALUEIN> valueDeserializer;        // deserializes the value in nextKeyValue()
  private DataInputBuffer buffer = new DataInputBuffer();
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable();   // the Iterable that reduce() walks with for-each
  ...
  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input,
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();   // is there at least one KV pair?
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }
  ...
}
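The hasMore field acts as a one-pair lookahead: the constructor already calls input.next(), so before any key has been read hasMore tells whether the input is empty. A minimal sketch of that contract, assuming a hypothetical in-memory ListKVIterator that imitates the next()/getKey()/getValue() shape of RawKeyValueIterator but returns plain strings instead of DataInputBuffer, and a hypothetical MiniReduceState holding the two flags discussed above:

```java
// Hypothetical in-memory stand-in for RawKeyValueIterator; keys must already be sorted.
class ListKVIterator {
    private final String[] keys;
    private final String[] values;
    private int pos = -1;   // positioned before the first pair, like a fresh iterator

    ListKVIterator(String[] keys, String[] values) {
        this.keys = keys;
        this.values = values;
    }

    // Moves to the next pair; returns false once the input is exhausted.
    boolean next() { return ++pos < keys.length; }

    String getKey() { return keys[pos]; }

    String getValue() { return values[pos]; }
}

// Hypothetical skeleton of the ReduceContextImpl state discussed above.
class MiniReduceState {
    final ListKVIterator input;
    boolean hasMore;                 // is a pair waiting to be consumed?
    boolean nextKeyIsSame = false;   // starts false, as in ReduceContextImpl

    MiniReduceState(ListKVIterator input) {
        this.input = input;
        this.hasMore = input.next(); // prime the lookahead, like hasMore = input.next()
    }
}

class LookaheadDemo {
    public static void main(String[] args) {
        MiniReduceState empty = new MiniReduceState(new ListKVIterator(new String[0], new String[0]));
        System.out.println(empty.hasMore);                          // false
        MiniReduceState s = new MiniReduceState(
                new ListKVIterator(new String[]{"Java"}, new String[]{"1"}));
        System.out.println(s.hasMore + " " + s.input.getKey());     // true Java
    }
}
```

Because the iterator is already positioned on the first pair, the first nextKeyValue() can read getKey()/getValue() directly without calling next() again.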

ReduceContextImpl.nextKeyValue()

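    nextKeyValue() advances the current key and value. If it returns true, the while loop in Reducer.run() above is entered and our overridden reduce() is called.
    Note that it advances the key and the value together, one pair at a time. The framework therefore never builds <xxx, <1,1,1,1,1,1…>> as a list in memory: the pairs <xxx, 1>, <xxx, 1>, <xxx, 1>… are read one by one, and the values iterator passed to reduce() pulls them lazily for as long as the key stays the same.
    Note that this method also updates nextKeyIsSame.
    For the last KV pair the if (hasMore) branch is not entered, so nextKeyIsSame is set to false.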

/**
 * Advance to the next key/value pair.
 */
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey();   // raw bytes of the pair the iterator is positioned on
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);   // deserialize and update the key
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);   // deserialize and update the value

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);   // save the pair just read for mark/reset
    }

    hasMore = input.next();   // advance the iterator once: is there another pair?
    if (hasMore) {
      // not reached for the last pair; nextKeyIsSame is set to false in the else branch instead
      nextKey = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;   // does the next key equal the current key?
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;   // note: always true once a pair has been read
}
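To watch how firstValue and nextKeyIsSame evolve, the sketch below re-creates only the bookkeeping of nextKeyValue() over in-memory arrays that are assumed to be sorted already. TraceState is a hypothetical name; keys are strings compared with equals() in place of the RawComparator, and deserialization, counters and the backup store are left out.

```java
// Hypothetical re-creation of the flag bookkeeping in ReduceContextImpl.nextKeyValue().
class TraceState {
    private final String[] keys;
    private final int[] values;
    private int pos = 0;              // index of the pair the lookahead points at
    boolean hasMore;
    boolean nextKeyIsSame = false;
    boolean firstValue = false;
    String key;
    Integer value;

    TraceState(String[] keys, int[] values) {
        this.keys = keys;
        this.values = values;
        this.hasMore = keys.length > 0;   // same effect as hasMore = input.next()
    }

    boolean nextKeyValue() {
        if (!hasMore) {
            key = null;
            value = null;
            return false;
        }
        firstValue = !nextKeyIsSame;      // first value of a group iff the previous key differed
        key = keys[pos];                  // "deserialize" the current pair
        value = values[pos];
        pos++;
        hasMore = pos < keys.length;      // advance the lookahead
        nextKeyIsSame = hasMore && keys[pos].equals(key);   // peek at the following key
        return true;
    }
}

class TraceDemo {
    public static void main(String[] args) {
        TraceState s = new TraceState(new String[]{"Java", "Java", "PHP"}, new int[]{1, 1, 1});
        while (s.nextKeyValue()) {
            System.out.println(s.key + "=" + s.value
                    + " firstValue=" + s.firstValue + " nextKeyIsSame=" + s.nextKeyIsSame);
        }
        // Java=1 firstValue=true nextKeyIsSame=true
        // Java=1 firstValue=false nextKeyIsSame=false
        // PHP=1 firstValue=true nextKeyIsSame=false
    }
}
```

Each call consumes exactly one pair; nextKeyIsSame is true only after the first Java pair, because only then is the following key also Java.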

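    When no grouping comparator is configured and the key is a custom bean, the comparator.compare() call in nextKeyIsSame = comparator.compare() above leads to the WritableComparator method below. key1 is the current key and key2 is the next key; both are deserialized with readFields(), and the compare() at the bottom calls the compareTo() that our bean overrides (the bean implements the WritableComparable interface).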

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);

      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);

      buffer.reset(null, 0, 0);                   // clean up reference
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return compare(key1, key2);                   // compare them
}

public int compare(WritableComparable a, WritableComparable b) {
    return a.compareTo(b);
}
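The byte-level compare() above therefore costs two deserializations per comparison: both keys are read back into objects with readFields(), then compareTo() decides. A self-contained sketch of that idea using only java.io, assuming a hypothetical IntKey whose serialized form is a single writeInt() (4 bytes, big-endian); none of these classes are Hadoop's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key type with Writable-style write/readFields methods.
class IntKey implements Comparable<IntKey> {
    int id;

    void write(DataOutputStream out) throws IOException { out.writeInt(id); }

    void readFields(DataInputStream in) throws IOException { id = in.readInt(); }

    @Override
    public int compareTo(IntKey o) { return Integer.compare(id, o.id); }
}

// Mirrors the shape of WritableComparator.compare(byte[], int, int, byte[], int, int):
// deserialize both keys into reusable objects, then delegate to compareTo().
class RawIntKeyComparator {
    private final IntKey key1 = new IntKey();   // reused across calls, like key1/key2 above
    private final IntKey key2 = new IntKey();

    int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            key1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
            key2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return key1.compareTo(key2);
    }

    // Helper for the demo: the serialized bytes of an IntKey with the given id.
    static byte[] serialize(int id) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            IntKey k = new IntKey();
            k.id = id;
            k.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class RawCompareDemo {
    public static void main(String[] args) {
        byte[] a = RawIntKeyComparator.serialize(3);
        byte[] b = RawIntKeyComparator.serialize(7);
        System.out.println(new RawIntKeyComparator().compare(a, 0, a.length, b, 0, b.length));   // -1
    }
}
```

This cost is why Hadoop's built-in types such as IntWritable and Text register raw comparators that compare the serialized bytes directly; a custom bean key takes the generic path above unless one is registered for it.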

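    If a grouping comparator has been configured (for example with job.setGroupingComparatorClass(GroupingComparator.class)), the comparator used in nextKeyIsSame = comparator.compare() above is our own GroupingComparator instead. Its byte-level compare() is inherited from WritableComparator, and the final compare(key1, key2) call dispatches to the compare(WritableComparable, WritableComparable) overridden below. Passing true as the second argument to super() tells WritableComparator to create the key1/key2 instances it deserializes into.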

public class GroupingComparator extends WritableComparator {

    protected GroupingComparator() {
        super(Order.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order aOrder = (Order) a;
        Order bOrder = (Order) b;
        if (aOrder.getId() > bOrder.getId()) {
            return 1;
        } else if (aOrder.getId() < bOrder.getId()) {
            return -1;
        } else {
            return 0;
        }
    }
}
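What the grouping comparator changes is only the nextKeyIsSame test: adjacent keys that differ in other fields but share an id count as one group, so they reach a single reduce() call. The sketch below assumes a hypothetical Order bean with an id and a price (the article does not show Order, so the price field is an assumption), already sorted by id ascending and price descending, and counts the groups reduce() would see with and without grouping by id.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical key bean; the article does not show Order, so the price field is assumed.
final class Order {
    private final int id;
    private final double price;

    Order(int id, double price) {
        this.id = id;
        this.price = price;
    }

    int getId() { return id; }

    double getPrice() { return price; }
}

class GroupingDemo {
    // Splits an already-sorted list into the groups reduce() would see: a new group
    // starts whenever the comparator reports that a key differs from the previous one,
    // i.e. exactly where nextKeyIsSame would be false.
    static List<List<Order>> groups(List<Order> sorted, Comparator<Order> grouping) {
        List<List<Order>> result = new ArrayList<>();
        List<Order> current = null;
        for (int i = 0; i < sorted.size(); i++) {
            boolean sameAsPrevious = i > 0
                    && grouping.compare(sorted.get(i - 1), sorted.get(i)) == 0;
            if (!sameAsPrevious) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(sorted.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sort order: id ascending, then price descending.
        List<Order> sorted = List.of(
                new Order(1, 99.0), new Order(1, 20.0),
                new Order(2, 50.0), new Order(2, 30.0), new Order(2, 10.0));
        // Comparing the whole key: every distinct (id, price) is its own group.
        Comparator<Order> wholeKey = Comparator.comparingInt(Order::getId)
                .thenComparingDouble(Order::getPrice);
        // Same decision as the GroupingComparator above: only id matters.
        Comparator<Order> byId = Comparator.comparingInt(Order::getId);
        System.out.println(groups(sorted, wholeKey).size());   // 5
        System.out.println(groups(sorted, byId).size());       // 2
    }
}
```

One detail the sketch does not model: in Hadoop the key object handed to reduce() is the same instance that nextKeyValue() refills, so within a grouped reduce() call the key's non-id fields change as the values are iterated.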

Reducer.reduce()

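    The key and values passed to reduce() are the key and value held in ReduceContextImpl (already advanced by nextKeyValue()). As the code shows, each reduce() call receives exactly one key.
    Inside it, the for-each loop below calls hasNext() and next() repeatedly until the current key has no more values.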

/**
 * This method is called once for each key. Most applications will define
 * their reduce class by overriding this method. The default implementation
 * is an identity function.
 */
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                      ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

    The for-each loop obtains its iterator from ValueIterable, which returns the ValueIterator.

protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    }
}
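Because ValueIterable hands out the same ValueIterator instance on every iterator() call, the values of a key can be traversed only once: a second for-each over values inside reduce() starts from an exhausted iterator and sees nothing (unless the mark/reset backup store is used). The sketch below reproduces that behavior with a hypothetical OneShotIterable; it illustrates the pattern and is not Hadoop code.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical Iterable that, like ReduceContextImpl.ValueIterable, hands out
// one shared iterator instead of creating a fresh one per iterator() call.
class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    OneShotIterable(List<T> values) { this.shared = values.iterator(); }

    @Override
    public Iterator<T> iterator() { return shared; }   // same object every time
}

class OneShotDemo {
    // A reduce()-style loop that sums the values it is given.
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterable<Integer> values = new OneShotIterable<>(List.of(1, 1, 1));
        System.out.println(sum(values));   // 3
        System.out.println(sum(values));   // 0: the shared iterator is already exhausted
    }
}
```

If reduce() needs the values twice, copy them into a collection during the first pass, copying the objects rather than only the references, since nextKeyValue() deserializes into the same value instance.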

ReduceContextImpl.ValueIterator.hasNext()

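    hasNext() reports whether the current key has another value. On the first hasNext() call firstValue == true, so it returns true; afterwards the result depends on nextKeyIsSame.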

public boolean hasNext() {
    try {
        if (inReset && backupStore.hasNext()) {
          return true;
        }
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
    }
    return firstValue || nextKeyIsSame;
}

ReduceContextImpl.ValueIterator.next()

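    This method returns the value held in ReduceContextImpl.
    Note that the first call returns value directly, because ReduceContextImpl.nextKey() has already called nextKeyValue(). Every later call first calls nextKeyValue() to advance the key and value in ReduceContextImpl, then returns value.
    Note also that, because it calls nextKeyValue(), next() updates nextKeyIsSame in addition to the key and value. nextKeyIsSame is the condition used both in ReduceContextImpl.ValueIterator.hasNext() and in ReduceContextImpl.nextKey().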

public VALUEIN next() {
    if (inReset) {
        try {
            if (backupStore.hasNext()) {
                backupStore.next();
                DataInputBuffer next = backupStore.nextValue();
                buffer.reset(next.getData(), next.getPosition(), next.getLength()
                    - next.getPosition());
                value = valueDeserializer.deserialize(value);
                return value;
            } else {
                inReset = false;
                backupStore.exitResetMode();
                if (clearMarkFlag) {
                    clearMarkFlag = false;
                    isMarked = false;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("next value iterator failed", e);
        }
    }

    // if this is the first record, we don't need to advance
    // (nextKeyValue() has already loaded the first key and value)
    if (firstValue) {
        firstValue = false;
        return value;
    }
    // if this isn't the first record and the next key is different, they
    // can't advance it here.
    if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
    }
    // otherwise, go to the next key/value pair
    try {
        nextKeyValue();   // advance the current key and value
        return value;
    } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
    } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);
    }
}
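A consequence of next() that is easy to miss: nextKeyValue() calls valueDeserializer.deserialize(value), and for Writable types the deserializer refills the existing value object instead of allocating a new one, so reduce() receives the same instance on every iteration. Keeping the references therefore collects N references to the last value. A sketch of the pitfall, with a hypothetical MutableInt standing in for a Writable such as IntWritable and a hypothetical ReusingIterator imitating the refill:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable value type standing in for a Writable such as IntWritable.
class MutableInt {
    int v;

    MutableInt copy() {
        MutableInt c = new MutableInt();
        c.v = v;
        return c;
    }
}

// Like ValueIterator + nextKeyValue(): every next() refills ONE shared object.
class ReusingIterator implements Iterator<MutableInt> {
    private final int[] raw;                              // stands in for serialized values
    private int pos = 0;
    private final MutableInt value = new MutableInt();    // the single reused instance

    ReusingIterator(int... raw) { this.raw = raw; }

    @Override
    public boolean hasNext() { return pos < raw.length; }

    @Override
    public MutableInt next() {
        if (!hasNext()) throw new NoSuchElementException("iterate past last value");
        value.v = raw[pos++];                             // "deserialize" into the existing object
        return value;
    }
}

class ReuseDemo {
    public static void main(String[] args) {
        List<MutableInt> wrong = new ArrayList<>();
        Iterator<MutableInt> it = new ReusingIterator(1, 2, 3);
        while (it.hasNext()) {
            wrong.add(it.next());          // keeps three references to one object
        }
        List<MutableInt> right = new ArrayList<>();
        it = new ReusingIterator(1, 2, 3);
        while (it.hasNext()) {
            right.add(it.next().copy());   // copy before keeping the value
        }
        System.out.println(wrong.get(0).v + " " + wrong.get(1).v + " " + wrong.get(2).v);   // 3 3 3
        System.out.println(right.get(0).v + " " + right.get(1).v + " " + right.get(2).v);   // 1 2 3
    }
}
```

With real Writables the copy step would be, for example, new IntWritable(v.get()) for an IntWritable v.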

ReduceContextImpl.nextKey()

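    When reduce() returns, run() evaluates its loop condition context.nextKey() again, which lands in ReduceContextImpl.nextKey(). Normally the while loop in nextKey() is not entered: if reduce() iterated through all of its values, the last next() call ran nextKeyValue(), which left nextKeyIsSame == false, because the iteration only ends once the next key differs from the current one. So when is the loop entered? Only when reduce() returned without consuming every value of its key, so that nextKeyIsSame is still true. A typical case is grouping: with a bean as the key, grouped by one field (say id), a reducer that writes only the first record of each id and returns leaves the remaining records with the same id unread, and they must be skipped because each reduce() call handles exactly one key group. (On the very first call the loop is not entered either, because nextKeyIsSame starts out false.)
    Next comes the if branch and nextKeyValue(). At this point the previous key group has been fully consumed, so nextKeyIsSame == false and the key loaded now is the next key. For example, if the first reduce() call received the key "Java", the key loaded now is no longer "Java" but, say, "PHP". nextKeyValue() returns true, so context.nextKey() returns true and reduce() runs again with the key "PHP"; if "PHP" has two or more values, nextKeyIsSame has just been set to true, so the values iterator will go on to them.
    Once every KV pair has been consumed, hasMore == false and context.nextKey() returns false, so run() leaves the loop and calls cleanup().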

/** Start processing next unique key. */
public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
}
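Putting nextKey(), nextKeyValue() and the value iterator together, the sketch below is a hypothetical, string-keyed model of this bookkeeping (MiniReduceContext and SkipDemo are invented names; no serialization, counters or backup store). The driver plays run() and reduce(); with a limit it deliberately reads only the first values of each key, so the while (hasMore && nextKeyIsSame) loop in nextKey() has to skip the unread values before the next key starts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, string-keyed model of ReduceContextImpl's bookkeeping. Input must be sorted by key.
class MiniReduceContext {
    private final String[] keys;
    private final int[] vals;
    private int pos = 0;                     // index of the lookahead pair
    private boolean hasMore;
    private boolean nextKeyIsSame = false;
    private boolean firstValue = false;
    private String key;
    private int value;

    MiniReduceContext(String[] keys, int[] vals) {
        this.keys = keys;
        this.vals = vals;
        this.hasMore = keys.length > 0;      // hasMore = input.next()
    }

    boolean nextKeyValue() {
        if (!hasMore) { key = null; return false; }
        firstValue = !nextKeyIsSame;
        key = keys[pos];
        value = vals[pos];
        pos++;
        hasMore = pos < keys.length;
        nextKeyIsSame = hasMore && keys[pos].equals(key);
        return true;
    }

    boolean nextKey() {
        while (hasMore && nextKeyIsSame) {   // skip values reduce() left unread
            nextKeyValue();
        }
        if (hasMore) {
            return nextKeyValue();
        }
        return false;
    }

    String getCurrentKey() { return key; }

    Iterable<Integer> getValues() {
        return () -> new Iterator<Integer>() {
            @Override
            public boolean hasNext() { return firstValue || nextKeyIsSame; }

            @Override
            public Integer next() {
                if (firstValue) {            // first value was already loaded by nextKey()
                    firstValue = false;
                    return value;
                }
                if (!nextKeyIsSame) throw new NoSuchElementException("iterate past last value");
                nextKeyValue();              // advance to the next pair of the same key
                return value;
            }
        };
    }
}

class SkipDemo {
    // Plays run(): one "reduce" per key group, each reading at most `limit` values.
    static List<String> run(MiniReduceContext ctx, int limit) {
        List<String> out = new ArrayList<>();
        while (ctx.nextKey()) {
            String k = ctx.getCurrentKey();
            int read = 0, sum = 0;
            Iterator<Integer> it = ctx.getValues().iterator();
            while (read < limit && it.hasNext()) { sum += it.next(); read++; }
            out.add(k + "=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"Java", "Java", "Java", "PHP", "PHP"};
        int[] vals = {1, 1, 1, 1, 1};
        System.out.println(run(new MiniReduceContext(keys, vals), Integer.MAX_VALUE)); // [Java=3, PHP=2]
        System.out.println(run(new MiniReduceContext(keys, vals), 1));                 // [Java=1, PHP=1]
    }
}
```

With limit = 1 the "reducer" leaves two Java values and one PHP value unread, yet the second call still starts at PHP, because the skip loop in nextKey() consumed the leftovers first.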

Reposted from blog.csdn.net/H_X_P_/article/details/106071511