Hadoop file types

Hadoop's basic data types are simply wrappers around the corresponding types in other languages, so they need no further discussion here. What matters most in Hadoop is file handling: a file must be read in and stored through Hadoop's serialization machinery before the subsequent map and reduce phases can process it. Below we look at how these file types read and store their data.

1. SequenceFile

A SequenceFile stores data as binary key/value records. By compression scheme it comes in three flavors: no compression, record compression (only the values are compressed, keys are not), and block compression (both keys and values are compressed).
The uncompressed case is handled by the Writer class, whose fields are listed below:

 * Configuration conf // the Hadoop configuration object
 * FSDataOutputStream out // the file output stream
 * boolean ownOutputStream // whether the Writer owns (and should close) its output stream
 * DataOutputBuffer buffer = new DataOutputBuffer() // the output buffer
 * Class keyClass // the class of the keys being written
 * Class valClass // the class of the values being written
 * private boolean compress // whether values are compressed
 * CompressionCodec codec // the compression codec
 * CompressionOutputStream deflateFilter // the compression output stream
 * DataOutputStream deflateOut // DataOutputStream layered on top of the compression stream
 * Metadata metadata // the file metadata
 * Compressor compressor // the compressor used for the output
 * Serializer keySerializer // serializer for keys
 * Serializer uncompressedValSerializer // serializer for uncompressed values
 * Serializer compressedValSerializer // serializer for compressed values
 * long lastSyncPos // position of the last sync point
 * byte[] sync // the 16-byte sync marker

Writer can be constructed with several different sets of parameters; rather than listing them here, see the source code below. A writer can also be obtained through the static createWriter() method, which returns a SequenceFile.Writer instance and has many overloaded versions (again, see the source).
The other two compression forms are handled by RecordCompressWriter and BlockCompressWriter respectively, both of which extend Writer.
SequenceFile also contains a Sorter class that sorts the key/value records; its sorted output is what a MapFile is built from. A short usage sketch follows before the annotated source.
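To make the APIs below concrete, here is a minimal usage sketch for writing and then reading a SequenceFile through createWriter(), append() and Reader.next(). The path and the IntWritable/Text key and value types are illustrative assumptions, not taken from the source being discussed.

// Minimal sketch: write 100 records with block compression, then read them back.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/demo.seq");            // hypothetical path

    // BLOCK means keys and values are buffered and compressed together in blocks.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, file, IntWritable.class, Text.class,
        SequenceFile.CompressionType.BLOCK);
    try {
      for (int i = 0; i < 100; i++) {
        writer.append(new IntWritable(i), new Text("value-" + i));
      }
    } finally {
      writer.close();
    }

    // Read the records back in file order.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    try {
      IntWritable key = new IntWritable();
      Text val = new Text();
      while (reader.next(key, val)) {
        System.out.println(key + "\t" + val);
      }
    } finally {
      reader.close();
    }
  }
}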

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;

/*
 @see CompressionCodec
 */
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;//file version that supports block compression
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;//file version that supports custom compression codecs
  private static final byte VERSION_WITH_METADATA = (byte)6;//file version that carries metadata
  private static byte[] VERSION = new byte[] { //the byte array that makes up the version header
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries (the sync escape marker)
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in the sync hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash: total size of a sync entry

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;//number of bytes between two sync points

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE, //no compression
    /** Compress values only, each separately. */
    RECORD,//compress values only; keys are left uncompressed
    /** Compress sequences of records together in blocks. */
    BLOCK//compress both keys and values, grouped into blocks
  }  //the three compression types as an enum


  @Deprecated
  static public CompressionType getCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }//returns the configured compression type

  @Deprecated
  static public void setCompressionType(Configuration job, 
                                        CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }//sets the compression type

  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) 
    throws IOException {
    return createWriter(fs, conf, name, keyClass, valClass,
                        getCompressionType(conf));
  }


  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType) 
    throws IOException {
    return createWriter(fs, conf, name, keyClass, valClass,
            fs.getConf().getInt("io.file.buffer.size", 4096),
            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
            compressionType, new DefaultCodec(), null, new Metadata());
  }

  /** Construct the preferred type of SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(fs, conf, name, keyClass, valClass,
            fs.getConf().getInt("io.file.buffer.size", 4096),
            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
            compressionType, new DefaultCodec(), progress, new Metadata());
  }

  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec) 
    throws IOException {
    return createWriter(fs, conf, name, keyClass, valClass,
            fs.getConf().getInt("io.file.buffer.size", 4096),
            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
            compressionType, codec, null, new Metadata());
  }


  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(fs, conf, name, keyClass, valClass,
            fs.getConf().getInt("io.file.buffer.size", 4096),
            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
            compressionType, codec, progress, metadata);
  }


  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    if ((codec instanceof GzipCodec) &&
        !NativeCodeLoader.isNativeCodeLoaded() &&
        !ZlibFactory.isNativeZlibLoaded(conf)) {
      throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                         "GzipCodec without native-hadoop code!");
    }

    Writer writer = null;

    if (compressionType == CompressionType.NONE) {
      writer = new Writer(fs, conf, name, keyClass, valClass,
                          bufferSize, replication, blockSize,
                          progress, metadata);
    } else if (compressionType == CompressionType.RECORD) {
      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
                                        bufferSize, replication, blockSize,
                                        codec, progress, metadata);
    } else if (compressionType == CompressionType.BLOCK){
      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
                                       bufferSize, replication, blockSize,
                                       codec, progress, metadata);
    }

    return writer;
  }

  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    if ((codec instanceof GzipCodec) &&
        !NativeCodeLoader.isNativeCodeLoaded() &&
        !ZlibFactory.isNativeZlibLoaded(conf)) {
      throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                         "GzipCodec without native-hadoop code!");
    }


    FSDataOutputStream fsos;
    if (createParent) {
      fsos = fs.create(name, true, bufferSize, replication, blockSize);
    } else {
      fsos = fs.createNonRecursive(name, true, bufferSize, replication,
          blockSize, null);
    }

    switch (compressionType) {
    case NONE:
      return new Writer(conf, fsos, keyClass, valClass, metadata).ownStream();
    case RECORD:
      return new RecordCompressWriter(conf, fsos, keyClass, valClass, codec,
          metadata).ownStream();
    case BLOCK:
      return new BlockCompressWriter(conf, fsos, keyClass, valClass, codec,
          metadata).ownStream();
    default:
      return null;
    }
  } 

  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
                                 compressionType, codec, progress, new Metadata());
    return writer;
  }

  private static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, boolean compress, boolean blockCompress,
                 CompressionCodec codec, Metadata metadata)
    throws IOException {
    if (codec != null && (codec instanceof GzipCodec) && 
        !NativeCodeLoader.isNativeCodeLoaded() && 
        !ZlibFactory.isNativeZlibLoaded(conf)) {
      throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                         "GzipCodec without native-hadoop code!");
    }

    Writer writer = null;

    if (!compress) {
      writer = new Writer(conf, out, keyClass, valClass, metadata);
    } else if (compress && !blockCompress) {
      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
    } else {
      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
    }

    return writer;
  }


  private static Writer
  createWriter(FileSystem fs, Configuration conf, Path file, 
               Class keyClass, Class valClass, 
               boolean compress, boolean blockCompress,
               CompressionCodec codec, Progressable progress, Metadata metadata)
  throws IOException {
  if (codec != null && (codec instanceof GzipCodec) && 
      !NativeCodeLoader.isNativeCodeLoaded() && 
      !ZlibFactory.isNativeZlibLoaded(conf)) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                       "GzipCodec without native-hadoop code!");
  }

  Writer writer = null;

  if (!compress) {
    writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
  } else if (compress && !blockCompress) {
    writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, 
                                      codec, progress, metadata);
  } else {
    writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, 
                                     codec, progress, metadata);
  }

  return writer;
}

  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata)
    throws IOException {
    if ((codec instanceof GzipCodec) && 
        !NativeCodeLoader.isNativeCodeLoaded() && 
        !ZlibFactory.isNativeZlibLoaded(conf)) {
      throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                         "GzipCodec without native-hadoop code!");
    }

    Writer writer = null;

    if (compressionType == CompressionType.NONE) {
      writer = new Writer(conf, out, keyClass, valClass, metadata);
    } else if (compressionType == CompressionType.RECORD) {
      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
    } else if (compressionType == CompressionType.BLOCK){
      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
    }

    return writer;
  }


  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec)
    throws IOException {
    Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
                                 codec, new Metadata());
    return writer;
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      data = new byte[length];
      dataSize = -1;

      in.readFully(data);
      dataSize = data.length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      data = new byte[length];
      dataSize = -1;

      in.readFully(data);
      dataSize = data.length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating with the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;//map holding the metadata; both keys and values are Text

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }//the constructors end here

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }//get() looks up a metadata entry by name

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }//set() stores a metadata entry

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }//returns a copy of all metadata entries

    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());//size of the metadata map
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {  //write out every key/value pair
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }//serializes the metadata

    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();//number of entries to read
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }    
    }//deserializes the metadata back into an object

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }//tests two Metadata objects for equality

    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }//returns the hash code

    public String toString() {
      StringBuffer sb = new StringBuffer();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }//formats the metadata as a readable string
  } // Metadata
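  // Illustrative note (not part of the original source): metadata is attached when a
  // writer is created and read back via Reader.getMetadata(); the key/value names
  // below are hypothetical.
  //
  //   SequenceFile.Metadata meta = new SequenceFile.Metadata();
  //   meta.set(new Text("created.by"), new Text("demo"));
  //   SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, path,
  //       Text.class, Text.class, CompressionType.NONE, new DefaultCodec(), null, meta);
  //   ...
  //   Text createdBy = new SequenceFile.Reader(fs, path, conf)
  //                        .getMetadata().get(new Text("created.by"));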

  /** Write key/value pairs to a sequence-format file. */
  /* Writer handles SequenceFiles whose keys and values are stored uncompressed.
   * Its fields (the same list given at the top of this article) are annotated
   * inline below. */
  public static class Writer implements java.io.Closeable {
    Configuration conf;//the Hadoop configuration object
    FSDataOutputStream out;//the file output stream
    boolean ownOutputStream = true;//whether the Writer owns (and should close) the underlying output stream
    DataOutputBuffer buffer = new DataOutputBuffer();//the output buffer

    Class keyClass;//the class of the keys being written
    Class valClass;//the class of the values being written

    private boolean compress;//whether values are compressed
    CompressionCodec codec = null;//the compression codec
    CompressionOutputStream deflateFilter = null;//the compression output stream
    DataOutputStream deflateOut = null;//DataOutputStream layered on top of the compression stream
    Metadata metadata = null;//the file metadata
    Compressor compressor = null;//the compressor used for the output

    protected Serializer keySerializer;//serializer for keys
    protected Serializer uncompressedValSerializer;//serializer for uncompressed values
    protected Serializer compressedValSerializer;//serializer for compressed values

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of the last sync point
    byte[] sync;                          // 16 random bytes used as the sync marker
    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = System.currentTimeMillis();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    /** Implicit constructor: needed for the period of transition!*/
    Writer()
    {}

    /** Create the named file. */
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass)
      throws IOException {
      this(fs, conf, name, keyClass, valClass, null, new Metadata());
    }

    /** Create the named file with write-progress reporter. */
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata)
      throws IOException {
      this(fs, conf, name, keyClass, valClass,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), fs.getDefaultBlockSize(),
           progress, metadata);
    }

    /** Create the named file with write-progress reporter. */
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata)
      throws IOException {
      init(name, conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
              keyClass, valClass, false, null, metadata);
      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    /** Write to an arbitrary stream using a specified buffer size. */
    Writer(Configuration conf, FSDataOutputStream out, 
                   Class keyClass, Class valClass, Metadata metadata)
      throws IOException {
      this.ownOutputStream = false;
      init(null, conf, out, keyClass, valClass, false, null, metadata);

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }  //the constructors end here

    /** Write the initial part of file header. */
    void initializeFileHeader() 
      throws IOException{
      out.write(VERSION);
    }//writes the initial part of the file header, i.e. the version bytes

    /** Write the final part of file header. */
    void finalizeFileHeader() 
      throws IOException{
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush the header
    }//writes the final part of the file header

    boolean isCompressed() { return compress; }//whether records are compressed; returns the compress flag
    boolean isBlockCompressed() { return false; }//whether the file is block-compressed; false for the plain Writer

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    void writeFileHeader() 
      throws IOException {
      Text.writeString(out, keyClass.getName());//write the key class name
      Text.writeString(out, valClass.getName());//write the value class name

      out.writeBoolean(this.isCompressed());//write whether records are compressed
      out.writeBoolean(this.isBlockCompressed());//write whether the file is block-compressed

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
        //when compression is enabled, also record the codec class name
      }
      this.metadata.write(out);//write the metadata
    }//writes the body of the file header

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Path name, Configuration conf, FSDataOutputStream out,
              Class keyClass, Class valClass,
              boolean compress, CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.compress = compress;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        this.compressedValSerializer.open(deflateOut);
      }
    }//initialization routine shared by the constructors

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; } //returns the key class

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }//returns the value class

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }//returns the compression codec

    /** create a sync point */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write the sync bytes
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }//creates a sync point

    /** flush all currently written data to the file system */
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }//flushes all buffered data to the file system

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }//returns the configuration

    /** Close the file. */
    public synchronized void close() throws IOException {
      keySerializer.close();//close the key serializer
      uncompressedValSerializer.close();//close the serializer for uncompressed values
      if (compressedValSerializer != null) {
        compressedValSerializer.close();//close the serializer for compressed values
      }

      CodecPool.returnCompressor(compressor);//return the compressor to the pool
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();//otherwise only flush its contents
        }
        out = null;
      }
    }//closes the writer

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }//emits a sync marker if enough bytes have been written since the last one

    /** Append a key/value pair to the end of the file. */
    public synchronized void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);
      //only pairs whose key and value types match the file's declared classes may be appended
      buffer.reset();//reset the buffer

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress) {  //compress the value if compression is enabled
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);//write the value uncompressed
      }

      // Write the record out
      checkAndWriteSync();                                // emit a sync marker if needed
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // the record data
    }//appends a key/value pair to the end of the file

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }//returns the current length of the output file, which is always a synchronized position

  } // class Writer
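  // Summary of the record layout produced by Writer.append() above (derived from
  // the code, not from the original annotations):
  //   header : "SEQ" + version byte, key/value class names, compression flags,
  //            optional codec class name, metadata, and the 16-byte sync marker
  //   record : record length (int), key length (int), key bytes, value bytes
  //   sync   : SYNC_ESCAPE (-1) followed by the 16 sync bytes, emitted by
  //            checkAndWriteSync() roughly every SYNC_INTERVAL bytes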

  /** Write key/compressed-value pairs to a sequence-format file. */
  /* RecordCompressWriter extends Writer and compresses each value individually; keys are left uncompressed. */
  static class RecordCompressWriter extends Writer {

    /** Create the named file. */
    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
                                Class keyClass, Class valClass, CompressionCodec codec) 
      throws IOException {
      this(conf, fs.create(name), keyClass, valClass, codec, new Metadata());
    }

    /** Create the named file with write-progress reporter. */
    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
                                Class keyClass, Class valClass, CompressionCodec codec,
                                Progressable progress, Metadata metadata)
      throws IOException {
      this(fs, conf, name, keyClass, valClass,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
           progress, metadata);
    }

    /** Create the named file with write-progress reporter. */
    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
                                Class keyClass, Class valClass,
                                int bufferSize, short replication, long blockSize,
                                CompressionCodec codec,
                                Progressable progress, Metadata metadata)
      throws IOException {
      super.init(name, conf,
                 fs.create(name, true, bufferSize, replication, blockSize, progress),
                 keyClass, valClass, true, codec, metadata);

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    /** Create the named file with write-progress reporter. */
    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
                                Class keyClass, Class valClass, CompressionCodec codec,
                                Progressable progress)
      throws IOException {
      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
    }

    /** Write to an arbitrary stream using a specified buffer size. */
    RecordCompressWriter(Configuration conf, FSDataOutputStream out,
                                 Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.ownOutputStream = false;
      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();

    }

    boolean isCompressed() { return true; }
    boolean isBlockCompressed() { return false; }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  /* BlockCompressWriter extends Writer and compresses keys and values together, block by block. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();//buffer of key lengths for the current block
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();//buffer of keys for the current block

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();//buffer of value lengths for the current block
    private DataOutputBuffer valBuffer = new DataOutputBuffer();//buffer of values for the current block

    private int compressionBlockSize;//target size of a compression block

    /** Create the named file. */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
                               Class keyClass, Class valClass, CompressionCodec codec) 
      throws IOException {
      this(fs, conf, name, keyClass, valClass,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
           null, new Metadata());
    }

    /** Create the named file with write-progress reporter. */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
                               Class keyClass, Class valClass, CompressionCodec codec,
                               Progressable progress, Metadata metadata)
      throws IOException {
      this(fs, conf, name, keyClass, valClass,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
           progress, metadata);
    }

    /** Create the named file with write-progress reporter. */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
                               Class keyClass, Class valClass,
                               int bufferSize, short replication, long blockSize,
                               CompressionCodec codec,
                               Progressable progress, Metadata metadata)
      throws IOException {
      super.init(name, conf,
                 fs.create(name, true, bufferSize, replication, blockSize, progress),
                 keyClass, valClass, true, codec, metadata);
      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    /** Create the named file with write-progress reporter. */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
                               Class keyClass, Class valClass, CompressionCodec codec,
                               Progressable progress)
      throws IOException {
      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
    }

    /** Write to an arbitrary stream using a specified buffer size. */
    BlockCompressWriter(Configuration conf, FSDataOutputStream out,
                                Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.ownOutputStream = false;
      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    boolean isCompressed() { return true; }//always compressed
    boolean isBlockCompressed() { return true; }//always block-compressed

    /** Initialize */
    void init(int compressionBlockSize) throws IOException {
      this.compressionBlockSize = compressionBlockSize;
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }//initialization routine for the block buffers

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized 
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }//compresses a buffer and writes its compressed length and data

    /** Compress and flush contents to dfs */
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }//compresses the buffered records and flushes the block to the file

    /** Close the file. */
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }//closes the file

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers 
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter
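  // Summary of the block layout produced by BlockCompressWriter.sync() above
  // (derived from the code): each block starts with a sync marker, then the
  // number of buffered records as a VInt, followed by four compressed buffers,
  // each preceded by its compressed length as a VInt: key lengths, keys,
  // value lengths, values.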

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private Path file; //path of the file
    private FSDataInputStream in;//the input stream
    private DataOutputBuffer outBuf = new DataOutputBuffer();//reusable buffer

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /** Open the named file. */
    public Reader(FileSystem fs, Path file, Configuration conf)
      throws IOException {
      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
    }

    private Reader(FileSystem fs, Path file, int bufferSize,
                   Configuration conf, boolean tempReader) throws IOException {
      this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
    }

    private Reader(FileSystem fs, Path file, int bufferSize, long start,
                   long length, Configuration conf, boolean tempReader) 
    throws IOException {
      this.file = file;
      this.in = openFile(fs, file, bufferSize, length);
      this.conf = conf;
      seek(start);
      this.end = in.getPos() + length;
      init(tempReader);
    }

    /**
     * Override this method to specialize the type of
     * {@link FSDataInputStream} returned.
     */
    protected FSDataInputStream openFile(FileSystem fs, Path file,
        int bufferSize, long length) throws IOException {
      return fs.open(file, bufferSize);
    }//opens the SequenceFile

    /**
     * Initialize the {@link Reader}
     * @param tmpReader <code>true</code> if we are constructing a temporary
     *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
     *                  and hence do not initialize every component; 
     *                  <code>false</code> otherwise.
     * @throws IOException
     */
    private void init(boolean tempReader) throws IOException {
      byte[] versionBlock = new byte[VERSION.length];
      in.readFully(versionBlock);

      if ((versionBlock[0] != VERSION[0]) ||
          (versionBlock[1] != VERSION[1]) ||
          (versionBlock[2] != VERSION[2]))
        throw new IOException(file + " not a SequenceFile");

      // Set 'version'
      version = versionBlock[3];
      if (version > VERSION[3])
        throw new VersionMismatchException(VERSION[3], version);

      if (version < BLOCK_COMPRESS_VERSION) {
        UTF8 className = new UTF8();

        className.readFields(in);
        keyClassName = className.toStringChecked(); // key class name

        className.readFields(in);
        valClassName = className.toStringChecked(); // val class name
      } else {
        keyClassName = Text.readString(in);
        valClassName = Text.readString(in);
      }

      if (version > 2) {                          // if version > 2
        this.decompress = in.readBoolean();       // is compressed?
      } else {
        decompress = false;
      }

      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
        this.blockCompressed = in.readBoolean();  // is block-compressed?
      } else {
        blockCompressed = false;
      }

      // if version >= 5
      // setup the compression codec
      if (decompress) {
        if (version >= CUSTOM_COMPRESS_VERSION) {
          String codecClassname = Text.readString(in);
          try {
            Class<? extends CompressionCodec> codecClass
              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
          } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("Unknown codec: " + 
                                               codecClassname, cnfe);
          }
        } else {
          codec = new DefaultCodec();
          ((Configurable)codec).setConf(conf);
        }
      }

      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
        this.metadata.readFields(in);
      }

      if (version > 1) {                          // if version > 1
        in.readFully(sync);                       // read sync bytes
      }

      // Initialize... *not* if this we are constructing a temporary Reader
      if (!tempReader) {
        valBuffer = new DataInputBuffer();
        if (decompress) {
          valDecompressor = CodecPool.getDecompressor(codec);
          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
          valIn = new DataInputStream(valInFilter);
        } else {
          valIn = valBuffer;
        }

        if (blockCompressed) {
          keyLenBuffer = new DataInputBuffer();
          keyBuffer = new DataInputBuffer();
          valLenBuffer = new DataInputBuffer();

          keyLenDecompressor = CodecPool.getDecompressor(codec);
          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
                                                   keyLenDecompressor);
          keyLenIn = new DataInputStream(keyLenInFilter);

          keyDecompressor = CodecPool.getDecompressor(codec);
          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
          keyIn = new DataInputStream(keyInFilter);

          valLenDecompressor = CodecPool.getDecompressor(codec);
          valLenInFilter = codec.createInputStream(valLenBuffer, 
                                                   valLenDecompressor);
          valLenIn = new DataInputStream(valLenInFilter);
        }

        SerializationFactory serializationFactory =
          new SerializationFactory(conf);
        this.keyDeserializer =
          getDeserializer(serializationFactory, getKeyClass());
        if (!blockCompressed) {
          this.keyDeserializer.open(valBuffer);
        } else {
          this.keyDeserializer.open(keyIn);
        }
        this.valDeserializer =
          getDeserializer(serializationFactory, getValueClass());
        this.valDeserializer.open(valIn);
      }
    }

    @SuppressWarnings("unchecked")
    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
      return sf.getDeserializer(c);
    }

    /** Close the file. */
    public synchronized void close() throws IOException {
      // Return the decompressors to the pool
      CodecPool.returnDecompressor(keyLenDecompressor);
      CodecPool.returnDecompressor(keyDecompressor);
      CodecPool.returnDecompressor(valLenDecompressor);
      CodecPool.returnDecompressor(valDecompressor);
      keyLenDecompressor = keyDecompressor = null;
      valLenDecompressor = valDecompressor = null;

      if (keyDeserializer != null) {
        keyDeserializer.close();
      }
      if (valDeserializer != null) {
        valDeserializer.close();
      }

      // Close the input-stream
      in.close();
    }//closes the SequenceFile

    /** Returns the name of the key class. */
    public String getKeyClassName() {
      return keyClassName;
    }//returns the name of the key class

    /** Returns the class of keys in this file. */
    public synchronized Class<?> getKeyClass() {
      if (null == keyClass) {
        try {
          keyClass = WritableName.getClass(getKeyClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return keyClass;
    }//returns the key class

    /** Returns the name of the value class. */
    public String getValueClassName() {
      return valClassName;
    }//returns the name of the value class

    /** Returns the class of values in this file. */
    public synchronized Class<?> getValueClass() {
      if (null == valClass) {
        try {
          valClass = WritableName.getClass(getValueClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return valClass;
    }//returns the value class

    /** Returns true if values are compressed. */
    public boolean isCompressed() { return decompress; }//whether values are compressed

    /** Returns true if records are block-compressed. */
    public boolean isBlockCompressed() { return blockCompressed; }//whether records are block-compressed

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }//returns the compression codec

    /** Returns the metadata object of the file */
    public Metadata getMetadata() {
      return this.metadata;
    }//returns the metadata

    /** Returns the configuration used for this file. */
    Configuration getConf() { return conf; }//returns the configuration

    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer, 
                                         CompressionInputStream filter) throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();//read the data into a temporary buffer

      try {
        int dataBufferLength = WritableUtils.readVInt(in);
        dataBuffer.write(in, dataBufferLength);

        // Set up 'buffer' connected to the input-stream
        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
      } finally {
        dataBuffer.close();
      }

      // Reset the codec
      filter.resetState();
    }//reads a compressed buffer

    /** Read the next 'compressed' block */
    private synchronized void readBlock() throws IOException {
      // Check if we need to throw away a whole block of 
      // 'values' due to 'lazy decompression' 
        /* skip the previous block's values if lazy decompression left them unread */
      if (lazyDecompress && !valuesDecompressed) {
        in.seek(WritableUtils.readVInt(in)+in.getPos());
        in.seek(WritableUtils.readVInt(in)+in.getPos());
      }

      // Reset internal states
      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
      valuesDecompressed = false;

      //Process sync
      if (sync != null) {
        in.readInt();
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
      }
      syncSeen = true;

      // Read number of records in this block
      noBufferedRecords = WritableUtils.readVInt(in);

      // Read key lengths and keys
      readBuffer(keyLenBuffer, keyLenInFilter);
      readBuffer(keyBuffer, keyInFilter);
      noBufferedKeys = noBufferedRecords;

      // Read value lengths and values
      if (!lazyDecompress) {
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }
    }//reads the next compressed block

    /** 
     * Position valLenIn/valIn to the 'value' 
     * corresponding to the 'current' key 
     */
    private synchronized void seekToCurrentValue() throws IOException {
      if (!blockCompressed) {
        if (decompress) {
          valInFilter.resetState();
        }
        valBuffer.reset();
      } else {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
          // Read the value lengths and values
          readBuffer(valLenBuffer, valLenInFilter);
          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
        }

        // Calculate the no. of bytes to skip
        // Note: 'current' key has already been read!
        int skipValBytes = 0;
        int currentKey = noBufferedKeys + 1;          
        for (int i=noBufferedValues; i > currentKey; --i) {
          skipValBytes += WritableUtils.readVInt(valLenIn);
          --noBufferedValues;
        }

        // Skip to the 'val' corresponding to 'current' key
        if (skipValBytes > 0) {
          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
            throw new IOException("Failed to seek to " + currentKey + 
                                  "(th) value!");
          }
        }
      }
    }//positions the stream at the value for the current key

    /**
     * Get the 'value' corresponding to the last read 'key'.
     * @param val : The 'value' to be read.
     * @throws IOException
     */
    public synchronized void getCurrentValue(Writable val) 
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val.readFields(valIn);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if (valLength < 0) {
          LOG.debug(val + " is a zero-length value");
        }
      }

    }// reads the value corresponding to the most recently read key, deserializing it into val via readFields

    /**
     * Get the 'value' corresponding to the last read 'key'.
     * @param val : The 'value' to be read.
     * @throws IOException
     */
    public synchronized Object getCurrentValue(Object val) 
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val = deserializeValue(val);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val = deserializeValue(val);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if (valLength < 0) {
          LOG.debug(val + " is a zero-length value");
        }
      }
      return val;

    }

    @SuppressWarnings("unchecked")
    private Object deserializeValue(Object val) throws IOException {
      return valDeserializer.deserialize(val);
    }//反序列化value

    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  True if another entry exists, and false at end of file. */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != getKeyClass())
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);

      if (!blockCompressed) {
        outBuf.reset();

        keyLength = next(outBuf);
        if (keyLength < 0)
          return false;

        valBuffer.reset(outBuf.getData(), outBuf.getLength());

        key.readFields(valBuffer);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;

        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }

        int keyLength = WritableUtils.readVInt(keyLenIn);

        // Sanity check
        if (keyLength < 0) {
          return false;
        }

        //Read another compressed 'key'
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }// read the next key, skipping its value; returns true if another entry exists and false at end of file

    /** Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != getValueClass())
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);

      if (more) {
        getCurrentValue(val);
      }

      return more;
    }//读取文件中的key和value,存在返回true,到达文件末尾返回false

    /**
     * Read and return the next record length, potentially skipping over 
     * a sync block.
     * @return the length of the next record or -1 if there is no next record
     * @throws IOException
     */
    private synchronized int readRecordLength() throws IOException {
      if (in.getPos() >= end) {
        return -1;
      }      
      int length = in.readInt();
      if (version > 1 && sync != null &&
          length == SYNC_ESCAPE) {              // process a sync entry
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;
        if (in.getPos() >= end) {
          return -1;
        }
        length = in.readInt();                  // re-read length
      } else {
        syncSeen = false;
      }

      return length;
    }//读取记录的长度

    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method. */
    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
    public synchronized int next(DataOutputBuffer buffer) throws IOException {
      // Unsupported for block-compressed sequence files
      if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
      }
      try {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        buffer.write(in, length);
        return keyLength;
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
        return next(buffer);
      }
    }//从文件中读取下一个key-value到buffer中

    public ValueBytes createValueBytes() {
      ValueBytes val = null;
      if (!decompress || blockCompressed) {
        val = new UncompressedBytes();
      } else {
        val = new CompressedBytes(codec);
      }
      return val;
    }// create a ValueBytes instance matching the file's compression: CompressedBytes for record-compressed files, UncompressedBytes otherwise

    /**
     * Read 'raw' records.
     * @param key - The buffer into which the key is read
     * @param val - The 'raw' value
     * @return Returns the total record length or -1 for end of file
     * @throws IOException
     */
    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
      throws IOException {
      if (!blockCompressed) {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        int valLength = length - keyLength;
        key.write(in, keyLength);
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }

        return length;
      } else {
        //Reset syncSeen
        syncSeen = false;

        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end) 
            return -1;

          try { 
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("zero length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;

        // Read raw 'value'
        seekToCurrentValue();
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;

        return (keyLength+valLength);
      }

    }

    /**
     * Read 'raw' keys.
     * @param key - The buffer into which the key is read
     * @return Returns the key length or -1 for end of file
     * @throws IOException
     */
    public int nextRawKey(DataOutputBuffer key) 
      throws IOException {
      if (!blockCompressed) {
        recordLength = readRecordLength();
        if (recordLength == -1) {
          return -1;
        }
        keyLength = in.readInt();
        key.write(in, keyLength);
        return keyLength;
      } else {
        //Reset syncSeen
        syncSeen = false;

        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end) 
            return -1;

          try { 
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("zero length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;

        return keyLength;
      }

    }

    /** Read the next key in the file, skipping its
     * value.  Return null at end of file. */
    public synchronized Object next(Object key) throws IOException {
      if (key != null && key.getClass() != getKeyClass()) {
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      }

      if (!blockCompressed) {
        outBuf.reset();

        keyLength = next(outBuf);
        if (keyLength < 0)
          return null;

        valBuffer.reset(outBuf.getData(), outBuf.getLength());

        key = deserializeKey(key);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;

        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return null;
          }
        }

        int keyLength = WritableUtils.readVInt(keyLenIn);

        // Sanity check
        if (keyLength < 0) {
          return null;
        }

        //Read another compressed 'key'
        key = deserializeKey(key);
        --noBufferedKeys;
      }

      return key;
    }

    @SuppressWarnings("unchecked")
    private Object deserializeKey(Object key) throws IOException {
      return keyDeserializer.deserialize(key);
    }

    /**
     * Read 'raw' values.
     * @param val - The 'raw' value
     * @return Returns the value length
     * @throws IOException
     */
    public synchronized int nextRawValue(ValueBytes val) 
      throws IOException {

      // Position stream to current value
      seekToCurrentValue();

      if (!blockCompressed) {
        int valLength = recordLength - keyLength;
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }

        return valLength;
      } else {
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;
        return valLength;
      }

    }

    private void handleChecksumException(ChecksumException e)
      throws IOException {
      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
      } else {
        throw e;
      }
    }

    /** Set the current byte position in the input file.
     *
     * <p>The position passed must be a position returned by {@link
     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
     * position, use {@link SequenceFile.Reader#sync(long)}.
     */
    public synchronized void seek(long position) throws IOException {
      in.seek(position);
      if (blockCompressed) {                      // trigger block read
        noBufferedKeys = 0;
        valuesDecompressed = true;
      }
    }// seek to the given byte position; it must be a value returned by SequenceFile.Writer.getLength()

    /** Seek to the next sync mark past a given position.*/
    public synchronized void sync(long position) throws IOException {
      if (position+SYNC_SIZE >= end) {
        seek(end);
        return;
      }

      try {
        seek(position+4);                         // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
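        // Slide forward one byte at a time, treating syncCheck as a circular buffer,
        // until its contents match the file's sync marker; then rewind SYNC_SIZE
        // bytes so the stream is positioned at the start of that sync entry (its escape).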
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i+j)%syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
            return;
          }
          syncCheck[i%syncLen] = in.readByte();
        }
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
      }
    }

    /** Returns true iff the previous call to next passed a sync mark.*/
    public boolean syncSeen() { return syncSeen; }

    /** Return the current byte position in the input file. */
    public synchronized long getPosition() throws IOException {
      return in.getPos();
    }

    /** Returns the name of the file. */
    public String toString() {
      return file.toString();
    }

  }

  /** Sorts key/value pairs in a sequence-format file.
   *
   * <p>For best performance, applications should make sure that the {@link
   * Writable#readFields(DataInput)} implementation of their keys is
   * very efficient.  In particular, it should avoid allocating memory.
   */
  public static class Sorter {

    private RawComparator comparator;

    private MergeSort mergeSort; //the implementation of merge sort

    private Path[] inFiles;                     // when merging or sorting

    private Path outFile;

    private int memory; // bytes
    private int factor; // merged per pass

    private FileSystem fs = null;

    private Class keyClass;
    private Class valClass;

    private Configuration conf;

    private Progressable progressable = null;

    /** Sort and merge files containing the named classes. */
    /* sort and merge */
    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                  Class valClass, Configuration conf)  {
      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
    }

    /** Sort and merge using an arbitrary {@link RawComparator}. */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
                  Class valClass, Configuration conf) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
      this.factor = conf.getInt("io.sort.factor", 100);
      this.conf = conf;
    }

    /** Set the number of streams to merge at once.*/
    public void setFactor(int factor) { this.factor = factor; }

    /** Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /** Set the total amount of buffer memory, in bytes.*/
    public void setMemory(int memory) { this.memory = memory; }

    /** Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /** Set the progressable object in order to report progress. */
    public void setProgressable(Progressable progressable) {
      this.progressable = progressable;
    }

    /** 
     * Perform a file sort from a set of input files into an output file.
     * @param inFiles the files to be sorted
     * @param outFile the sorted output file
     * @param deleteInput should the input files be deleted as they are read?
     */
    public void sort(Path[] inFiles, Path outFile,
                     boolean deleteInput) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFiles = inFiles;
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1) {
        mergePass(outFile.getParent());
      }
    }

    /** 
     * Perform a file sort from a set of input files and return an iterator.
     * @param inFiles the files to be sorted
     * @param tempDir the directory where temp files are created during sort
     * @param deleteInput should the input files be deleted as they are read?
     * @return iterator the RawKeyValueIterator
     */
    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
                                              boolean deleteInput) throws IOException {
      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      this.inFiles = inFiles;
      //outFile will basically be used as prefix for temp files in the cases
      //where sort outputs multiple sorted segments. For the single segment
      //case, the outputFile itself will contain the sorted data for that
      //segment
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1)
        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
                     tempDir);
      else if (segments == 1)
        return merge(new Path[]{outFile}, true, tempDir);
      else return null;
    }

    /**
     * The backwards compatible interface to sort.
     * @param inFile the input file to sort
     * @param outFile the sorted output file
     */
    public void sort(Path inFile, Path outFile) throws IOException {
      sort(new Path[]{inFile}, outFile, false);
    }

    private int sortPass(boolean deleteInput) throws IOException {
      LOG.debug("running sort pass");
      SortPass sortPass = new SortPass();         // make the SortPass
      sortPass.setProgressable(progressable);
      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
      try {
        return sortPass.run(deleteInput);         // run it
      } finally {
        sortPass.close();                         // close it
      }
    }

    private class SortPass {
      private int memoryLimit = memory/4;
      private int recordLimit = 1000000;

      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;

      private int[] keyOffsets = new int[1024];
      private int[] pointers = new int[keyOffsets.length];
      private int[] pointersCopy = new int[keyOffsets.length];
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];

      private ArrayList segmentLengths = new ArrayList();

      private Reader in = null;
      private FSDataOutputStream out = null;
      private FSDataOutputStream indexOut = null;
      private Path outName;

      private Progressable progressable = null;

      public int run(boolean deleteInput) throws IOException {
        int segments = 0;
        int currentFile = 0;
        boolean atEof = (currentFile >= inFiles.length);
        boolean isCompressed = false;
        boolean isBlockCompressed = false;
        CompressionCodec codec = null;
        segmentLengths.clear();
        if (atEof) {
          return 0;
        }

        // Initialize
        in = new Reader(fs, inFiles[currentFile], conf);
        isCompressed = in.isCompressed();
        isBlockCompressed = in.isBlockCompressed();
        codec = in.getCompressionCodec();

        for (int i=0; i < rawValues.length; ++i) {
          rawValues[i] = null;
        }

        while (!atEof) {
          int count = 0;
          int bytesProcessed = 0;
          rawKeys.reset();
          while (!atEof && 
                 bytesProcessed < memoryLimit && count < recordLimit) {

            // Read a record into buffer
            // Note: Attempt to re-use 'rawValue' as far as possible
            int keyOffset = rawKeys.getLength();       
            ValueBytes rawValue = 
              (count == keyOffsets.length || rawValues[count] == null) ? 
              in.createValueBytes() : 
              rawValues[count];
            int recordLength = in.nextRaw(rawKeys, rawValue);
            if (recordLength == -1) {
              in.close();
              if (deleteInput) {
                fs.delete(inFiles[currentFile], true);
              }
              currentFile += 1;
              atEof = currentFile >= inFiles.length;
              if (!atEof) {
                in = new Reader(fs, inFiles[currentFile], conf);
              } else {
                in = null;
              }
              continue;
            }

            int keyLength = rawKeys.getLength() - keyOffset;

            if (count == keyOffsets.length)
              grow();

            keyOffsets[count] = keyOffset;                // update pointers
            pointers[count] = count;
            keyLengths[count] = keyLength;
            rawValues[count] = rawValue;

            bytesProcessed += recordLength; 
            count++;
          }

          // buffer is full -- sort & flush it
          LOG.debug("flushing segment " + segments);
          rawBuffer = rawKeys.getData();
          sort(count);
          // indicate we're making progress
          if (progressable != null) {
            progressable.progress();
          }
          flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
                segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      public void close() throws IOException {
        if (in != null) {
          in.close();
        }
        if (out != null) {
          out.close();
        }
        if (indexOut != null) {
          indexOut.close();
        }
      }

      private void grow() {
        int newLength = keyOffsets.length * 3 / 2;
        keyOffsets = grow(keyOffsets, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];
        keyLengths = grow(keyLengths, newLength);
        rawValues = grow(rawValues, newLength);
      }

      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }

      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        for (int i=old.length; i < newLength; ++i) {
          result[i] = null;
        }
        return result;
      }

      private void flush(int count, int bytesProcessed, boolean isCompressed, 
                         boolean isBlockCompressed, CompressionCodec codec, boolean done) 
        throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile.suffix(".0");
          out = fs.create(outName);
          if (!done) {
            indexOut = fs.create(outName.suffix(".index"));
          }
        }

        long segmentStart = out.getPos();
        Writer writer = createWriter(conf, out, keyClass, valClass, 
                                     isCompressed, isBlockCompressed, codec, 
                                     new Metadata());

        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }
        writer.close();

        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }
      class SeqFileComparator implements Comparator<IntWritable> {
        public int compare(IntWritable I, IntWritable J) {
          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
                                    keyLengths[I.get()], rawBuffer, 
                                    keyOffsets[J.get()], keyLengths[J.get()]);
        }
      }

      /** set the progressable object in order to report progress */
      public void setProgressable(Progressable progressable)
      {
        this.progressable = progressable;
      }

    } // SequenceFile.Sorter.SortPass

    /** The interface to iterate over raw keys/values of SequenceFiles. */
    public static interface RawKeyValueIterator {
      /** Gets the current raw key
       * @return DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value
       * @return ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue)
       * @return true if there exists a key/value, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** closes the iterator so that the underlying streams can be closed
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0) 
       * indicating the bytes processed by the iterator so far
       */
      Progress getProgress();
    }    

    /**
     * Merges the list of segments of type <code>SegmentDescriptor</code>
     * @param segments the list of SegmentDescriptors
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
                                     Path tmpDir) 
      throws IOException {
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of files passed in Path[] using a max factor value
     * that is already set
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * unnecessary
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIteratorMergeQueue
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     Path tmpDir) 
      throws IOException {
      return merge(inNames, deleteInputs, 
                   (inNames.length < factor) ? inNames.length : factor,
                   tmpDir);
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * unnecessary
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIteratorMergeQueue
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     int factor, Path tmpDir) 
      throws IOException {
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0, 
                                                    fs.getLength(inNames[i]), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      this.factor = factor;
      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when 
     * unnecessary
     * @return RawKeyValueIteratorMergeQueue
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
                                     boolean deleteInputs) 
      throws IOException {
      //outFile will basically be used as prefix for temp files for the
      //intermediate merge outputs           
      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0, 
                                                    fs.getLength(inNames[i]), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      factor = (inNames.length < factor) ? inNames.length : factor;
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
      return mQueue.merge();
    }

    /**
     * Clones the attributes (like compression of the input file and creates a 
     * corresponding Writer
     * @param inputFile the path of the input file whose attributes should be 
     * cloned
     * @param outputFile the path of the output file 
     * @param prog the Progressable to report status during the file write
     * @return Writer
     * @throws IOException
     */
    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
                                      Progressable prog) 
    throws IOException {
      FileSystem srcFileSys = inputFile.getFileSystem(conf);
      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
      boolean compress = reader.isCompressed();
      boolean blockCompress = reader.isBlockCompressed();
      CompressionCodec codec = reader.getCompressionCodec();
      reader.close();

      Writer writer = createWriter(outputFile.getFileSystem(conf), conf, 
                                   outputFile, keyClass, valClass, compress, 
                                   blockCompress, codec, prog,
                                   new Metadata());
      return writer;
    }

    /**
     * Writes records from RawKeyValueIterator into a file represented by the 
     * passed writer
     * @param records the RawKeyValueIterator
     * @param writer the Writer created earlier 
     * @throws IOException
     */
    public void writeFile(RawKeyValueIterator records, Writer writer) 
      throws IOException {
      while(records.next()) {
        writer.appendRaw(records.getKey().getData(), 0, 
                         records.getKey().getLength(), records.getValue());
      }
      writer.sync();
    }

    /** Merge the provided files.
     * @param inFiles the array of input path names
     * @param outFile the final output file
     * @throws IOException
     */
    public void merge(Path[] inFiles, Path outFile) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);

      writeFile(r, writer);

      writer.close();
    }

    /** sort calls this to generate the final merged output */
    private int mergePass(Path tmpDir) throws IOException {
      LOG.debug("running merge pass");
      Writer writer = cloneFileAttributes(
                                          outFile.suffix(".0"), outFile, null);
      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
                                    outFile.suffix(".0.index"), tmpDir);
      writeFile(r, writer);

      writer.close();
      return 0;
    }

    /** Used by mergePass to merge the output of the sort
     * @param inName the name of the input file containing sorted segments
     * @param indexIn the offsets of the sorted segments
     * @param tmpDir the relative directory to store intermediate results in
     * @return RawKeyValueIterator
     * @throws IOException
     */
    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
      throws IOException {
      //get the segments from indexIn
      //we create a SegmentContainer so that we can track segments belonging to
      //inName and delete inName as soon as we see that we have looked at all
      //the contained segments during the merge process & hence don't need 
      //them anymore
      SegmentContainer container = new SegmentContainer(inName, indexIn);
      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
      return mQueue.merge();
    }

    /** This class implements the core of the merge logic */
    private class MergeQueue extends PriorityQueue 
      implements RawKeyValueIterator {
      private boolean compress;
      private boolean blockCompress;
      private DataOutputBuffer rawKey = new DataOutputBuffer();
      private ValueBytes rawValue;
      private long totalBytesProcessed;
      private float progPerByte;
      private Progress mergeProgress = new Progress();
      private Path tmpDir;
      private Progressable progress = null; //handle to the progress reporting object
      private SegmentDescriptor minSegment;

      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name is used to break ties between segments of same sizes)
      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
        new TreeMap<SegmentDescriptor, Void>();

      @SuppressWarnings("unchecked")
      public void put(SegmentDescriptor stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
        } else if (compress != stream.in.isCompressed() || 
                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        } 
        super.put(stream);
      }

      /**
       * A queue of file segments to merge
       * @param segments the file segments to merge
       * @param tmpDir a relative local directory to save intermediate files in
       * @param progress the reference to the Progressable object
       */
      public MergeQueue(List <SegmentDescriptor> segments,
          Path tmpDir, Progressable progress) {
        int size = segments.size();
        for (int i = 0; i < size; i++) {
          sortedSegmentSizes.put(segments.get(i), null);
        }
        this.tmpDir = tmpDir;
        this.progress = progress;
      }
      protected boolean lessThan(Object a, Object b) {
        // indicate we're making progress
        if (progress != null) {
          progress.progress();
        }
        SegmentDescriptor msa = (SegmentDescriptor)a;
        SegmentDescriptor msb = (SegmentDescriptor)b;
        return comparator.compare(msa.getKey().getData(), 0, 
                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
                                  msb.getKey().getLength()) < 0;
      }
      public void close() throws IOException {
        SegmentDescriptor ms;                           // close inputs
        while ((ms = (SegmentDescriptor)pop()) != null) {
          ms.cleanup();
        }
        minSegment = null;
      }
      public DataOutputBuffer getKey() throws IOException {
        return rawKey;
      }
      public ValueBytes getValue() throws IOException {
        return rawValue;
      }
      public boolean next() throws IOException {
        if (size() == 0)
          return false;
        if (minSegment != null) {
          //minSegment is non-null for all invocations of next except the first
          //one. For the first invocation, the priority queue is ready for use
          //but for the subsequent invocations, first adjust the queue 
          adjustPriorityQueue(minSegment);
          if (size() == 0) {
            minSegment = null;
            return false;
          }
        }
        minSegment = (SegmentDescriptor)top();
        long startPos = minSegment.in.getPosition(); // Current position in stream
        //save the raw key reference
        rawKey = minSegment.getKey();
        //load the raw value. Re-use the existing rawValue buffer
        if (rawValue == null) {
          rawValue = minSegment.in.createValueBytes();
        }
        minSegment.nextRawValue(rawValue);
        long endPos = minSegment.in.getPosition(); // End position after reading value
        updateProgress(endPos - startPos);
        return true;
      }

      public Progress getProgress() {
        return mergeProgress; 
      }

      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
        long startPos = ms.in.getPosition(); // Current position in stream
        boolean hasNext = ms.nextRawKey();
        long endPos = ms.in.getPosition(); // End position after reading key
        updateProgress(endPos - startPos);
        if (hasNext) {
          adjustTop();
        } else {
          pop();
          ms.cleanup();
        }
      }

      private void updateProgress(long bytesProcessed) {
        totalBytesProcessed += bytesProcessed;
        if (progPerByte > 0) {
          mergeProgress.set(totalBytesProcessed * progPerByte);
        }
      }

      /** This is the single level merge that is called multiple times 
       * depending on the factor size and the number of segments
       * @return RawKeyValueIterator
       * @throws IOException
       */
      public RawKeyValueIterator merge() throws IOException {
        //create the MergeStreams from the sorted map created in the constructor
        //and dump the final output to a file
        int numSegments = sortedSegmentSizes.size();
        int origFactor = factor;
        int passNo = 1;
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
        do {
          //get the factor for this pass of merge
          factor = getPassFactor(passNo, numSegments);
          List<SegmentDescriptor> segmentsToMerge =
            new ArrayList<SegmentDescriptor>();
          int segmentsConsidered = 0;
          int numSegmentsToConsider = factor;
          while (true) {
            //extract the smallest 'factor' number of segment pointers from the 
            //TreeMap. Call cleanup on the empty segments (no key/value data)
            SegmentDescriptor[] mStream = 
              getSegmentDescriptors(numSegmentsToConsider);
            for (int i = 0; i < mStream.length; i++) {
              if (mStream[i].nextRawKey()) {
                segmentsToMerge.add(mStream[i]);
                segmentsConsidered++;
                // Count the fact that we read some bytes in calling nextRawKey()
                updateProgress(mStream[i].in.getPosition());
              }
              else {
                mStream[i].cleanup();
                numSegments--; //we ignore this segment for the merge
              }
            }
            //if we have the desired number of segments
            //or looked at all available segments, we break
            if (segmentsConsidered == factor || 
                sortedSegmentSizes.size() == 0) {
              break;
            }

            numSegmentsToConsider = factor - segmentsConsidered;
          }
          //feed the streams to the priority queue
          initialize(segmentsToMerge.size()); clear();
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            put(segmentsToMerge.get(i));
          }
          //if we have lesser number of segments remaining, then just return the
          //iterator, else do another single level merge
          if (numSegments <= factor) {
            //calculate the length of the remaining segments. Required for 
            //calculating the merge progress
            long totalBytes = 0;
            for (int i = 0; i < segmentsToMerge.size(); i++) {
              totalBytes += segmentsToMerge.get(i).segmentLength;
            }
            if (totalBytes != 0) //being paranoid
              progPerByte = 1.0f / (float)totalBytes;
            //reset factor to what it originally was
            factor = origFactor;
            return this;
          } else {
            //we want to spread the creation of temp files on multiple disks if 
            //available under the space constraints
            long approxOutputSize = 0; 
            for (SegmentDescriptor s : segmentsToMerge) {
              approxOutputSize += s.segmentLength + 
                                  ChecksumFileSystem.getApproxChkSumLength(
                                  s.segmentLength);
            }
            Path tmpFilename = 
              new Path(tmpDir, "intermediate").suffix("." + passNo);

            Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                                tmpFilename.toString(),
                                                approxOutputSize, conf);
            LOG.debug("writing intermediate results to " + outputFile);
            Writer writer = cloneFileAttributes(
                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
                                                fs.makeQualified(outputFile), null);
            writer.sync = null; //disable sync for temp files
            writeFile(this, writer);
            writer.close();

            //we finished one single level merge; now clean up the priority 
            //queue
            this.close();

            SegmentDescriptor tempSegment = 
              new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
            //put the segment back in the TreeMap
            sortedSegmentSizes.put(tempSegment, null);
            numSegments = sortedSegmentSizes.size();
            passNo++;
          }
          //we are worried about only the first pass merge factor. So reset the 
          //factor to what it originally was
          factor = origFactor;
        } while(true);
      }

      //Hadoop-591
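      // First-pass merge factor: each intermediate merge replaces 'factor' segments
      // with one (a net reduction of factor-1), so the first pass merges only
      // (numSegments-1) % (factor-1) + 1 segments; every later pass, including the
      // final one, can then merge a full 'factor' segments, minimizing re-merged data.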
      public int getPassFactor(int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1) 
          return factor;
        int mod = (numSegments - 1) % (factor - 1);
        if (mod == 0)
          return factor;
        return mod + 1;
      }

      /** Return (& remove) the requested number of segment descriptors from the
       * sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] SegmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
          iter.remove();
        }
        return SegmentDescriptors;
      }
    } // SequenceFile.Sorter.MergeQueue

    /** This class defines a merge segment. This class can be subclassed to 
     * provide a customized cleanup method implementation. In this 
     * implementation, cleanup closes the file handle and deletes the file 
     */
    public class SegmentDescriptor implements Comparable {

      long segmentOffset; //the start of the segment in the file
      long segmentLength; //the length of the segment
      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null; 
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false; //delete input segment files?

      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor (long segmentOffset, long segmentLength, 
                                Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }

      /** Do the sync checks */
      public void doSync() {ignoreSync = false;}

      /** Whether to delete the files when no longer needed */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }

      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength == that.segmentLength &&
            this.segmentOffset == that.segmentOffset &&
            this.segmentPathName.toString().equals(
              that.segmentPathName.toString())) {
          return true;
        }
        return false;
      }

      public int hashCode() {
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
      }

      /** Fills up the rawKey object with the key returned by the Reader
       * @return true if there is a key returned; false, otherwise
       * @throws IOException
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
          if (fs.getUri().getScheme().startsWith("ramfs")) {
            bufferSize = conf.getInt("io.bytes.per.checksum", 512);
          }
          Reader reader = new Reader(fs, segmentPathName, 
                                     bufferSize, segmentOffset, 
                                     segmentLength, conf, false);

          //sometimes we ignore syncs especially for temp merge files
          if (ignoreSync) reader.sync = null;

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: "+reader.getValueClass()+
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = 
          in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier
       * @param rawValue
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        int valLength = in.nextRawValue(rawValue);
        return valLength;
      }

      /** Returns the stored rawKey */
      public DataOutputBuffer getKey() {
        return rawKey;
      }

      /** closes the underlying reader */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }

      /** The default cleanup. Subclasses can override this with a custom 
       * cleanup 
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor

    /** This class provisions multiple segments contained within a single
     *  file
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
                                       Path segmentPathName, SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }
      /** The default cleanup. Subclasses can override this with a custom 
       * cleanup 
       */
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }
    } //SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all the contained segments
     * have been looked at */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
      private int numSegmentsContained; //# of segments contained
      private Path inName; //input file from where segments are created

      //the list of segments read from the file
      private ArrayList <SegmentDescriptor> segments = 
        new ArrayList <SegmentDescriptor>();
      /** This constructor is there primarily to serve the sort routine that 
       * generates a single output file with an associated index file */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //get the segments from indexIn
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getLength(indexIn);
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
                                                    segmentLength, segmentName, this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true);
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List <SegmentDescriptor> getSegmentList() {
        return segments;
      }
      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } //SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile
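
Below is a minimal usage sketch (not part of the Hadoop source above) showing how the Sorter class just listed can be driven. The input/output paths and the Text/IntWritable key and value classes are assumptions chosen only for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileSortExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Sort an existing SequenceFile of <Text, IntWritable> records by key.
    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);

    // sort() first runs sortPass() (in-memory sort, flushed in segments) and,
    // if more than one segment was written, mergePass() to produce the output.
    sorter.sort(new Path[] { new Path("input.seq") },  // hypothetical input file
                new Path("sorted.seq"),                // must not already exist
                false);                                // do not delete the input
  }
}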

2.MapFile映射文件

A MapFile is a persistent key-value map sorted by key; its data is essentially a SequenceFile whose records have been ordered, for example by SequenceFile.Sorter. A MapFile consists of two parts: an index file (index) and a data file (data). The class works through its inner classes Writer and Reader, both of which also use methods defined in MapFile itself.
MapFile has two direct subclasses, ArrayFile and SetFile. ArrayFile defines a dense MapFile whose keys are integer indices, while SetFile stores only keys, with every value being NullWritable.
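
As an illustration (not taken from the source), here is a minimal sketch of writing and then reading a MapFile; the directory name and the Text/IntWritable classes are assumptions, and Reader.get() is a lookup method defined further down in the MapFile source.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String dir = "example.map";                 // hypothetical output directory

    // Write: keys must be appended in increasing order; every indexInterval-th
    // key is also recorded, together with its data-file offset, in the index file.
    MapFile.Writer writer =
      new MapFile.Writer(conf, fs, dir, Text.class, IntWritable.class);
    writer.append(new Text("apple"), new IntWritable(1));
    writer.append(new Text("banana"), new IntWritable(2));
    writer.close();

    // Read: the index is loaded into memory, get() binary-searches it and then
    // scans the data file forward from the closest preceding index entry.
    MapFile.Reader reader = new MapFile.Reader(fs, dir, conf);
    IntWritable val = new IntWritable();
    reader.get(new Text("banana"), val);        // val now holds 2
    reader.close();
  }
}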


package org.apache.hadoop.io;

import java.io.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

public class MapFile {
  private static final Log LOG = LogFactory.getLog(MapFile.class);// log object

  /** The name of the index file. */
  public static final String INDEX_FILE_NAME = "index";//代表索引文件的名称

  /** The name of the data file. */
  public static final String DATA_FILE_NAME = "data";//数据文件的名称

  protected MapFile() {}                          // no public ctor

  /** Writes a new map. */
  public static class Writer implements java.io.Closeable {
    private SequenceFile.Writer data;//代表对数据文件进行写的Writer
    private SequenceFile.Writer index;//代表对索引文件进行写的Writer

    final private static String INDEX_INTERVAL = "io.map.index.interval";//与索引间隔相对应的属性字符串
    private int indexInterval = 128;//每隔128个键值对设置一个索引

    private long size;//添加到data文件中的键值对个数
    private LongWritable position = new LongWritable();// data-file position recorded in each index entry

    // the following fields are used only for checking key order
    //下面四个用于检测key值的顺序
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    private WritableComparable lastKey;


    /** Create the named map for keys of the named class. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass)
      throws IOException {
      this(conf, fs, dirName,
           WritableComparator.get(keyClass), valClass,
           SequenceFile.getCompressionType(conf));
    }

    /** Create the named map for keys of the named class. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress, Progressable progress)
      throws IOException {
      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
           compress, progress);
    }

    /** Create the named map for keys of the named class. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress, CompressionCodec codec,
                  Progressable progress)
      throws IOException {
      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
           compress, codec, progress);
    }

    /** Create the named map for keys of the named class. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress)
      throws IOException {
      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
    }

    /** Create the named map using the named key comparator. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass)
      throws IOException {
      this(conf, fs, dirName, comparator, valClass,
           SequenceFile.getCompressionType(conf));
    }
    /** Create the named map using the named key comparator. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress)
      throws IOException {
      this(conf, fs, dirName, comparator, valClass, compress, null);
    }
    /** Create the named map using the named key comparator. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress,
                  Progressable progress)
      throws IOException {
      this(conf, fs, dirName, comparator, valClass, 
           compress, new DefaultCodec(), progress);
    }
    /** Create the named map using the named key comparator. */
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress, CompressionCodec codec,
                  Progressable progress)
      throws IOException {

      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

      this.comparator = comparator;
      this.lastKey = comparator.newKey();

      Path dir = new Path(dirName);
      if (!fs.mkdirs(dir)) {
        throw new IOException("Mkdirs failed to create directory " + dir.toString());
      }
      Path dataFile = new Path(dir, DATA_FILE_NAME);
      Path indexFile = new Path(dir, INDEX_FILE_NAME);

      Class keyClass = comparator.getKeyClass();
      this.data =
        SequenceFile.createWriter
        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
      this.index =
        SequenceFile.createWriter
        (fs, conf, indexFile, keyClass, LongWritable.class,
         CompressionType.BLOCK, progress);
    }

    /** The number of entries that are added before an index entry is added.*/
    public int getIndexInterval() { return indexInterval; } // returns the index interval

    /** Sets the index interval.
     * @see #getIndexInterval()
     */// sets the index interval
    public void setIndexInterval(int interval) { indexInterval = interval; }

    /** Sets the index interval and stores it in conf
     * @see #getIndexInterval()
     */// sets the index interval and stores it in conf
    public static void setIndexInterval(Configuration conf, int interval) {
      conf.setInt(INDEX_INTERVAL, interval);
    }

    /** Close the map. */
    public synchronized void close() throws IOException {
      data.close();
      index.close();
    }// closes the map

    /** Append a key/value pair to the map.  The key must be greater or equal
     * to the previous key added to the map. */
    /* Appends a key/value pair: if size is a multiple of indexInterval, an
     * index entry pointing at the current end of the data file is written
     * first, then the pair is appended to the data file. */
    public synchronized void append(WritableComparable key, Writable val)
      throws IOException {

      checkKey(key);

      if (size % indexInterval == 0) {            // add an index entry
        position.set(data.getLength());           // point to current eof
        index.append(key, position);
      }

      data.append(key, val);                      // append key/value to data
      size++;
    }

    private void checkKey(WritableComparable key) throws IOException {
      // check that keys are well-ordered (each key must be >= the previous one)
      if (size != 0 && comparator.compare(lastKey, key) > 0)
        throw new IOException("key out of order: "+key+" after "+lastKey);

      // update lastKey with a copy of key by writing and reading
      outBuf.reset();
      key.write(outBuf);                          // write new key

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      lastKey.readFields(inBuf);                  // read into lastKey
    }

  }
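  /*
   * A minimal usage sketch for the Writer above (not part of the original
   * source). The directory "/tmp/example.map" and the Text/LongWritable
   * key/value types are assumptions for illustration; keys must be appended
   * in sorted order.
   *
   *   Configuration conf = new Configuration();
   *   FileSystem fs = FileSystem.getLocal(conf);
   *   MapFile.Writer writer =
   *     new MapFile.Writer(conf, fs, "/tmp/example.map",
   *                        Text.class, LongWritable.class);
   *   writer.setIndexInterval(128);                       // index every 128th entry
   *   writer.append(new Text("a"), new LongWritable(1));
   *   writer.append(new Text("b"), new LongWritable(2));  // keys in sorted order
   *   writer.close();
   */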

  /** Provide access to an existing map,
   * i.e. a reader over an existing map directory. */
  public static class Reader implements java.io.Closeable {

    /** Number of index entries to skip between each entry.  Zero by default.
     * Setting this to values larger than zero can facilitate opening large map
     * files using less memory. */
    private int INDEX_SKIP = 0;// number of index entries to skip; a value greater than zero lets large map files be opened with less memory

    private WritableComparator comparator;// comparator used for keys

    private WritableComparable nextKey;// the next key to be read
    private long seekPosition = -1;
    private int seekIndex = -1;// seekPosition and seekIndex track where the last seek landed
    private long firstPosition;

    // the data, on disk -- readers for the data file and the index file
    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    // whether the index Reader was closed
    private boolean indexClosed = false;// whether the index reader has been closed

    // the index, in memory
    private int count = -1;// number of index entries held in memory
    private WritableComparable[] keys;// the index keys kept in memory
    private long[] positions;// data-file positions of the index keys

    /** Returns the class of keys in this file. */
    public Class<?> getKeyClass() { return data.getKeyClass(); }

    /** Returns the class of values in this file. */
    public Class<?> getValueClass() { return data.getValueClass(); }

    /** Construct a map reader for the named map.*/
    public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
      this(fs, dirName, null, conf);
      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
    }

    /** Construct a map reader for the named map using the named comparator.*/
    public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
      throws IOException {
      this(fs, dirName, comparator, conf, true);
    }

    /**
     * Hook to allow subclasses to defer opening streams until further
     * initialization is complete.
     * @see #createDataFileReader(FileSystem, Path, Configuration)
     */
    protected Reader(FileSystem fs, String dirName,
        WritableComparator comparator, Configuration conf, boolean open)
      throws IOException {

      if (open) {
        open(fs, dirName, comparator, conf);
      }
    }

    protected synchronized void open(FileSystem fs, String dirName,
        WritableComparator comparator, Configuration conf) throws IOException {
      Path dir = new Path(dirName);
      Path dataFile = new Path(dir, DATA_FILE_NAME);
      Path indexFile = new Path(dir, INDEX_FILE_NAME);

      // open the data
      this.data = createDataFileReader(fs, dataFile, conf);
      this.firstPosition = data.getPosition();

      if (comparator == null)
        this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
      else
        this.comparator = comparator;

      // open the index
      this.index = new SequenceFile.Reader(fs, indexFile, conf);
    }// opens the data file and the index file

    /**
     * Override this method to specialize the type of
     * {@link SequenceFile.Reader} returned.
     */
    protected SequenceFile.Reader createDataFileReader(FileSystem fs,
        Path dataFile, Configuration conf) throws IOException {
      return new SequenceFile.Reader(fs, dataFile,  conf);
    }

    private void readIndex() throws IOException {
      // read the index entirely into memory
      if (this.keys != null)
        return;
      this.count = 0;
      this.keys = new WritableComparable[1024];
      this.positions = new long[1024];
      try {
        int skip = INDEX_SKIP;
        LongWritable position = new LongWritable();
        WritableComparable lastKey = null;
        while (true) {
          WritableComparable k = comparator.newKey();

          if (!index.next(k, position))
            break;

          // check order to make sure comparator is compatible
          if (lastKey != null && comparator.compare(lastKey, k) > 0)
            throw new IOException("key out of order: "+k+" after "+lastKey);
          lastKey = k;

          if (skip > 0) {
            skip--;
            continue;                             // skip this entry
          } else {
            skip = INDEX_SKIP;                    // reset skip
          }

          if (count == keys.length) {                // time to grow arrays
            int newLength = (keys.length*3)/2;
            WritableComparable[] newKeys = new WritableComparable[newLength];
            long[] newPositions = new long[newLength];
            System.arraycopy(keys, 0, newKeys, 0, count);
            System.arraycopy(positions, 0, newPositions, 0, count);
            keys = newKeys;
            positions = newPositions;
          }

          keys[count] = k;
          positions[count] = position.get();
          count++;
        }
      } catch (EOFException e) {
        LOG.warn("Unexpected EOF reading " + index +
                              " at entry #" + count + ".  Ignoring.");
      } finally {
        indexClosed = true;
        index.close();
      }
    }// reads the entire index file into memory

    /** Re-positions the reader before its first key. */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }// re-positions the data reader before the first key

    /** Get the key at approximately the middle of the file.
     * 
     * @throws IOException
     */
    public synchronized WritableComparable midKey() throws IOException {

      readIndex();
      int pos = ((count - 1) / 2);              // middle of the index
      if (pos < 0) {
        throw new IOException("MapFile empty");
      }

      return keys[pos];
    }// returns the key roughly in the middle of the file

    /** Reads the final key from the file.
     *
     * @param key key to read into
     */
    public synchronized void finalKey(WritableComparable key)
      throws IOException {

      long originalPosition = data.getPosition(); // save position
      try {
        readIndex();                              // make sure index is valid
        if (count > 0) {
          data.seek(positions[count-1]);          // skip to last indexed entry
        } else {
          reset();                                // start at the beginning
        }
        while (data.next(key)) {}                 // scan to eof

      } finally {
        data.seek(originalPosition);              // restore position
      }
    }// reads the final key in the file

    /** Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.  Returns true iff the named key exists
     * in this map.
     */
    public synchronized boolean seek(WritableComparable key) throws IOException {
      return seekInternal(key) == 0;
    }// returns true only on an exact match

    /** 
     * Positions the reader at the named key.  Returns 0 on an exact match,
     * a negative value when positioned at the next record after the key,
     * and 1 when there are no more records in the file.
     */
    private synchronized int seekInternal(WritableComparable key)
      throws IOException {
      return seekInternal(key, false);
    }

    /** 
     * Positions the reader at the named key, or if none such exists, at the
     * key that falls just before or just after dependent on how the
     * <code>before</code> parameter is set.
     * 
     * @param before - IF true, and <code>key</code> does not exist, position
     * file at entry that falls just before <code>key</code>.  Otherwise,
     * position file at record that sorts just after.
     * @return  0   - exact match found
     *          < 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key,
        final boolean before)
      throws IOException {
      readIndex();                                // make sure index is read

      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count           
          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
        seekIndex = binarySearch(key);
        if (seekIndex < 0)                        // decode insertion point
          seekIndex = -seekIndex-2;

        if (seekIndex == -1)                      // belongs before first entry
          seekPosition = firstPosition;           // use beginning of file
        else
          seekPosition = positions[seekIndex];    // else use index
      }
      data.seek(seekPosition);

      if (nextKey == null)
        nextKey = comparator.newKey();

      // If we're looking for the key before, we need to keep track
      // of the position we got the current key as well as the position
      // of the key before it.
      long prevPosition = -1;
      long curPosition = seekPosition;

      while (data.next(nextKey)) {
        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
          if (before && c != 0) {
            if (prevPosition == -1) {
              // We're on the first record of this index block
              // and we've already passed the search key. Therefore
              // we must be at the beginning of the file, so seek
              // to the beginning of this block and return c
              data.seek(curPosition);
            } else {
              // We have a previous record to back up to
              data.seek(prevPosition);
              data.next(nextKey);
              // now that we've rewound, the search key must be greater than this key
              return 1;
            }
          }
          return c;
        }
        if (before) {
          prevPosition = curPosition;
          curPosition = data.getPosition();
        }
      }

      return 1;
    }

    private int binarySearch(WritableComparable key) {
      int low = 0;
      int high = count-1;

      while (low <= high) {
        int mid = (low + high) >>> 1;
        WritableComparable midVal = keys[mid];
        int cmp = comparator.compare(midVal, key);

        if (cmp < 0)
          low = mid + 1;
        else if (cmp > 0)
          high = mid - 1;
        else
          return mid;                             // key found
      }
      return -(low + 1);                          // key not found.
    }

    /** Read the next key/value pair in the map into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * the end of the map */
    public synchronized boolean next(WritableComparable key, Writable val)
      throws IOException {
      return data.next(key, val);
    }// reads the next key/value pair

    /** Return the value for the named key, or null if none exists. */
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
        data.getCurrentValue(val);
        return val;
      } else
        return null;
    }// returns the value for the given key, or null if it is not found

    /** 
     * Finds the record that is the closest match to the specified key.
     * Returns <code>key</code> or if it does not exist, at the first entry
     * after the named key.
     * 
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
      Writable val)
    throws IOException {
      return getClosest(key, val, false);
    }// returns the key closest to the given key, or null at eof

    /** 
     * Finds the record that is the closest match to the specified key.
     * 
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @param before    - IF true, and <code>key</code> does not exist, return
     * the first entry that falls just before the <code>key</code>.  Otherwise,
     * return the record that sorts just after.
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
        Writable val, final boolean before)
      throws IOException {

      int c = seekInternal(key, before);

      // If we didn't get an exact match, and we ended up in the wrong
      // direction relative to the query key, return null since we
      // must be at the beginning or end of the file.
      if ((!before && c > 0) ||
          (before && c < 0)) {
        return null;
      }

      data.getCurrentValue(val);
      return nextKey;
    }

    /** Close the map. */
    public synchronized void close() throws IOException {
      if (!indexClosed) {
        index.close();
      }
      data.close();
    }

  }
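  /*
   * A minimal usage sketch for the Reader above (not part of the original
   * source); the directory "/tmp/example.map" is hypothetical and is assumed
   * to contain Text keys and LongWritable values, e.g. as written by the
   * Writer sketch earlier.
   *
   *   Configuration conf = new Configuration();
   *   FileSystem fs = FileSystem.getLocal(conf);
   *   MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/example.map", conf);
   *   LongWritable val = new LongWritable();
   *   if (reader.get(new Text("b"), val) != null) {       // exact lookup via the index
   *     System.out.println("b -> " + val.get());
   *   }
   *   WritableComparable near = reader.getClosest(new Text("aa"), val); // first key >= "aa"
   *   reader.close();
   */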

  /** Renames an existing map directory. */
  public static void rename(FileSystem fs, String oldName, String newName)
    throws IOException {
    Path oldDir = new Path(oldName);
    Path newDir = new Path(newName);
    if (!fs.rename(oldDir, newDir)) {
      throw new IOException("Could not rename " + oldDir + " to " + newDir);
    }
  }// renames the map directory

  /** Deletes the named map file. */
  public static void delete(FileSystem fs, String name) throws IOException {
    Path dir = new Path(name);
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);

    fs.delete(data, true);// delete the data file
    fs.delete(index, true);// delete the index file
    fs.delete(dir, true);// delete the directory
  }// deletes the map directory

  /**
   * This method attempts to fix a corrupt MapFile by re-creating its index.
   * @param fs filesystem
   * @param dir directory containing the MapFile data and index
   * @param keyClass key class (has to be a subclass of Writable)
   * @param valueClass value class (has to be a subclass of Writable)
   * @param dryrun do not perform any changes, just report what needs to be done
   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
   * @throws Exception
   */
  public static long fix(FileSystem fs, Path dir,
                         Class<? extends Writable> keyClass,
                         Class<? extends Writable> valueClass, boolean dryrun,
                         Configuration conf) throws Exception {
    String dr = (dryrun ? "[DRY RUN ] " : "");
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);
    int indexInterval = 128;
    if (!fs.exists(data)) {
      // there's nothing we can do to fix this!
      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
    }
    if (fs.exists(index)) {
      // no fixing needed
      return -1;
    }
    SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
    if (!dataReader.getKeyClass().equals(keyClass)) {
      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
                          ", got " + dataReader.getKeyClass().getName());
    }
    if (!dataReader.getValueClass().equals(valueClass)) {
      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
                          ", got " + dataReader.getValueClass().getName());
    }
    long cnt = 0L;
    Writable key = ReflectionUtils.newInstance(keyClass, conf);
    Writable value = ReflectionUtils.newInstance(valueClass, conf);
    SequenceFile.Writer indexWriter = null;
    if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
    try {
      long pos = 0L;
      LongWritable position = new LongWritable();
      while(dataReader.next(key, value)) {
        cnt++;
        if (cnt % indexInterval == 0) {
          position.set(pos);
          if (!dryrun) indexWriter.append(key, position);
        }
        pos = dataReader.getPosition();
      }
    } catch(Throwable t) {
      // truncated data file. swallow it.
    }
    dataReader.close();
    if (!dryrun) indexWriter.close();
    return cnt;
  }// repairs a corrupt MapFile by re-creating its index file
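  /*
   * A hedged sketch (not in the original source) of invoking fix() above to
   * rebuild a missing index; the directory path and the Text/LongWritable
   * classes are assumptions for illustration.
   *
   *   Configuration conf = new Configuration();
   *   FileSystem fs = FileSystem.getLocal(conf);
   *   long n = MapFile.fix(fs, new Path("/tmp/broken.map"),
   *                        Text.class, LongWritable.class,
   *                        false, conf);   // dryrun=false actually writes the index
   *   // n is -1 if the index already existed, otherwise the number of entries read
   */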


  public static void main(String[] args) throws Exception {
    String usage = "Usage: MapFile inFile outFile";

    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    String in = args[0];
    String out = args[1];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
    MapFile.Writer writer =
      new MapFile.Writer(conf, fs, out,
          reader.getKeyClass().asSubclass(WritableComparable.class),
          reader.getValueClass());

    WritableComparable key =
      ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
    Writable value =
      ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);

    while (reader.next(key, value))               // copy all entries
      writer.append(key, value);

    writer.close();
  }

}

Reposted from blog.csdn.net/gaoyang8320/article/details/51164264