Hadoop的基本数据类型就是对其他语言中相应数据类型的封装,这里不再赘述。Hadoop最重要的是对文件的操作:文件中的数据必须先按照Hadoop的序列化方式读入、存储,才能被后续的map和reduce过程处理。下面来看文件类型数据的读取和存储方式。
1.SequenceFile文件
SequenceFile以二进制格式存储key-value形式的数据,按照压缩方式分为三种:无压缩;仅压缩value、不压缩key(记录压缩);以及把多条记录的key和value放在一起按块压缩(块压缩)。
SequenceFile中负责无压缩写入的类是Writer,其成员变量如下:
* Configuration conf//是hadoop的配置对象
* FSDataOutputStream out//是文件的输出流
* boolean ownOutputStream //表示Writer是否拥有该输出流:为true时close()会关闭输出流,否则只执行flush
* DataOutputBuffer buffer=new DataOutputBuffer()//输出缓存区
* Class keyClass//是输出key所代表的类型
* Class valueClass//是输出value所代表的类型
* private boolean compress//用于标识是否进行压缩
* CompressionCodec codec//是压缩编码解码器
* CompressionOutputStream deflateFilter//是压缩输出流
* DataOutputStream deflateOut//包装在压缩输出流deflateFilter之上的带缓冲数据输出流,写入它的数据会被压缩
* Metadata metadata//文件元数据
* Compressor compressor//文件输出所用的压缩器(从CodecPool中获取,close时归还)
* Serializer keySerializer//对于key进行序列化的序列化器
* Serializer uncompressedValSerializer//对于未压缩的value的序列化器
* Serializer compressedValSerializer//对压缩的value进行序列化的序列化器
* long lastSyncPos//最后一个同步位置
* byte[] sync//16字节的同步标记(由UID与当前时间计算出的MD5摘要)
Writer有多个参数不同的构造函数,这里不再细说,请看下面的源码部分。也可以通过静态方法createWriter()创建写入器,它返回SequenceFile.Writer实例(根据压缩类型,可能是其子类);该静态方法有多个重载版本,详见源码部分。
SequenceFile对另外两种压缩形式的处理类分别是RecordCompressWriter和BlockCompressWriter,它们都继承自Writer。
此外SequenceFile中还有Sorter类,用于对key-value记录进行排序(及合并);排序结果同样以SequenceFile格式输出,可用作MapFile的数据文件。
package org.apache.hadoop.io;
import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
/*
@see CompressionCodec
*/
public class SequenceFile {
private static final Log LOG = LogFactory.getLog(SequenceFile.class);// class logger
private SequenceFile() {} // no public ctor: this class only hosts static factories and nested types
private static final byte BLOCK_COMPRESS_VERSION = (byte)4;// format version associated with block compression (name-based; NOTE(review): confirm how the reader uses it)
private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;// format version associated with custom compression codecs (name-based; NOTE(review): confirm)
private static final byte VERSION_WITH_METADATA = (byte)6;// format version that carries file metadata; it is the version byte in VERSION below
private static byte[] VERSION = new byte[] { // file magic written first by Writer.initializeFileHeader(): 'S','E','Q' then the format version
(byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries: written in place of a record length to flag a sync marker
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash (an MD5 digest is 16 bytes)
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash: the 4-byte SYNC_ESCAPE int plus the sync bytes
/** The number of bytes between sync points.*/
public static final int SYNC_INTERVAL = 100*SYNC_SIZE;// Writer emits an automatic sync marker before a record once at least this many bytes were written since the last one
/**
 * Selects how a {@link SequenceFile} compresses its key/value records.
 *
 * @see SequenceFile.Writer
 */
public static enum CompressionType {

  /** Records are stored as-is; nothing is compressed. */
  NONE,

  /** Each value is compressed on its own; keys stay uncompressed. */
  RECORD,

  /** Runs of records (keys and values) are compressed together in blocks. */
  BLOCK;
}
/**
 * Reads the compression type stored under {@code io.seqfile.compression.type}.
 *
 * @param job configuration to read from
 * @return the configured type, or {@link CompressionType#RECORD} when the key is unset
 * @throws IllegalArgumentException if the configured name is not a CompressionType constant
 */
@Deprecated
public static CompressionType getCompressionType(Configuration job) {
  String configured = job.get("io.seqfile.compression.type");
  if (configured == null) {
    return CompressionType.RECORD;
  }
  return CompressionType.valueOf(configured);
}
/**
 * Stores {@code val} under {@code io.seqfile.compression.type} in the configuration.
 *
 * @param job configuration to update
 * @param val compression type to record (its enum name is stored)
 */
@Deprecated
public static void setCompressionType(Configuration job,
    CompressionType val) {
  String typeName = val.toString();
  job.set("io.seqfile.compression.type", typeName);
}
/**
 * Creates a writer for {@code name} using the compression type configured in
 * {@code conf} (see {@link #getCompressionType(Configuration)}).
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass) throws IOException {
  CompressionType configuredType = getCompressionType(conf);
  return createWriter(fs, conf, name, keyClass, valClass, configuredType);
}
/**
 * Creates a writer for {@code name} with the given compression type, the
 * file system's default buffer size, replication and block size, a
 * {@link DefaultCodec}, no progress reporter and empty metadata.
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, CompressionType compressionType)
    throws IOException {
  int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
  short replication = fs.getDefaultReplication();
  long blockSize = fs.getDefaultBlockSize();
  return createWriter(fs, conf, name, keyClass, valClass,
      bufferSize, replication, blockSize,
      compressionType, new DefaultCodec(), null, new Metadata());
}
SequenceFile Writer.
* @throws IOException
*/
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionType compressionType,
Progressable progress) throws IOException {
return createWriter(fs, conf, name, keyClass, valClass,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), fs.getDefaultBlockSize(),
compressionType, new DefaultCodec(), progress, new Metadata());
}
/**
 * Creates a writer for {@code name} with the given compression type and
 * codec, the file system's default buffer size, replication and block size,
 * no progress reporter and empty metadata.
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass,
    CompressionType compressionType, CompressionCodec codec)
    throws IOException {
  int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
  short replication = fs.getDefaultReplication();
  long blockSize = fs.getDefaultBlockSize();
  return createWriter(fs, conf, name, keyClass, valClass,
      bufferSize, replication, blockSize,
      compressionType, codec, null, new Metadata());
}
/**
 * Creates a writer for {@code name} with the given compression settings,
 * progress reporter and metadata, using the file system's default buffer
 * size, replication and block size.
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass,
    CompressionType compressionType, CompressionCodec codec,
    Progressable progress, Metadata metadata) throws IOException {
  int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
  short replication = fs.getDefaultReplication();
  long blockSize = fs.getDefaultBlockSize();
  return createWriter(fs, conf, name, keyClass, valClass,
      bufferSize, replication, blockSize,
      compressionType, codec, progress, metadata);
}
/**
 * Creates the writer subclass matching {@code compressionType} on a newly
 * created file with explicit buffer size, replication and block size.
 *
 * @return a {@link Writer}, {@link RecordCompressWriter} or
 *         {@link BlockCompressWriter}; null if compressionType matches none
 *         of NONE/RECORD/BLOCK (e.g. null)
 * @throws IllegalArgumentException if a GzipCodec is requested while neither
 *         native-hadoop code nor native zlib is loaded
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, int bufferSize,
    short replication, long blockSize,
    CompressionType compressionType, CompressionCodec codec,
    Progressable progress, Metadata metadata) throws IOException {
  boolean gzipWithoutNative = (codec instanceof GzipCodec)
      && !NativeCodeLoader.isNativeCodeLoaded()
      && !ZlibFactory.isNativeZlibLoaded(conf);
  if (gzipWithoutNative) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
        "GzipCodec without native-hadoop code!");
  }
  if (compressionType == CompressionType.NONE) {
    return new Writer(fs, conf, name, keyClass, valClass,
        bufferSize, replication, blockSize, progress, metadata);
  }
  if (compressionType == CompressionType.RECORD) {
    return new RecordCompressWriter(fs, conf, name, keyClass, valClass,
        bufferSize, replication, blockSize, codec, progress, metadata);
  }
  if (compressionType == CompressionType.BLOCK) {
    return new BlockCompressWriter(fs, conf, name, keyClass, valClass,
        bufferSize, replication, blockSize, codec, progress, metadata);
  }
  // Unrecognized type: no writer (and no file) is created.
  return null;
}
/**
 * Creates the writer subclass matching {@code compressionType} on a newly
 * created file; the returned writer owns (and closes) the file's stream.
 *
 * @param createParent if true the file is created with {@code fs.create}
 *        (creating missing parents); otherwise {@code fs.createNonRecursive}
 * @return the writer, or null if compressionType matches none of the cases
 * @throws IllegalArgumentException if a GzipCodec is requested while neither
 *         native-hadoop code nor native zlib is loaded
 * @throws NullPointerException if compressionType is null (checked before
 *         the file is created)
 * @throws IOException if the file cannot be created or its header written;
 *         the opened stream is closed in that case
 */
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, int bufferSize,
    short replication, long blockSize, boolean createParent,
    CompressionType compressionType, CompressionCodec codec,
    Metadata metadata) throws IOException {
  if ((codec instanceof GzipCodec) &&
      !NativeCodeLoader.isNativeCodeLoaded() &&
      !ZlibFactory.isNativeZlibLoaded(conf)) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
        "GzipCodec without native-hadoop code!");
  }
  if (compressionType == null) {
    // Fail before touching the file system; a null type would otherwise NPE
    // in the switch below after the file had already been opened.
    throw new NullPointerException("compressionType");
  }
  FSDataOutputStream fsos;
  if (createParent) {
    fsos = fs.create(name, true, bufferSize, replication, blockSize);
  } else {
    fsos = fs.createNonRecursive(name, true, bufferSize, replication,
        blockSize, null);
  }
  Writer writer = null;
  try {
    switch (compressionType) {
      case NONE:
        writer = new Writer(conf, fsos, keyClass, valClass, metadata);
        break;
      case RECORD:
        writer = new RecordCompressWriter(conf, fsos, keyClass, valClass, codec,
            metadata);
        break;
      case BLOCK:
        writer = new BlockCompressWriter(conf, fsos, keyClass, valClass, codec,
            metadata);
        break;
      default:
        writer = null;
        break;
    }
  } finally {
    if (writer == null) {
      // No writer took ownership of fsos (construction failed or the type is
      // unknown): close it here instead of leaking the open file.
      try {
        fsos.close();
      } catch (IOException ignored) {
        // Best effort; do not mask the original failure.
      }
    }
  }
  return writer == null ? null : writer.ownStream();
}
/**
 * Creates a writer for {@code name} with the given compression settings and
 * progress reporter, default file-system parameters and empty metadata.
 */
public static Writer createWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass,
    CompressionType compressionType, CompressionCodec codec,
    Progressable progress) throws IOException {
  return createWriter(fs, conf, name, keyClass, valClass,
      compressionType, codec, progress, new Metadata());
}
/**
 * Creates a writer on an existing stream, choosing the subclass from two
 * flags: uncompressed, record-compressed, or block-compressed.
 *
 * @throws IllegalArgumentException if a GzipCodec is requested while neither
 *         native-hadoop code nor native zlib is loaded
 */
private static Writer
createWriter(Configuration conf, FSDataOutputStream out,
    Class keyClass, Class valClass, boolean compress, boolean blockCompress,
    CompressionCodec codec, Metadata metadata)
    throws IOException {
  // instanceof is false for a null codec, so no separate null test is needed.
  if (codec instanceof GzipCodec
      && !NativeCodeLoader.isNativeCodeLoaded()
      && !ZlibFactory.isNativeZlibLoaded(conf)) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
        "GzipCodec without native-hadoop code!");
  }
  if (!compress) {
    return new Writer(conf, out, keyClass, valClass, metadata);
  }
  if (blockCompress) {
    return new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  }
  return new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
}
/**
 * Creates a writer on a new file, choosing the subclass from two flags:
 * uncompressed, record-compressed, or block-compressed.
 *
 * @throws IllegalArgumentException if a GzipCodec is requested while neither
 *         native-hadoop code nor native zlib is loaded
 */
private static Writer
createWriter(FileSystem fs, Configuration conf, Path file,
    Class keyClass, Class valClass,
    boolean compress, boolean blockCompress,
    CompressionCodec codec, Progressable progress, Metadata metadata)
    throws IOException {
  // instanceof is false for a null codec, so no separate null test is needed.
  if (codec instanceof GzipCodec
      && !NativeCodeLoader.isNativeCodeLoaded()
      && !ZlibFactory.isNativeZlibLoaded(conf)) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
        "GzipCodec without native-hadoop code!");
  }
  if (!compress) {
    return new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
  }
  if (blockCompress) {
    return new BlockCompressWriter(fs, conf, file, keyClass, valClass,
        codec, progress, metadata);
  }
  return new RecordCompressWriter(fs, conf, file, keyClass, valClass,
      codec, progress, metadata);
}
/**
 * Creates the writer subclass matching {@code compressionType} on an
 * existing stream; the caller keeps ownership of {@code out}.
 *
 * @return the writer, or null if compressionType matches none of
 *         NONE/RECORD/BLOCK (e.g. null)
 * @throws IllegalArgumentException if a GzipCodec is requested while neither
 *         native-hadoop code nor native zlib is loaded
 */
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
    Class keyClass, Class valClass, CompressionType compressionType,
    CompressionCodec codec, Metadata metadata)
    throws IOException {
  boolean gzipWithoutNative = (codec instanceof GzipCodec)
      && !NativeCodeLoader.isNativeCodeLoaded()
      && !ZlibFactory.isNativeZlibLoaded(conf);
  if (gzipWithoutNative) {
    throw new IllegalArgumentException("SequenceFile doesn't work with " +
        "GzipCodec without native-hadoop code!");
  }
  if (compressionType == CompressionType.NONE) {
    return new Writer(conf, out, keyClass, valClass, metadata);
  }
  if (compressionType == CompressionType.RECORD) {
    return new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  }
  if (compressionType == CompressionType.BLOCK) {
    return new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  }
  return null;
}
/**
 * Creates a writer on an existing stream with the given compression
 * settings and empty metadata.
 */
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
    Class keyClass, Class valClass, CompressionType compressionType,
    CompressionCodec codec) throws IOException {
  return createWriter(conf, out, keyClass, valClass, compressionType, codec,
      new Metadata());
}
/** The interface to 'raw' values of SequenceFiles. */
public static interface ValueBytes {

  /** Writes the uncompressed bytes to the outStream.
   * @param outStream : Stream to write uncompressed bytes into.
   * @throws IOException
   */
  public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

  /** Writes the stored bytes, which must already be compressed, to the outStream.
   * Implementations holding uncompressed data refuse rather than compress.
   * (Required by RecordCompressWriter.appendRaw, which calls it through this
   * interface.)
   * @param outStream : Stream to write compressed bytes into.
   * @throws IllegalArgumentException if the stored bytes are not compressed
   * @throws IOException
   */
  public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

  /** Returns the number of stored (raw) bytes. */
  public int getSize();
}
/**
 * {@link ValueBytes} holding a value's bytes exactly as read, uncompressed.
 */
private static class UncompressedBytes implements ValueBytes {
  private int dataSize;
  private byte[] data;

  private UncompressedBytes() {
    this.dataSize = 0;
    this.data = null;
  }

  /**
   * Replaces the held bytes with exactly {@code length} bytes read from
   * {@code in}. dataSize stays -1 if the read fails part-way.
   */
  private void reset(DataInputStream in, int length) throws IOException {
    byte[] fresh = new byte[length];
    data = fresh;
    dataSize = -1;
    in.readFully(fresh);
    dataSize = fresh.length;
  }

  /** Number of held bytes. */
  public int getSize() {
    return dataSize;
  }

  /** Copies the held bytes verbatim to {@code outStream}. */
  public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
    outStream.write(data, 0, dataSize);
  }

  /** Always refuses: these bytes are not compressed. */
  public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
    throw new IllegalArgumentException("UncompressedBytes cannot be compressed!");
  }
} // UncompressedBytes
/**
 * {@link ValueBytes} holding a value's bytes in compressed form; they are
 * decompressed with {@code codec} only when uncompressed output is requested.
 */
private static class CompressedBytes implements ValueBytes {
  private int dataSize;
  private byte[] data;
  DataInputBuffer rawData = null;  // wraps 'data' as decompressor input; created on first use
  CompressionCodec codec = null;
  CompressionInputStream decompressedStream = null;  // created on first use, then reset and reused

  private CompressedBytes(CompressionCodec codec) {
    this.codec = codec;
    this.dataSize = 0;
    this.data = null;
  }

  /**
   * Replaces the held bytes with exactly {@code length} bytes read from
   * {@code in}. dataSize stays -1 if the read fails part-way.
   */
  private void reset(DataInputStream in, int length) throws IOException {
    byte[] fresh = new byte[length];
    data = fresh;
    dataSize = -1;
    in.readFully(fresh);
    dataSize = fresh.length;
  }

  /** Number of held (compressed) bytes. */
  public int getSize() {
    return dataSize;
  }

  /** Decompresses the held bytes into {@code outStream}. */
  public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
    if (decompressedStream != null) {
      decompressedStream.resetState();
    } else {
      rawData = new DataInputBuffer();
      decompressedStream = codec.createInputStream(rawData);
    }
    rawData.reset(data, 0, dataSize);
    byte[] chunk = new byte[8192];
    for (int n = decompressedStream.read(chunk, 0, chunk.length);
         n != -1;
         n = decompressedStream.read(chunk, 0, chunk.length)) {
      outStream.write(chunk, 0, n);
    }
  }

  /** Copies the held compressed bytes verbatim to {@code outStream}. */
  public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
    outStream.write(data, 0, dataSize);
  }
} // CompressedBytes
/**
 * The metadata of a file: a list of attribute name/value pairs, both of
 * type {@link Text}, kept sorted by name.
 *
 * <p>Serialized form: an int count followed by that many (name, value)
 * Text pairs in name order.
 */
public static class Metadata implements Writable {

  /** Attribute name -> value, sorted by name. */
  private TreeMap<Text, Text> theMetadata;

  /** Creates empty metadata. */
  public Metadata() {
    this(new TreeMap<Text, Text>());
  }

  /**
   * Wraps {@code arg} directly (it is not copied, so later changes to it are
   * visible here). A null map is treated as empty.
   */
  public Metadata(TreeMap<Text, Text> arg) {
    if (arg == null) {
      this.theMetadata = new TreeMap<Text, Text>();
    } else {
      this.theMetadata = arg;
    }
  }

  /** Returns the value stored for {@code name}, or null if absent. */
  public Text get(Text name) {
    return this.theMetadata.get(name);
  }

  /** Stores {@code value} under {@code name}, replacing any previous value. */
  public void set(Text name, Text value) {
    this.theMetadata.put(name, value);
  }

  /** Returns a copy of all name/value pairs; changing it does not affect this object. */
  public TreeMap<Text, Text> getMetadata() {
    return new TreeMap<Text, Text>(this.theMetadata);
  }

  /** Writes the pair count followed by each name and value in name order. */
  public void write(DataOutput out) throws IOException {
    out.writeInt(this.theMetadata.size());
    for (Map.Entry<Text, Text> en : this.theMetadata.entrySet()) {
      en.getKey().write(out);
      en.getValue().write(out);
    }
  }

  /**
   * Replaces the contents with pairs read in the format produced by
   * {@link #write(DataOutput)}.
   *
   * @throws IOException if the stored count is negative or reading fails
   */
  public void readFields(DataInput in) throws IOException {
    int sz = in.readInt();
    if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
    this.theMetadata = new TreeMap<Text, Text>();
    for (int i = 0; i < sz; i++) {
      Text key = new Text();
      Text val = new Text();
      key.readFields(in);
      val.readFields(in);
      this.theMetadata.put(key, val);
    }
  }

  /** True when {@code other} holds the same names mapped to equal values. */
  public boolean equals(Metadata other) {
    if (other == null) return false;
    if (this.theMetadata.size() != other.theMetadata.size()) {
      return false;
    }
    // Both maps iterate in sorted key order, so a pairwise walk suffices.
    Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
    Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
    while (iter1.hasNext() && iter2.hasNext()) {
      Map.Entry<Text, Text> en1 = iter1.next();
      Map.Entry<Text, Text> en2 = iter2.next();
      if (!en1.getKey().equals(en2.getKey())) {
        return false;
      }
      if (!en1.getValue().equals(en2.getValue())) {
        return false;
      }
    }
    if (iter1.hasNext() || iter2.hasNext()) {
      return false;
    }
    return true;
  }

  /**
   * Content equality for callers holding an Object reference (including
   * collections); the {@code equals(Metadata)} overload alone does not
   * override Object.equals, which would fall back to identity.
   */
  @Override
  public boolean equals(Object other) {
    return (other instanceof Metadata) && equals((Metadata) other);
  }

  /**
   * Derived from the entries so equal Metadata objects hash equally (the
   * constant-plus-{@code assert false} version failed whenever assertions
   * were enabled).
   */
  @Override
  public int hashCode() {
    return this.theMetadata.hashCode();
  }

  /** Renders "size: N" followed by one tab-separated name/value line per pair. */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("size: ").append(this.theMetadata.size()).append("\n");
    for (Map.Entry<Text, Text> en : this.theMetadata.entrySet()) {
      sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
      sb.append("\n");
    }
    return sb.toString();
  }
}
/**
 * Writes key/value pairs to a sequence-format file.
 *
 * <p>This base class stores values uncompressed; {@link RecordCompressWriter}
 * and {@link BlockCompressWriter} subclass it for the compressed layouts.
 * Layout produced by this class:
 * <ul>
 *   <li>header: the {@code VERSION} magic, key class name, value class name,
 *       compressed flag, block-compressed flag, codec class name (only when
 *       compressed), the {@link Metadata}, then the 16 sync bytes;</li>
 *   <li>records: total length (int), key length (int), key bytes, value
 *       bytes;</li>
 *   <li>sync markers: {@code SYNC_ESCAPE} (int) followed by the sync bytes,
 *       emitted before a record once at least {@code SYNC_INTERVAL} bytes
 *       were written since the previous marker.</li>
 * </ul>
 */
public static class Writer implements java.io.Closeable {
  Configuration conf;  // Hadoop configuration
  FSDataOutputStream out;  // destination stream; set to null by close()
  boolean ownOutputStream = true;  // true: close() closes 'out'; false: close() only flushes it
  DataOutputBuffer buffer = new DataOutputBuffer();  // scratch buffer for the record being appended
  Class keyClass;  // exact runtime class every appended key must have
  Class valClass;  // exact runtime class every appended value must have
  private boolean compress;  // whether values are compressed
  CompressionCodec codec = null;  // codec, or null when not compressing
  CompressionOutputStream deflateFilter = null;  // compressing stream that writes into 'buffer'
  DataOutputStream deflateOut = null;  // buffered data stream feeding 'deflateFilter'
  Metadata metadata = null;  // file metadata written into the header
  Compressor compressor = null;  // borrowed from CodecPool in init(), returned in close()
  protected Serializer keySerializer;  // serializes keys (into 'buffer' after init())
  protected Serializer uncompressedValSerializer;  // serializes values without compression
  protected Serializer compressedValSerializer;  // serializes values through 'deflateOut'; null without a codec

  // Insert a globally unique 16-byte value every few entries, so that one
  // can seek into the middle of a file and then synchronize with record
  // starts and ends by scanning for this value.
  long lastSyncPos;  // position of last sync
  byte[] sync;  // MD5 digest (16 bytes) of a fresh UID and the current time
  {
    try {
      MessageDigest digester = MessageDigest.getInstance("MD5");
      long time = System.currentTimeMillis();
      digester.update((new UID()+"@"+time).getBytes());
      sync = digester.digest();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** Implicit constructor: needed for the period of transition!*/
  Writer()
  {}

  /** Create the named file with default settings, no progress reporter and empty metadata. */
  public Writer(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass)
      throws IOException {
    this(fs, conf, name, keyClass, valClass, null, new Metadata());
  }

  /** Create the named file with write-progress reporter, using the file system's defaults. */
  public Writer(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass,
      Progressable progress, Metadata metadata)
      throws IOException {
    this(fs, conf, name, keyClass, valClass,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        fs.getDefaultReplication(), fs.getDefaultBlockSize(),
        progress, metadata);
  }

  /** Create (overwriting) the named file with write-progress reporter and explicit settings. */
  public Writer(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass,
      int bufferSize, short replication, long blockSize,
      Progressable progress, Metadata metadata)
      throws IOException {
    init(name, conf,
        fs.create(name, true, bufferSize, replication, blockSize, progress),
        keyClass, valClass, false, null, metadata);
    initializeFileHeader();
    writeFileHeader();
    finalizeFileHeader();
  }

  /** Write to an arbitrary stream; the caller keeps ownership of {@code out}. */
  Writer(Configuration conf, FSDataOutputStream out,
      Class keyClass, Class valClass, Metadata metadata)
      throws IOException {
    this.ownOutputStream = false;
    init(null, conf, out, keyClass, valClass, false, null, metadata);
    initializeFileHeader();
    writeFileHeader();
    finalizeFileHeader();
  }

  /** Write the initial part of file header: the version magic. */
  void initializeFileHeader()
      throws IOException{
    out.write(VERSION);
  }

  /** Write the final part of file header: the sync bytes, then flush. */
  void finalizeFileHeader()
      throws IOException{
    out.write(sync); // write the sync bytes
    out.flush(); // flush header
  }

  /** Whether values are compressed (as set by init()). */
  boolean isCompressed() { return compress; }

  /** Whether records are block-compressed; never for this class. */
  boolean isBlockCompressed() { return false; }

  /** Marks the stream as owned so close() closes it; returns this writer. */
  Writer ownStream() { this.ownOutputStream = true; return this; }

  /** Write and flush the file header: class names, compression flags, codec, metadata. */
  void writeFileHeader()
      throws IOException {
    Text.writeString(out, keyClass.getName());
    Text.writeString(out, valClass.getName());
    out.writeBoolean(this.isCompressed());
    out.writeBoolean(this.isBlockCompressed());
    if (this.isCompressed()) {
      // The codec class name is only recorded for compressed files.
      Text.writeString(out, (codec.getClass()).getName());
    }
    this.metadata.write(out);
  }

  /**
   * Initialize fields and serializers. Key and uncompressed-value serializers
   * write into {@link #buffer}; with a codec, a pooled compressor and a
   * compressing value serializer are also set up.
   *
   * @throws IOException if no serializer is available for the key or value class
   */
  @SuppressWarnings("unchecked")
  void init(Path name, Configuration conf, FSDataOutputStream out,
      Class keyClass, Class valClass,
      boolean compress, CompressionCodec codec, Metadata metadata)
      throws IOException {
    this.conf = conf;
    this.out = out;
    this.keyClass = keyClass;
    this.valClass = valClass;
    this.compress = compress;
    this.codec = codec;
    this.metadata = metadata;
    SerializationFactory serializationFactory = new SerializationFactory(conf);
    // Depending on the SerializationFactory version, getSerializer may return
    // null for an unsupported class; report that clearly instead of an NPE.
    this.keySerializer = serializationFactory.getSerializer(keyClass);
    if (this.keySerializer == null) {
      throw new IOException("Could not find a serializer for the key class: '"
          + keyClass + "'");
    }
    this.keySerializer.open(buffer);
    this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
    if (this.uncompressedValSerializer == null) {
      throw new IOException("Could not find a serializer for the value class: '"
          + valClass + "'");
    }
    this.uncompressedValSerializer.open(buffer);
    if (this.codec != null) {
      ReflectionUtils.setConf(this.codec, this.conf);
      this.compressor = CodecPool.getCompressor(this.codec);
      this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
      this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
      this.compressedValSerializer = serializationFactory.getSerializer(valClass);
      this.compressedValSerializer.open(deflateOut);
    }
  }

  /** Returns the class of keys in this file. */
  public Class getKeyClass() { return keyClass; }

  /** Returns the class of values in this file. */
  public Class getValueClass() { return valClass; }

  /** Returns the compression codec of data in this file. */
  public CompressionCodec getCompressionCodec() { return codec; }

  /** create a sync point, unless one was just written at the current position */
  public void sync() throws IOException {
    if (sync != null && lastSyncPos != out.getPos()) {
      out.writeInt(SYNC_ESCAPE); // mark the start of the sync
      out.write(sync); // write sync
      lastSyncPos = out.getPos(); // update lastSyncPos
    }
  }

  /** flush all currently written data to the file system */
  public void syncFs() throws IOException {
    if (out != null) {
      out.sync(); // flush contents to file system
    }
  }

  /** Returns the configuration of this file. */
  Configuration getConf() { return conf; }

  /**
   * Close the file: close the serializers, return the compressor to the
   * pool, and close (if owned) or flush the output stream.
   */
  public synchronized void close() throws IOException {
    try {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }
    } finally {
      // Release the pooled compressor and the stream even if a serializer
      // failed to close; otherwise both would leak.
      CodecPool.returnCompressor(compressor);
      compressor = null;
      if (out != null) {
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }
  }

  /** Writes a sync marker once SYNC_INTERVAL bytes were written since the last one. */
  synchronized void checkAndWriteSync() throws IOException {
    if (sync != null &&
        out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
      sync();
    }
  }

  /** Append a key/value pair. */
  public synchronized void append(Writable key, Writable val)
      throws IOException {
    append((Object) key, (Object) val);
  }

  /**
   * Append a key/value pair.
   *
   * @throws IOException if the key or value is not exactly of the declared class
   */
  @SuppressWarnings("unchecked")
  public synchronized void append(Object key, Object val)
      throws IOException {
    if (key.getClass() != keyClass)
      throw new IOException("wrong key class: "+key.getClass().getName()
          +" is not "+keyClass);
    if (val.getClass() != valClass)
      throw new IOException("wrong value class: "+val.getClass().getName()
          +" is not "+valClass);

    buffer.reset();

    // Append the 'key'
    keySerializer.serialize(key);
    int keyLength = buffer.getLength();
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + key);

    // Append the 'value'
    if (compress) {
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();
    } else {
      uncompressedValSerializer.serialize(val);
    }

    // Write the record out
    checkAndWriteSync(); // sync
    out.writeInt(buffer.getLength()); // total record length
    out.writeInt(keyLength); // key portion length
    out.write(buffer.getData(), 0, buffer.getLength()); // data
  }

  /** Append a pre-serialized key and a raw value (written uncompressed). */
  public synchronized void appendRaw(byte[] keyData, int keyOffset,
      int keyLength, ValueBytes val) throws IOException {
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + keyLength);

    int valLength = val.getSize();

    checkAndWriteSync();

    out.writeInt(keyLength+valLength); // total record length
    out.writeInt(keyLength); // key portion length
    out.write(keyData, keyOffset, keyLength); // key
    val.writeUncompressedBytes(out); // value
  }

  /** Returns the current length of the output file.
   *
   * <p>This always returns a synchronized position. In other words,
   * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
   * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However
   * the key may be earlier in the file than key last written when this
   * method was called (e.g., with block-compression, it may be the first key
   * in the block that was being written when this method was called).
   */
  public synchronized long getLength() throws IOException {
    return out.getPos();
  }

} // class Writer
/**
 * Writes key/compressed-value pairs to a sequence-format file: keys are
 * stored as-is and each value is compressed on its own with the codec.
 */
static class RecordCompressWriter extends Writer {

  /** Create the named file with the file system's defaults and empty metadata. */
  public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass, CompressionCodec codec)
      throws IOException {
    // Delegate to the file-creating constructor so this writer owns (and
    // close() closes) the stream it creates. The arbitrary-stream
    // constructor marks the stream as caller-owned (ownOutputStream = false),
    // which would leave close() merely flushing a file nobody else can close.
    this(fs, conf, name, keyClass, valClass, codec, null, new Metadata());
  }

  /** Create the named file with write-progress reporter, using the file system's defaults. */
  public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass, CompressionCodec codec,
      Progressable progress, Metadata metadata)
      throws IOException {
    this(fs, conf, name, keyClass, valClass,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
        progress, metadata);
  }

  /** Create (overwriting) the named file with write-progress reporter and explicit settings. */
  public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass,
      int bufferSize, short replication, long blockSize,
      CompressionCodec codec,
      Progressable progress, Metadata metadata)
      throws IOException {
    super.init(name, conf,
        fs.create(name, true, bufferSize, replication, blockSize, progress),
        keyClass, valClass, true, codec, metadata);
    initializeFileHeader();
    writeFileHeader();
    finalizeFileHeader();
  }

  /** Create the named file with write-progress reporter and empty metadata. */
  public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
      Class keyClass, Class valClass, CompressionCodec codec,
      Progressable progress)
      throws IOException {
    this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
  }

  /** Write to an arbitrary stream; the caller keeps ownership of {@code out}. */
  RecordCompressWriter(Configuration conf, FSDataOutputStream out,
      Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
      throws IOException {
    this.ownOutputStream = false;
    super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
    initializeFileHeader();
    writeFileHeader();
    finalizeFileHeader();
  }

  /** Values are always compressed. */
  boolean isCompressed() { return true; }

  /** Compression is per record, not per block. */
  boolean isBlockCompressed() { return false; }

  /**
   * Append a key/value pair; the value is compressed individually.
   *
   * @throws IOException if the key or value is not exactly of the declared class
   */
  @SuppressWarnings("unchecked")
  public synchronized void append(Object key, Object val)
      throws IOException {
    if (key.getClass() != keyClass)
      throw new IOException("wrong key class: "+key.getClass().getName()
          +" is not "+keyClass);
    if (val.getClass() != valClass)
      throw new IOException("wrong value class: "+val.getClass().getName()
          +" is not "+valClass);

    buffer.reset();

    // Append the 'key'
    keySerializer.serialize(key);
    int keyLength = buffer.getLength();
    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + key);

    // Compress 'value' and append it (deflateFilter writes into 'buffer')
    deflateFilter.resetState();
    compressedValSerializer.serialize(val);
    deflateOut.flush();
    deflateFilter.finish();

    // Write the record out
    checkAndWriteSync(); // sync
    out.writeInt(buffer.getLength()); // total record length
    out.writeInt(keyLength); // key portion length
    out.write(buffer.getData(), 0, buffer.getLength()); // data
  }

  /** Append a pre-serialized key and an already-compressed value. */
  public synchronized void appendRaw(byte[] keyData, int keyOffset,
      int keyLength, ValueBytes val) throws IOException {

    if (keyLength < 0)
      throw new IOException("negative length keys not allowed: " + keyLength);

    int valLength = val.getSize();

    checkAndWriteSync(); // sync
    out.writeInt(keyLength+valLength); // total record length
    out.writeInt(keyLength); // key portion length
    out.write(keyData, keyOffset, keyLength); // 'key' data
    val.writeCompressedBytes(out); // 'value' data
  }

} // RecordCompressionWriter
/** Write compressed key/value blocks to a sequence-format file. */
/*BlockCompressWriter继承自writer,是对压缩的key和value操作的*/
static class BlockCompressWriter extends Writer {
private int noBufferedRecords = 0;// records buffered since the last block was written; sync() writes them out and resets it to 0
private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();// uncompressed VInt key lengths of the buffered records; compressed by writeBuffer() in sync()
private DataOutputBuffer keyBuffer = new DataOutputBuffer();// uncompressed serialized keys (keySerializer is re-opened on it by init(int)); compressed in sync()
private DataOutputBuffer valLenBuffer = new DataOutputBuffer();// presumably the VInt value lengths, mirroring keyLenBuffer (filled by code not visible here); compressed in sync()
private DataOutputBuffer valBuffer = new DataOutputBuffer();// uncompressed serialized values (uncompressedValSerializer is re-opened on it by init(int)); compressed in sync()
private int compressionBlockSize;// set by init(int) from "io.seqfile.compress.blocksize"; NOTE(review): presumably the buffered-size threshold that triggers sync() in append — confirm
/**
 * Create the named file using the file system's default buffer size,
 * replication and block size, with no progress reporter and empty metadata.
 */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, CompressionCodec codec)
    throws IOException {
  this(fs, conf, name,
      keyClass, valClass,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      fs.getDefaultReplication(),
      fs.getDefaultBlockSize(),
      codec,
      null,
      new Metadata());
}
/**
 * Create the named file with a write-progress reporter and the given
 * metadata, using the file system's default buffer size, replication and
 * block size.
 */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, CompressionCodec codec,
    Progressable progress, Metadata metadata)
    throws IOException {
  this(fs, conf, name,
      keyClass, valClass,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      fs.getDefaultReplication(),
      fs.getDefaultBlockSize(),
      codec,
      progress,
      metadata);
}
/** Create the named file with write-progress reporter. */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
int bufferSize, short replication, long blockSize,
CompressionCodec codec,
Progressable progress, Metadata metadata)
throws IOException {
super.init(name, conf,
fs.create(name, true, bufferSize, replication, blockSize, progress),
keyClass, valClass, true, codec, metadata);
init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
/** Create the named file with a write-progress reporter and empty metadata. */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, CompressionCodec codec,
    Progressable progress) throws IOException {
  this(fs, conf, name, keyClass, valClass, codec, progress,
      new Metadata());
}
/** Write to an arbitrary stream using a specified buffer size. */
BlockCompressWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
throws IOException {
this.ownOutputStream = false;
super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
/** Block-compressed files always compress their data. */
boolean isCompressed() {
  return true;
}

/** Records are grouped and compressed block by block. */
boolean isBlockCompressed() {
  return true;
}
/**
 * Records the compression block size and re-targets the key and
 * uncompressed-value serializers from the shared record buffer (where
 * Writer.init opened them) to the per-block key and value buffers.
 */
void init(int compressionBlockSize) throws IOException {
  this.compressionBlockSize = compressionBlockSize;
  this.keySerializer.close();
  this.keySerializer.open(this.keyBuffer);
  this.uncompressedValSerializer.close();
  this.uncompressedValSerializer.open(this.valBuffer);
}
/**
 * Compresses one in-memory buffer and appends it to {@code out}.
 *
 * <p>The result is written as a vint length followed by that many
 * compressed bytes.
 *
 * <p>NOTE(review): this relies on {@code deflateOut}/{@code deflateFilter}
 * writing into {@code buffer} (wired up in {@code Writer.init}, not shown).
 */
private synchronized void writeBuffer(DataOutputBuffer source)
    throws IOException {
  // Fresh codec state and an empty output buffer for this section.
  deflateFilter.resetState();
  buffer.reset();

  byte[] raw = source.getData();
  int rawLength = source.getLength();
  deflateOut.write(raw, 0, rawLength);
  deflateOut.flush();
  deflateFilter.finish();

  int compressedLength = buffer.getLength();
  WritableUtils.writeVInt(out, compressedLength);
  out.write(buffer.getData(), 0, compressedLength);
}
/**
 * Emits the currently buffered block, if any, and resets the buffers.
 *
 * <p>The output order is:
 * <ol>
 *   <li>the superclass {@code sync()};</li>
 *   <li>the record count as a vint;</li>
 *   <li>the compressed key-length, key, value-length and value buffers, in
 *       that order.</li>
 * </ol>
 * The stream is then flushed and all four buffers are cleared. Does nothing
 * when no records are buffered.
 */
public synchronized void sync() throws IOException {
  if (noBufferedRecords <= 0) {
    return; // empty block: nothing to write
  }
  super.sync();
  WritableUtils.writeVInt(out, noBufferedRecords);

  DataOutputBuffer[] sections = {keyLenBuffer, keyBuffer, valLenBuffer, valBuffer};
  for (DataOutputBuffer section : sections) {
    writeBuffer(section);
  }
  out.flush();

  for (DataOutputBuffer section : sections) {
    section.reset();
  }
  noBufferedRecords = 0;
}
/**
 * Closes the writer.
 *
 * <p>While {@code out} is still set, any buffered block is first written with
 * {@link #sync()}. The superclass then performs the actual close.
 */
public synchronized void close() throws IOException {
  boolean streamAvailable = (out != null);
  if (streamAvailable) {
    sync();
  }
  super.close();
}
/**
 * Serializes a key/value pair into the current block.
 *
 * <p>The pair's lengths go to the length buffers. When the buffered key plus
 * value bytes reach {@code compressionBlockSize}, the block is written out via
 * {@link #sync()}.
 *
 * @throws IOException if the key or value class is not exactly the declared
 *         class, or the serialized key length comes out negative
 */
@SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
    throws IOException {
  if (key.getClass() != keyClass) {
    throw new IOException("wrong key class: "+key+" is not "+keyClass);
  }
  if (val.getClass() != valClass) {
    throw new IOException("wrong value class: "+val+" is not "+valClass);
  }

  // Key: serialize, measure the growth of keyBuffer, record the length.
  int keyStart = keyBuffer.getLength();
  keySerializer.serialize(key);
  int serializedKeyLength = keyBuffer.getLength() - keyStart;
  if (serializedKeyLength < 0) {
    throw new IOException("negative length keys not allowed: " + key);
  }
  WritableUtils.writeVInt(keyLenBuffer, serializedKeyLength);

  // Value: same, into valBuffer/valLenBuffer.
  int valStart = valBuffer.getLength();
  uncompressedValSerializer.serialize(val);
  WritableUtils.writeVInt(valLenBuffer, valBuffer.getLength() - valStart);

  noBufferedRecords += 1;

  if (keyBuffer.getLength() + valBuffer.getLength() >= compressionBlockSize) {
    sync();
  }
}
/**
 * Adds an already-serialized key and a raw value to the current block.
 *
 * <p>The value is written in its uncompressed form via
 * {@code writeUncompressedBytes}. When the buffered key plus value bytes reach
 * {@code compressionBlockSize}, the block is written out via {@link #sync()}.
 *
 * @throws IOException if {@code keyLength} is negative
 */
public synchronized void appendRaw(byte[] keyData, int keyOffset,
    int keyLength, ValueBytes val) throws IOException {
  if (keyLength < 0) {
    throw new IOException("negative length keys not allowed");
  }
  int rawValueSize = val.getSize();

  WritableUtils.writeVInt(keyLenBuffer, keyLength);
  keyBuffer.write(keyData, keyOffset, keyLength);

  WritableUtils.writeVInt(valLenBuffer, rawValueSize);
  val.writeUncompressedBytes(valBuffer);

  noBufferedRecords += 1;

  int bufferedBytes = keyBuffer.getLength() + valBuffer.getLength();
  if (bufferedBytes >= compressionBlockSize) {
    sync();
  }
}
} // BlockCompressionWriter
/** Reads key/value pairs from a sequence-format file. */
public static class Reader implements java.io.Closeable {
  private Path file; // path of the file being read
  private FSDataInputStream in; // input stream over the file
  private DataOutputBuffer outBuf = new DataOutputBuffer(); // scratch buffer for non-block-compressed records
  private byte version; // format version byte read from the header
  private String keyClassName;
  private String valClassName;
  private Class keyClass; // resolved lazily by getKeyClass()
  private Class valClass; // resolved lazily by getValueClass()
  private CompressionCodec codec = null;
  private Metadata metadata = null;
  private byte[] sync = new byte[SYNC_HASH_SIZE]; // sync marker read from the header
  private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; // scratch space for comparing sync markers
  private boolean syncSeen;
  private long end; // byte position at which reading stops
  private int keyLength;
  private int recordLength;
  private boolean decompress; // header "compressed" flag
  private boolean blockCompressed; // header "block-compressed" flag
  private Configuration conf;
  private int noBufferedRecords = 0;
  private boolean lazyDecompress = true;
  private boolean valuesDecompressed = true;
  private int noBufferedKeys = 0;
  private int noBufferedValues = 0;
  private DataInputBuffer keyLenBuffer = null;
  private CompressionInputStream keyLenInFilter = null;
  private DataInputStream keyLenIn = null;
  private Decompressor keyLenDecompressor = null;
  private DataInputBuffer keyBuffer = null;
  private CompressionInputStream keyInFilter = null;
  private DataInputStream keyIn = null;
  private Decompressor keyDecompressor = null;
  private DataInputBuffer valLenBuffer = null;
  private CompressionInputStream valLenInFilter = null;
  private DataInputStream valLenIn = null;
  private Decompressor valLenDecompressor = null;
  private DataInputBuffer valBuffer = null;
  private CompressionInputStream valInFilter = null;
  private DataInputStream valIn = null;
  private Decompressor valDecompressor = null;
  private Deserializer keyDeserializer;
  private Deserializer valDeserializer;

  /** Open the named file. */
  public Reader(FileSystem fs, Path file, Configuration conf)
    throws IOException {
    this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
  }

  private Reader(FileSystem fs, Path file, int bufferSize,
                 Configuration conf, boolean tempReader) throws IOException {
    this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
  }

  /**
   * Opens {@code file}, seeks to {@code start} and reads the header.
   *
   * <p>If seeking or header parsing fails, the opened stream and any pooled
   * decompressors already taken are released before the exception
   * propagates. The caller never receives a Reader it could close, so nothing
   * else would release them.
   */
  private Reader(FileSystem fs, Path file, int bufferSize, long start,
                 long length, Configuration conf, boolean tempReader)
    throws IOException {
    this.file = file;
    this.in = openFile(fs, file, bufferSize, length);
    this.conf = conf;
    boolean initialized = false;
    try {
      seek(start);
      this.end = in.getPos() + length;
      init(tempReader);
      initialized = true;
    } finally {
      if (!initialized) {
        releaseAfterFailedOpen();
      }
    }
  }

  /**
   * Cleanup for a constructor that failed after the stream was opened.
   *
   * <p>Returns pooled decompressors (null-safe, as {@link #close()} also
   * relies on) and closes the stream. A close failure is only logged, so the
   * original exception is the one the caller sees.
   */
  private void releaseAfterFailedOpen() {
    CodecPool.returnDecompressor(keyLenDecompressor);
    CodecPool.returnDecompressor(keyDecompressor);
    CodecPool.returnDecompressor(valLenDecompressor);
    CodecPool.returnDecompressor(valDecompressor);
    keyLenDecompressor = keyDecompressor = null;
    valLenDecompressor = valDecompressor = null;
    try {
      in.close();
    } catch (IOException closeFailure) {
      LOG.warn("Failed to close " + file + " after a failed open", closeFailure);
    }
  }

  /**
   * Override this method to specialize the type of
   * {@link FSDataInputStream} returned.
   */
  protected FSDataInputStream openFile(FileSystem fs, Path file,
      int bufferSize, long length) throws IOException {
    return fs.open(file, bufferSize);
  }

  /**
   * Initialize the {@link Reader} by parsing the SequenceFile header.
   *
   * @param tempReader <code>true</code> if we are constructing a temporary
   *                   reader (used by {@code Sorter.cloneFileAttributes}),
   *                   in which case only the header is parsed and the
   *                   buffers, decompressors and deserializers are not set up;
   *                   <code>false</code> otherwise.
   * @throws IOException if the file is not a SequenceFile, its version is
   *         newer than supported, or reading fails
   */
  private void init(boolean tempReader) throws IOException {
    byte[] versionBlock = new byte[VERSION.length];
    in.readFully(versionBlock);

    if ((versionBlock[0] != VERSION[0]) ||
        (versionBlock[1] != VERSION[1]) ||
        (versionBlock[2] != VERSION[2]))
      throw new IOException(file + " not a SequenceFile");

    // Set 'version'
    version = versionBlock[3];
    if (version > VERSION[3])
      throw new VersionMismatchException(VERSION[3], version);

    if (version < BLOCK_COMPRESS_VERSION) {
      UTF8 className = new UTF8();

      className.readFields(in);
      keyClassName = className.toStringChecked(); // key class name

      className.readFields(in);
      valClassName = className.toStringChecked(); // val class name
    } else {
      keyClassName = Text.readString(in);
      valClassName = Text.readString(in);
    }

    if (version > 2) {                          // if version > 2
      this.decompress = in.readBoolean();       // is compressed?
    } else {
      decompress = false;
    }

    if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
      this.blockCompressed = in.readBoolean();  // is block-compressed?
    } else {
      blockCompressed = false;
    }

    // if version >= 5
    // setup the compression codec
    if (decompress) {
      if (version >= CUSTOM_COMPRESS_VERSION) {
        String codecClassname = Text.readString(in);
        try {
          Class<? extends CompressionCodec> codecClass
            = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
          this.codec = ReflectionUtils.newInstance(codecClass, conf);
        } catch (ClassNotFoundException cnfe) {
          throw new IllegalArgumentException("Unknown codec: " +
                                             codecClassname, cnfe);
        }
      } else {
        codec = new DefaultCodec();
        ((Configurable)codec).setConf(conf);
      }
    }

    this.metadata = new Metadata();
    if (version >= VERSION_WITH_METADATA) {    // if version >= 6
      this.metadata.readFields(in);
    }

    if (version > 1) {                          // if version > 1
      in.readFully(sync);                       // read sync bytes
    }

    // Initialize... *not* if this we are constructing a temporary Reader
    if (!tempReader) {
      valBuffer = new DataInputBuffer();
      if (decompress) {
        valDecompressor = CodecPool.getDecompressor(codec);
        valInFilter = codec.createInputStream(valBuffer, valDecompressor);
        valIn = new DataInputStream(valInFilter);
      } else {
        valIn = valBuffer;
      }

      if (blockCompressed) {
        keyLenBuffer = new DataInputBuffer();
        keyBuffer = new DataInputBuffer();
        valLenBuffer = new DataInputBuffer();

        keyLenDecompressor = CodecPool.getDecompressor(codec);
        keyLenInFilter = codec.createInputStream(keyLenBuffer,
                                                 keyLenDecompressor);
        keyLenIn = new DataInputStream(keyLenInFilter);

        keyDecompressor = CodecPool.getDecompressor(codec);
        keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
        keyIn = new DataInputStream(keyInFilter);

        valLenDecompressor = CodecPool.getDecompressor(codec);
        valLenInFilter = codec.createInputStream(valLenBuffer,
                                                 valLenDecompressor);
        valLenIn = new DataInputStream(valLenInFilter);
      }

      SerializationFactory serializationFactory =
        new SerializationFactory(conf);
      this.keyDeserializer =
        getDeserializer(serializationFactory, getKeyClass());
      // Non-block files hold key and value together in valBuffer;
      // block-compressed files keep keys in their own stream.
      if (!blockCompressed) {
        this.keyDeserializer.open(valBuffer);
      } else {
        this.keyDeserializer.open(keyIn);
      }
      this.valDeserializer =
        getDeserializer(serializationFactory, getValueClass());
      this.valDeserializer.open(valIn);
    }
  }

  @SuppressWarnings("unchecked")
  private Deserializer getDeserializer(SerializationFactory sf, Class c) {
    return sf.getDeserializer(c);
  }

  /**
   * Close the file.
   *
   * <p>Returns the pooled decompressors, closes the deserializers (if they
   * were created), then closes the input stream.
   */
  public synchronized void close() throws IOException {
    // Return the decompressors to the pool
    CodecPool.returnDecompressor(keyLenDecompressor);
    CodecPool.returnDecompressor(keyDecompressor);
    CodecPool.returnDecompressor(valLenDecompressor);
    CodecPool.returnDecompressor(valDecompressor);
    keyLenDecompressor = keyDecompressor = null;
    valLenDecompressor = valDecompressor = null;

    if (keyDeserializer != null) {
      keyDeserializer.close();
    }
    if (valDeserializer != null) {
      valDeserializer.close();
    }

    // Close the input-stream
    in.close();
  }

  /** Returns the name of the key class, as recorded in the header. */
  public String getKeyClassName() {
    return keyClassName;
  }

  /** Returns the class of keys in this file (resolved and cached on first call). */
  public synchronized Class<?> getKeyClass() {
    if (null == keyClass) {
      try {
        keyClass = WritableName.getClass(getKeyClassName(), conf);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    return keyClass;
  }

  /** Returns the name of the value class, as recorded in the header. */
  public String getValueClassName() {
    return valClassName;
  }

  /** Returns the class of values in this file (resolved and cached on first call). */
  public synchronized Class<?> getValueClass() {
    if (null == valClass) {
      try {
        valClass = WritableName.getClass(getValueClassName(), conf);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    return valClass;
  }

  /** Returns true if values are compressed. */
  public boolean isCompressed() { return decompress; }

  /** Returns true if records are block-compressed. */
  public boolean isBlockCompressed() { return blockCompressed; }

  /** Returns the compression codec of data in this file (null if uncompressed). */
  public CompressionCodec getCompressionCodec() { return codec; }

  /** Returns the metadata object of the file */
  public Metadata getMetadata() {
    return this.metadata;
  }

  /** Returns the configuration used for this file. */
  Configuration getConf() { return conf; }

  /**
   * Reads one compressed section (vint length + bytes) from {@code in} into
   * {@code buffer}, then resets {@code filter}'s decompression state.
   */
  private synchronized void readBuffer(DataInputBuffer buffer,
                                       CompressionInputStream filter) throws IOException {
    // Read data into a temporary buffer
    DataOutputBuffer dataBuffer = new DataOutputBuffer();

    try {
      int dataBufferLength = WritableUtils.readVInt(in);
      dataBuffer.write(in, dataBufferLength);

      // Set up 'buffer' connected to the input-stream
      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
    } finally {
      dataBuffer.close();
    }

    // Reset the codec
    filter.resetState();
  }

  /** Read the next 'compressed' block */
  private synchronized void readBlock() throws IOException {
    // Check if we need to throw away a whole block of
    // 'values' due to 'lazy decompression': skip the value-length and value
    // sections of the previous block, which were never read.
    if (lazyDecompress && !valuesDecompressed) {
      in.seek(WritableUtils.readVInt(in)+in.getPos());
      in.seek(WritableUtils.readVInt(in)+in.getPos());
    }

    // Reset internal states
    noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
    valuesDecompressed = false;

    //Process sync
    if (sync != null) {
      in.readInt();
      in.readFully(syncCheck);                // read syncCheck
      if (!Arrays.equals(sync, syncCheck))    // check it
        throw new IOException("File is corrupt!");
    }
    syncSeen = true;

    // Read number of records in this block
    noBufferedRecords = WritableUtils.readVInt(in);

    // Read key lengths and keys
    readBuffer(keyLenBuffer, keyLenInFilter);
    readBuffer(keyBuffer, keyInFilter);
    noBufferedKeys = noBufferedRecords;

    // Read value lengths and values
    if (!lazyDecompress) {
      readBuffer(valLenBuffer, valLenInFilter);
      readBuffer(valBuffer, valInFilter);
      noBufferedValues = noBufferedRecords;
      valuesDecompressed = true;
    }
  }

  /**
   * Position valLenIn/valIn to the 'value'
   * corresponding to the 'current' key
   */
  private synchronized void seekToCurrentValue() throws IOException {
    if (!blockCompressed) {
      if (decompress) {
        valInFilter.resetState();
      }
      valBuffer.reset();
    } else {
      // Check if this is the first value in the 'block' to be read
      if (lazyDecompress && !valuesDecompressed) {
        // Read the value lengths and values
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }

      // Calculate the no. of bytes to skip
      // Note: 'current' key has already been read!
      int skipValBytes = 0;
      int currentKey = noBufferedKeys + 1;
      for (int i=noBufferedValues; i > currentKey; --i) {
        skipValBytes += WritableUtils.readVInt(valLenIn);
        --noBufferedValues;
      }

      // Skip to the 'val' corresponding to 'current' key
      if (skipValBytes > 0) {
        if (valIn.skipBytes(skipValBytes) != skipValBytes) {
          throw new IOException("Failed to seek to " + currentKey +
                                "(th) value!");
        }
      }
    }
  }

  /**
   * Get the 'value' corresponding to the last read 'key', reading it with
   * {@link Writable#readFields}.
   * @param val : The 'value' to be read.
   * @throws IOException
   */
  public synchronized void getCurrentValue(Writable val)
    throws IOException {
    if (val instanceof Configurable) {
      ((Configurable) val).setConf(this.conf);
    }

    // Position stream to 'current' value
    seekToCurrentValue();

    if (!blockCompressed) {
      val.readFields(valIn);

      // NOTE(review): read() returns 0..255 or -1, so a trailing 0x00 byte is
      // not detected as leftover data.
      if (valIn.read() > 0) {
        LOG.info("available bytes: " + valIn.available());
        throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                              + " bytes, should read " +
                              (valBuffer.getLength()-keyLength));
      }
    } else {
      // Get the value
      int valLength = WritableUtils.readVInt(valLenIn);
      val.readFields(valIn);

      // Read another compressed 'value'
      --noBufferedValues;

      // Sanity check (NOTE(review): the condition tests for a negative length)
      if (valLength < 0) {
        LOG.debug(val + " is a zero-length value");
      }
    }
  }

  /**
   * Get the 'value' corresponding to the last read 'key', using the
   * configured value {@link Deserializer}.
   * @param val : The 'value' to be read.
   * @return the deserialized value (may be a different instance than {@code val})
   * @throws IOException
   */
  public synchronized Object getCurrentValue(Object val)
    throws IOException {
    if (val instanceof Configurable) {
      ((Configurable) val).setConf(this.conf);
    }

    // Position stream to 'current' value
    seekToCurrentValue();

    if (!blockCompressed) {
      val = deserializeValue(val);

      if (valIn.read() > 0) {
        LOG.info("available bytes: " + valIn.available());
        throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                              + " bytes, should read " +
                              (valBuffer.getLength()-keyLength));
      }
    } else {
      // Get the value
      int valLength = WritableUtils.readVInt(valLenIn);
      val = deserializeValue(val);

      // Read another compressed 'value'
      --noBufferedValues;

      // Sanity check (NOTE(review): the condition tests for a negative length)
      if (valLength < 0) {
        LOG.debug(val + " is a zero-length value");
      }
    }
    return val;
  }

  /** Deserializes the next value from {@code valIn} via the value deserializer. */
  @SuppressWarnings("unchecked")
  private Object deserializeValue(Object val) throws IOException {
    return valDeserializer.deserialize(val);
  }

  /** Read the next key in the file into <code>key</code>, skipping its
   * value.  True if another entry exists, and false at end of file. */
  public synchronized boolean next(Writable key) throws IOException {
    if (key.getClass() != getKeyClass())
      throw new IOException("wrong key class: "+key.getClass().getName()
                            +" is not "+keyClass);

    if (!blockCompressed) {
      outBuf.reset();

      keyLength = next(outBuf);
      if (keyLength < 0)
        return false;

      valBuffer.reset(outBuf.getData(), outBuf.getLength());

      key.readFields(valBuffer);
      valBuffer.mark(0);
      if (valBuffer.getPosition() != keyLength)
        throw new IOException(key + " read " + valBuffer.getPosition()
                              + " bytes, should read " + keyLength);
    } else {
      //Reset syncSeen
      syncSeen = false;

      if (noBufferedKeys == 0) {
        try {
          readBlock();
        } catch (EOFException eof) {
          return false;
        }
      }

      int keyLength = WritableUtils.readVInt(keyLenIn);

      // Sanity check
      if (keyLength < 0) {
        return false;
      }

      //Read another compressed 'key'
      key.readFields(keyIn);
      --noBufferedKeys;
    }

    return true;
  }

  /** Read the next key/value pair in the file into <code>key</code> and
   * <code>val</code>.  Returns true if such a pair exists and false when at
   * end of file */
  public synchronized boolean next(Writable key, Writable val)
    throws IOException {
    if (val.getClass() != getValueClass())
      throw new IOException("wrong value class: "+val+" is not "+valClass);

    boolean more = next(key);

    if (more) {
      getCurrentValue(val);
    }

    return more;
  }

  /**
   * Read and return the next record length, potentially skipping over
   * a sync block.
   * @return the length of the next record or -1 if there is no next record
   * @throws IOException
   */
  private synchronized int readRecordLength() throws IOException {
    if (in.getPos() >= end) {
      return -1;
    }
    int length = in.readInt();
    if (version > 1 && sync != null &&
        length == SYNC_ESCAPE) {              // process a sync entry
      in.readFully(syncCheck);                // read syncCheck
      if (!Arrays.equals(sync, syncCheck))    // check it
        throw new IOException("File is corrupt!");
      syncSeen = true;
      if (in.getPos() >= end) {
        return -1;
      }
      length = in.readInt();                  // re-read length
    } else {
      syncSeen = false;
    }

    return length;
  }

  /**
   * Read the next key/value pair in the file into <code>buffer</code>.
   * Returns the length of the key read, or -1 if at end of file.  The length
   * of the value may be computed by calling buffer.getLength() before and
   * after calls to this method.
   *
   * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
   */
  public synchronized int next(DataOutputBuffer buffer) throws IOException {
    // Unsupported for block-compressed sequence files
    if (blockCompressed) {
      throw new IOException("Unsupported call for block-compressed" +
                            " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
    }
    try {
      int length = readRecordLength();
      if (length == -1) {
        return -1;
      }
      int keyLength = in.readInt();
      buffer.write(in, length);
      return keyLength;
    } catch (ChecksumException e) {             // checksum failure
      handleChecksumException(e);
      return next(buffer);
    }
  }

  /**
   * Creates a {@link ValueBytes} holder suitable for {@link #nextRaw} on this
   * file: compressed bytes for record-compressed files, uncompressed bytes
   * otherwise.
   */
  public ValueBytes createValueBytes() {
    ValueBytes val = null;
    if (!decompress || blockCompressed) {
      val = new UncompressedBytes();
    } else {
      val = new CompressedBytes(codec);
    }
    return val;
  }

  /**
   * Read 'raw' records.
   * @param key - The buffer into which the key is read
   * @param val - The 'raw' value
   * @return Returns the total record length or -1 for end of file
   * @throws IOException
   */
  public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
    throws IOException {
    if (!blockCompressed) {
      int length = readRecordLength();
      if (length == -1) {
        return -1;
      }
      int keyLength = in.readInt();
      int valLength = length - keyLength;
      key.write(in, keyLength);
      if (decompress) {
        CompressedBytes value = (CompressedBytes)val;
        value.reset(in, valLength);
      } else {
        UncompressedBytes value = (UncompressedBytes)val;
        value.reset(in, valLength);
      }

      return length;
    } else {
      //Reset syncSeen
      syncSeen = false;

      // Read 'key'
      if (noBufferedKeys == 0) {
        if (in.getPos() >= end)
          return -1;

        try {
          readBlock();
        } catch (EOFException eof) {
          return -1;
        }
      }
      int keyLength = WritableUtils.readVInt(keyLenIn);
      if (keyLength < 0) {
        throw new IOException("zero length key found!");
      }
      key.write(keyIn, keyLength);
      --noBufferedKeys;

      // Read raw 'value'
      seekToCurrentValue();
      int valLength = WritableUtils.readVInt(valLenIn);
      UncompressedBytes rawValue = (UncompressedBytes)val;
      rawValue.reset(valIn, valLength);
      --noBufferedValues;

      return (keyLength+valLength);
    }
  }

  /**
   * Read 'raw' keys.
   *
   * <p>Synchronized like every other read method, because it mutates the
   * shared {@code recordLength}, {@code keyLength} and
   * {@code noBufferedKeys} state that {@link #nextRawValue} depends on.
   *
   * @param key - The buffer into which the key is read
   * @return Returns the key length or -1 for end of file
   * @throws IOException
   */
  public synchronized int nextRawKey(DataOutputBuffer key)
    throws IOException {
    if (!blockCompressed) {
      recordLength = readRecordLength();
      if (recordLength == -1) {
        return -1;
      }
      keyLength = in.readInt();
      key.write(in, keyLength);
      return keyLength;
    } else {
      //Reset syncSeen
      syncSeen = false;

      // Read 'key'
      if (noBufferedKeys == 0) {
        if (in.getPos() >= end)
          return -1;

        try {
          readBlock();
        } catch (EOFException eof) {
          return -1;
        }
      }
      int keyLength = WritableUtils.readVInt(keyLenIn);
      if (keyLength < 0) {
        throw new IOException("zero length key found!");
      }
      key.write(keyIn, keyLength);
      --noBufferedKeys;

      return keyLength;
    }
  }

  /** Read the next key in the file, skipping its
   * value.  Return null at end of file. */
  public synchronized Object next(Object key) throws IOException {
    if (key != null && key.getClass() != getKeyClass()) {
      throw new IOException("wrong key class: "+key.getClass().getName()
                            +" is not "+keyClass);
    }

    if (!blockCompressed) {
      outBuf.reset();

      keyLength = next(outBuf);
      if (keyLength < 0)
        return null;

      valBuffer.reset(outBuf.getData(), outBuf.getLength());

      key = deserializeKey(key);
      valBuffer.mark(0);
      if (valBuffer.getPosition() != keyLength)
        throw new IOException(key + " read " + valBuffer.getPosition()
                              + " bytes, should read " + keyLength);
    } else {
      //Reset syncSeen
      syncSeen = false;

      if (noBufferedKeys == 0) {
        try {
          readBlock();
        } catch (EOFException eof) {
          return null;
        }
      }

      int keyLength = WritableUtils.readVInt(keyLenIn);

      // Sanity check
      if (keyLength < 0) {
        return null;
      }

      //Read another compressed 'key'
      key = deserializeKey(key);
      --noBufferedKeys;
    }

    return key;
  }

  /** Deserializes the next key via the key deserializer. */
  @SuppressWarnings("unchecked")
  private Object deserializeKey(Object key) throws IOException {
    return keyDeserializer.deserialize(key);
  }

  /**
   * Read 'raw' values.
   * @param val - The 'raw' value
   * @return Returns the value length
   * @throws IOException
   */
  public synchronized int nextRawValue(ValueBytes val)
    throws IOException {

    // Position stream to current value
    seekToCurrentValue();

    if (!blockCompressed) {
      int valLength = recordLength - keyLength;
      if (decompress) {
        CompressedBytes value = (CompressedBytes)val;
        value.reset(in, valLength);
      } else {
        UncompressedBytes value = (UncompressedBytes)val;
        value.reset(in, valLength);
      }

      return valLength;
    } else {
      int valLength = WritableUtils.readVInt(valLenIn);
      UncompressedBytes rawValue = (UncompressedBytes)val;
      rawValue.reset(valIn, valLength);
      --noBufferedValues;
      return valLength;
    }

  }

  /**
   * With {@code io.skip.checksum.errors} enabled, logs the failure and syncs
   * past the bad region (by {@code io.bytes.per.checksum}, default 512);
   * otherwise rethrows {@code e}.
   */
  private void handleChecksumException(ChecksumException e)
    throws IOException {
    if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
      LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
      sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
    } else {
      throw e;
    }
  }

  /** Set the current byte position in the input file.
   *
   * <p>The position passed must be a position returned by {@link
   * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
   * position, use {@link SequenceFile.Reader#sync(long)}.
   */
  public synchronized void seek(long position) throws IOException {
    in.seek(position);
    if (blockCompressed) {                      // trigger block read
      noBufferedKeys = 0;
      valuesDecompressed = true;
    }
  }

  /** Seek to the next sync mark past a given position.*/
  public synchronized void sync(long position) throws IOException {
    if (position+SYNC_SIZE >= end) {
      seek(end);
      return;
    }

    try {
      seek(position+4);                         // skip escape
      in.readFully(syncCheck);
      int syncLen = sync.length;
      // syncCheck is used as a ring buffer over the last syncLen bytes read.
      for (int i = 0; in.getPos() < end; i++) {
        int j = 0;
        for (; j < syncLen; j++) {
          if (sync[j] != syncCheck[(i+j)%syncLen])
            break;
        }
        if (j == syncLen) {
          in.seek(in.getPos() - SYNC_SIZE);     // position before sync
          return;
        }
        syncCheck[i%syncLen] = in.readByte();
      }
    } catch (ChecksumException e) {             // checksum failure
      handleChecksumException(e);
    }
  }

  /** Returns true iff the previous call to next passed a sync mark.*/
  public boolean syncSeen() { return syncSeen; }

  /** Return the current byte position in the input file. */
  public synchronized long getPosition() throws IOException {
    return in.getPos();
  }

  /** Returns the name of the file. */
  public String toString() {
    return file.toString();
  }

}
/** Sorts key/value pairs in a sequence-format file.
*
* <p>For best performance, applications should make sure that the {@link
* Writable#readFields(DataInput)} implementation of their keys is
* very efficient. In particular, it should avoid allocating memory.
*/
public static class Sorter {
private RawComparator comparator; // orders serialized keys (see SortPass.SeqFileComparator)
private MergeSort mergeSort; //the implementation of merge sort
private Path[] inFiles; // when merging or sorting
private Path outFile; // final output file; also the name prefix for intermediate segment files
private int memory; // bytes
private int factor; // merged per pass
private FileSystem fs = null; // file system holding the input, output and temporary files
private Class keyClass; // key class written into sorted output files
private Class valClass; // value class written into sorted output files
private Configuration conf;
private Progressable progressable = null; // optional progress reporter; may be null
/**
 * Creates a sorter for files with the named key and value classes.
 *
 * <p>Keys are ordered by the comparator that
 * {@link WritableComparator#get(Class)} returns for {@code keyClass}.
 */
public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
    Class valClass, Configuration conf) {
  this(fs,
      WritableComparator.get(keyClass),
      keyClass,
      valClass,
      conf);
}
/** Sort and merge using an arbitrary {@link RawComparator}. */
public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
Class valClass, Configuration conf) {
this.fs = fs;
this.comparator = comparator;
this.keyClass = keyClass;
this.valClass = valClass;
this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
this.factor = conf.getInt("io.sort.factor", 100);
this.conf = conf;
}
/**
 * Sets how many streams are merged in one pass.
 *
 * @param factor merge fan-in
 */
public void setFactor(int factor) {
  this.factor = factor;
}
/**
 * Returns how many streams are merged in one pass.
 *
 * @return the current merge fan-in
 */
public int getFactor() {
  return factor;
}
/**
 * Sets the total buffer memory budget.
 *
 * @param memory budget in bytes
 */
public void setMemory(int memory) {
  this.memory = memory;
}
/**
 * Returns the total buffer memory budget.
 *
 * @return budget in bytes
 */
public int getMemory() {
  return memory;
}
/**
 * Installs the object notified as sorting makes progress.
 *
 * @param progressable progress sink; may be {@code null} to disable reporting
 */
public void setProgressable(Progressable progressable) { this.progressable = progressable; }
/**
 * Sorts all records of {@code inFiles} into {@code outFile}.
 *
 * <p>When the sort pass produces a single segment, that segment is already
 * {@code outFile}. Otherwise the segments are merged in {@code outFile}'s
 * parent directory.
 *
 * @param inFiles     the files to be sorted
 * @param outFile     the sorted output file; must not exist yet
 * @param deleteInput whether each input file is deleted once fully read
 * @throws IOException if {@code outFile} already exists or on I/O failure
 */
public void sort(Path[] inFiles, Path outFile,
    boolean deleteInput) throws IOException {
  if (fs.exists(outFile)) {
    throw new IOException("already exists: " + outFile);
  }
  this.inFiles = inFiles;
  this.outFile = outFile;

  boolean needsMerge = sortPass(deleteInput) > 1;
  if (needsMerge) {
    mergePass(outFile.getParent());
  }
}
/**
 * Sorts {@code inFiles} and returns an iterator over the sorted records.
 *
 * <p>Sorted output goes to {@code tempDir/all.2}. With several segments, that
 * name serves as the prefix for the segment file ({@code .0}) and its index
 * ({@code .0.index}).
 *
 * @param inFiles     the files to be sorted
 * @param tempDir     directory for temporary files created during the sort
 * @param deleteInput whether each input file is deleted once fully read
 * @return an iterator over the sorted data, or {@code null} if there were no
 *         input files
 * @throws IOException if {@code tempDir/all.2} already exists or on I/O failure
 */
public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
    boolean deleteInput) throws IOException {
  Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
  if (fs.exists(outFile)) {
    throw new IOException("already exists: " + outFile);
  }
  this.inFiles = inFiles;
  this.outFile = outFile;

  int segments = sortPass(deleteInput);
  if (segments == 1) {
    // Single segment: outFile itself holds the sorted data.
    return merge(new Path[]{outFile}, true, tempDir);
  }
  if (segments > 1) {
    // Several segments concatenated in outFile.0, located via the index file.
    return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), tempDir);
  }
  return null;
}
/**
 * Single-file form of {@link #sort(Path[], Path, boolean)}, kept for
 * backwards compatibility. The input file is never deleted.
 *
 * @param inFile  the input file to sort
 * @param outFile the sorted output file
 */
public void sort(Path inFile, Path outFile) throws IOException {
  Path[] singleInput = { inFile };
  sort(singleInput, outFile, false);
}
/**
 * Runs one {@link SortPass} over {@code inFiles}, always closing it.
 *
 * @param deleteInput whether input files are deleted once fully read
 * @return number of sorted segments written
 */
private int sortPass(boolean deleteInput) throws IOException {
  LOG.debug("running sort pass");
  SortPass pass = new SortPass();
  pass.setProgressable(progressable);
  // The comparator reads the pass's key buffers, so it is bound to this pass.
  mergeSort = new MergeSort(pass.new SeqFileComparator());
  try {
    return pass.run(deleteInput);
  } finally {
    pass.close();
  }
}
private class SortPass {
private int memoryLimit = memory/4;
private int recordLimit = 1000000;
private DataOutputBuffer rawKeys = new DataOutputBuffer();
private byte[] rawBuffer;
private int[] keyOffsets = new int[1024];
private int[] pointers = new int[keyOffsets.length];
private int[] pointersCopy = new int[keyOffsets.length];
private int[] keyLengths = new int[keyOffsets.length];
private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
private ArrayList segmentLengths = new ArrayList();
private Reader in = null;
private FSDataOutputStream out = null;
private FSDataOutputStream indexOut = null;
private Path outName;
private Progressable progressable = null;
public int run(boolean deleteInput) throws IOException {
int segments = 0;
int currentFile = 0;
boolean atEof = (currentFile >= inFiles.length);
boolean isCompressed = false;
boolean isBlockCompressed = false;
CompressionCodec codec = null;
segmentLengths.clear();
if (atEof) {
return 0;
}
// Initialize
in = new Reader(fs, inFiles[currentFile], conf);
isCompressed = in.isCompressed();
isBlockCompressed = in.isBlockCompressed();
codec = in.getCompressionCodec();
for (int i=0; i < rawValues.length; ++i) {
rawValues[i] = null;
}
while (!atEof) {
int count = 0;
int bytesProcessed = 0;
rawKeys.reset();
while (!atEof &&
bytesProcessed < memoryLimit && count < recordLimit) {
// Read a record into buffer
// Note: Attempt to re-use 'rawValue' as far as possible
int keyOffset = rawKeys.getLength();
ValueBytes rawValue =
(count == keyOffsets.length || rawValues[count] == null) ?
in.createValueBytes() :
rawValues[count];
int recordLength = in.nextRaw(rawKeys, rawValue);
if (recordLength == -1) {
in.close();
if (deleteInput) {
fs.delete(inFiles[currentFile], true);
}
currentFile += 1;
atEof = currentFile >= inFiles.length;
if (!atEof) {
in = new Reader(fs, inFiles[currentFile], conf);
} else {
in = null;
}
continue;
}
int keyLength = rawKeys.getLength() - keyOffset;
if (count == keyOffsets.length)
grow();
keyOffsets[count] = keyOffset; // update pointers
pointers[count] = count;
keyLengths[count] = keyLength;
rawValues[count] = rawValue;
bytesProcessed += recordLength;
count++;
}
// buffer is full -- sort & flush it
LOG.debug("flushing segment " + segments);
rawBuffer = rawKeys.getData();
sort(count);
// indicate we're making progress
if (progressable != null) {
progressable.progress();
}
flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
segments==0 && atEof);
segments++;
}
return segments;
}
public void close() throws IOException {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
if (indexOut != null) {
indexOut.close();
}
}
private void grow() {
int newLength = keyOffsets.length * 3 / 2;
keyOffsets = grow(keyOffsets, newLength);
pointers = grow(pointers, newLength);
pointersCopy = new int[newLength];
keyLengths = grow(keyLengths, newLength);
rawValues = grow(rawValues, newLength);
}
private int[] grow(int[] old, int newLength) {
int[] result = new int[newLength];
System.arraycopy(old, 0, result, 0, old.length);
return result;
}
/**
 * Returns a new array of {@code newLength} slots whose prefix is a copy of
 * {@code old}. Java initialises new object arrays to null, so the trailing
 * slots are null. The sort pass's read loop treats a null slot as "create a
 * fresh ValueBytes here".
 */
private ValueBytes[] grow(ValueBytes[] old, int newLength) {
  final ValueBytes[] widened = new ValueBytes[newLength];
  final int carried = old.length;
  System.arraycopy(old, 0, widened, 0, carried);
  return widened;
}
/**
 * Writes the first {@code count} records of the in-memory buffer, in sorted
 * order, as one segment of the sort output.
 *
 * <p>The output stream is created lazily on the first flush. When
 * {@code done} is true (the first segment is also the last), records go
 * straight to {@code outFile}. Otherwise they go to {@code outFile + ".0"}.
 * In that case each segment's start offset and length are also appended, as
 * two VLongs, to the ".index" file next to it.
 *
 * @param count number of buffered records to write
 * @param bytesProcessed number of buffered record bytes (not used here)
 * @param isCompressed whether values in the segment are compressed
 * @param isBlockCompressed whether the segment is block compressed
 * @param codec the compression codec to use
 * @param done true when this segment is the whole sort output
 * @throws IOException if creating or writing the output fails
 */
private void flush(int count, int bytesProcessed, boolean isCompressed,
    boolean isBlockCompressed, CompressionCodec codec, boolean done)
    throws IOException {
  if (out == null) {
    outName = done ? outFile : outFile.suffix(".0");
    out = fs.create(outName);
    if (!done) {
      indexOut = fs.create(outName.suffix(".index"));
    }
  }
  long segmentStart = out.getPos();
  Writer writer = createWriter(conf, out, keyClass, valClass,
      isCompressed, isBlockCompressed, codec,
      new Metadata());
  try {
    if (!done) {
      writer.sync = null; // disable sync on temp files
    }
    for (int i = 0; i < count; i++) { // write in sorted order
      int p = pointers[i];
      writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
    }
  } finally {
    // Always release the segment writer, and whatever compression resources
    // it holds, even if appending a record fails. The shared 'out' stream is
    // still used below, so this close must leave 'out' open.
    writer.close();
  }
  if (!done) {
    // Index entry for this segment: start offset, then length.
    WritableUtils.writeVLong(indexOut, segmentStart);
    WritableUtils.writeVLong(indexOut, (out.getPos() - segmentStart));
    indexOut.flush();
  }
}
/**
 * Sorts the first {@code count} record pointers by key. The current order is
 * first snapshotted into {@code pointersCopy}. The merge sort then leaves the
 * sorted order in {@code pointers}, which is the array flush() reads.
 */
private void sort(int count) {
  final int[] snapshot = pointersCopy;
  final int[] ordered = pointers;
  System.arraycopy(ordered, 0, snapshot, 0, count);
  mergeSort.mergeSort(snapshot, ordered, 0, count);
}
/**
 * Orders record indices by comparing their raw key bytes in
 * {@code rawBuffer} with the sorter's raw comparator.
 */
class SeqFileComparator implements Comparator<IntWritable> {
  public int compare(IntWritable left, IntWritable right) {
    final int a = left.get();
    final int b = right.get();
    return comparator.compare(rawBuffer, keyOffsets[a], keyLengths[a],
        rawBuffer, keyOffsets[b], keyLengths[b]);
  }
}
/**
 * Registers the object used to report progress while sorting.
 *
 * @param progressable the progress reporter, or null to disable reporting
 */
public void setProgressable(Progressable progressable) {
  this.progressable = progressable;
}
} // SequenceFile.Sorter.SortPass
/**
 * Iterates over the raw (serialized) keys and values of SequenceFiles.
 * Call {@link #next()} to advance, then {@link #getKey()} and
 * {@link #getValue()} to read the current record.
 */
public static interface RawKeyValueIterator {
/** Gets the current raw key.
 * NOTE(review): the MergeQueue implementation returns a segment's own key
 * buffer, which is reset when that segment advances. Callers should copy the
 * bytes if they need them after the next call to next().
 * @return buffer holding the current key's serialized bytes
 * @throws IOException
 */
DataOutputBuffer getKey() throws IOException;
/** Gets the current raw value.
 * @return the current value's raw bytes
 * @throws IOException
 */
ValueBytes getValue() throws IOException;
/** Advances to the next record and sets up the key and value returned by
 * getKey and getValue.
 * @return true if there is a key/value, false when the input is exhausted
 * @throws IOException
 */
boolean next() throws IOException;
/** Closes the iterator so that the underlying streams can be closed.
 * @throws IOException
 */
void close() throws IOException;
/** Gets the Progress object. Its value is a float in 0.0 - 1.0 giving the
 * fraction of bytes processed by the iterator so far.
 */
Progress getProgress();
}
/**
 * Merges the given segments into a single key-ordered stream.
 *
 * @param segments the segments to merge
 * @param tmpDir the directory to write temporary (intermediate) files into
 * @return an iterator over the merged raw key/value pairs
 * @throws IOException if a segment cannot be read or an intermediate pass fails
 */
public RawKeyValueIterator merge(List<SegmentDescriptor> segments,
    Path tmpDir)
    throws IOException {
  // progressable may be null; the queue then simply skips progress reports
  return new MergeQueue(segments, tmpDir, progressable).merge();
}
/**
 * Merges the contents of the given files using the sorter's current merge
 * factor, capped at the number of inputs.
 *
 * @param inNames the array of path names
 * @param deleteInputs true if the input files should be deleted when no
 *        longer needed
 * @param tmpDir the directory to write temporary files into
 * @return an iterator over the merged raw key/value pairs
 * @throws IOException if reading or an intermediate merge pass fails
 */
public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
    Path tmpDir)
    throws IOException {
  // Never use a fan-in larger than the number of inputs.
  final int fanIn = Math.min(inNames.length, factor);
  return merge(inNames, deleteInputs, fanIn, tmpDir);
}
/**
 * Merges the contents of the given files with an explicit maximum fan-in.
 * Each file becomes one whole-file segment whose sync markers are checked.
 *
 * @param inNames the array of path names
 * @param deleteInputs true if the input files should be deleted when no
 *        longer needed
 * @param factor the maximum merge fan-in. Note that this is also stored as
 *        the sorter's factor and remains in effect for later calls.
 * @param tmpDir the directory to write temporary files into
 * @return an iterator over the merged raw key/value pairs
 * @throws IOException if reading or an intermediate merge pass fails
 */
public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
    int factor, Path tmpDir)
    throws IOException {
  ArrayList<SegmentDescriptor> segments = new ArrayList<SegmentDescriptor>();
  for (Path inName : inNames) {
    SegmentDescriptor segment =
        new SegmentDescriptor(0, fs.getLength(inName), inName);
    segment.preserveInput(!deleteInputs);
    segment.doSync();
    segments.add(segment);
  }
  this.factor = factor;
  MergeQueue queue = new MergeQueue(segments, tmpDir, progressable);
  return queue.merge();
}
/**
 * Merges the contents of the given files. Each file becomes one whole-file
 * segment whose sync markers are checked. The sorter's factor is lowered
 * to the number of inputs if that is smaller, and stays lowered afterwards.
 *
 * @param inNames the array of path names
 * @param tempDir the directory for creating temp files during merge
 * @param deleteInputs true if the input files should be deleted when no
 *        longer needed
 * @return an iterator over the merged raw key/value pairs
 * @throws IOException if reading or an intermediate merge pass fails
 */
public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
    boolean deleteInputs)
    throws IOException {
  // The original comment says outFile is used as a name prefix for the
  // intermediate merge outputs.
  // NOTE(review): the MergeQueue visible here names its temp files from
  // tmpDir instead - confirm whether this assignment is still needed.
  this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
  ArrayList<SegmentDescriptor> segments = new ArrayList<SegmentDescriptor>();
  for (Path inName : inNames) {
    SegmentDescriptor segment =
        new SegmentDescriptor(0, fs.getLength(inName), inName);
    segment.preserveInput(!deleteInputs);
    segment.doSync();
    segments.add(segment);
  }
  factor = Math.min(inNames.length, factor);
  // pass in object to report progress, if present
  return new MergeQueue(segments, tempDir, progressable).merge();
}
/**
 * Creates a Writer for {@code outputFile} that copies the compression
 * settings of {@code inputFile*}: whether it is compressed, whether it is
 * block-compressed, and its codec. The writer uses this sorter's key and
 * value classes and empty metadata.
 *
 * @param inputFile the path of the input file whose attributes should be
 *        cloned
 * @param outputFile the path of the output file
 * @param prog the Progressable to report status during the file write
 * @return Writer
 * @throws IOException if the input cannot be opened or the writer cannot be
 *         created
 */
public Writer cloneFileAttributes(Path inputFile, Path outputFile,
    Progressable prog)
    throws IOException {
  // Open the input only long enough to read its compression attributes.
  Reader probe =
      new Reader(inputFile.getFileSystem(conf), inputFile, 4096, conf, true);
  final boolean compress = probe.isCompressed();
  final boolean blockCompress = probe.isBlockCompressed();
  final CompressionCodec codec = probe.getCompressionCodec();
  probe.close();
  FileSystem targetFs = outputFile.getFileSystem(conf);
  return createWriter(targetFs, conf, outputFile, keyClass, valClass,
      compress, blockCompress, codec, prog, new Metadata());
}
/**
 * Drains {@code records} into {@code writer}, appending each raw key/value
 * pair in iteration order, then calls {@code writer.sync()}. The writer is
 * not closed.
 *
 * @param records the RawKeyValueIterator
 * @param writer the Writer created earlier
 * @throws IOException if reading or writing fails
 */
public void writeFile(RawKeyValueIterator records, Writer writer)
    throws IOException {
  while (records.next()) {
    DataOutputBuffer key = records.getKey();
    writer.appendRaw(key.getData(), 0, key.getLength(), records.getValue());
  }
  writer.sync();
}
/** Merges the provided files into a single output file. The input files are
 * preserved. The output uses the compression settings of {@code inFiles[0]}.
 * @param inFiles the array of input path names; must not be empty
 * @param outFile the final output file; must not already exist
 * @throws IOException if outFile already exists or reading/writing fails
 * @throws IllegalArgumentException if inFiles is empty
 */
public void merge(Path[] inFiles, Path outFile) throws IOException {
  if (fs.exists(outFile)) {
    throw new IOException("already exists: " + outFile);
  }
  if (inFiles.length == 0) {
    // Without this check, inFiles[0] below would throw an
    // ArrayIndexOutOfBoundsException, and only after the merge had already
    // lowered the sorter's factor to 0.
    throw new IllegalArgumentException("no input files to merge into " + outFile);
  }
  RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
  try {
    Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
    try {
      writeFile(r, writer);
    } finally {
      writer.close();
    }
  } finally {
    // On success the iterator is already drained and this is a no-op. On
    // failure it closes any input readers that are still open.
    r.close();
  }
}
/** Called by sort to generate the final merged output. It merges the sorted
 * segments written to {@code outFile + ".0"}, which are located through
 * {@code outFile + ".0.index"}, into {@code outFile}.
 * @param tmpDir directory for intermediate merge files
 * @return always 0
 * @throws IOException if reading or writing fails
 */
private int mergePass(Path tmpDir) throws IOException {
  LOG.debug("running merge pass");
  Writer writer = cloneFileAttributes(
      outFile.suffix(".0"), outFile, null);
  try {
    RawKeyValueIterator r = merge(outFile.suffix(".0"),
        outFile.suffix(".0.index"), tmpDir);
    try {
      writeFile(r, writer);
    } finally {
      // No-op once drained; on failure it releases the segment readers.
      r.close();
    }
  } finally {
    writer.close();
  }
  return 0;
}
/** Used by mergePass to merge the output of the sort.
 * @param inName the name of the input file containing sorted segments
 * @param indexIn the offsets of the sorted segments
 * @param tmpDir the relative directory to store intermediate results in
 * @return RawKeyValueIterator
 * @throws IOException if the index cannot be read or the merge fails
 */
private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir)
    throws IOException {
  // The container reads each segment's (offset, length) from indexIn. It
  // tracks segment cleanups so that inName can be deleted as soon as every
  // segment it contains has been consumed by the merge.
  SegmentContainer container = new SegmentContainer(inName, indexIn);
  List<SegmentDescriptor> segmentList = container.getSegmentList();
  return new MergeQueue(segmentList, tmpDir, progressable).merge();
}
/**
 * Implements the core of the merge logic. It is a heap of segments ordered by
 * their current raw key, exposed as a {@link RawKeyValueIterator}.
 *
 * <p>{@link #merge()} runs intermediate passes as needed. Each pass merges
 * the smallest segments first (ordered by length via
 * {@code sortedSegmentSizes}) into a temp file, until at most
 * {@code factor} segments remain. The heap over those final segments is then
 * returned as the iterator.
 *
 * <p>Each {@link #next()} advances the segment that supplied the previous
 * record and then exposes the smallest current key. {@link #getKey()}
 * returns that segment's own key buffer, so its contents change when the
 * segment advances.
 */
private class MergeQueue extends PriorityQueue
implements RawKeyValueIterator {
  private boolean compress;
  private boolean blockCompress;
  private DataOutputBuffer rawKey = new DataOutputBuffer();
  private ValueBytes rawValue;
  private long totalBytesProcessed;
  private float progPerByte;
  private Progress mergeProgress = new Progress();
  private Path tmpDir;
  private Progressable progress = null; //handle to the progress reporting object
  private SegmentDescriptor minSegment;
  //a TreeMap used to store the segments sorted by size (segment offset and
  //segment path name is used to break ties between segments of same sizes)
  private Map<SegmentDescriptor, Void> sortedSegmentSizes =
    new TreeMap<SegmentDescriptor, Void>();

  /**
   * Adds an opened segment to the heap. The first segment fixes the expected
   * compression settings, and every later segment must match them. This is
   * required because a single ValueBytes buffer is reused for all segments.
   * @param stream a segment whose reader has already been opened
   * @throws IOException if the segment's compression settings differ
   */
  @SuppressWarnings("unchecked")
  public void put(SegmentDescriptor stream) throws IOException {
    if (size() == 0) {
      compress = stream.in.isCompressed();
      blockCompress = stream.in.isBlockCompressed();
    } else if (compress != stream.in.isCompressed() ||
               blockCompress != stream.in.isBlockCompressed()) {
      throw new IOException("All merged files must be compressed or not.");
    }
    super.put(stream);
  }

  /**
   * A queue of file segments to merge
   * @param segments the file segments to merge
   * @param tmpDir a relative local directory to save intermediate files in
   * @param progress the reference to the Progressable object (may be null)
   */
  public MergeQueue(List <SegmentDescriptor> segments,
      Path tmpDir, Progressable progress) {
    int size = segments.size();
    for (int i = 0; i < size; i++) {
      sortedSegmentSizes.put(segments.get(i), null);
    }
    this.tmpDir = tmpDir;
    this.progress = progress;
  }

  /** Heap ordering: compares the segments' current raw keys. */
  protected boolean lessThan(Object a, Object b) {
    // indicate we're making progress
    if (progress != null) {
      progress.progress();
    }
    SegmentDescriptor msa = (SegmentDescriptor)a;
    SegmentDescriptor msb = (SegmentDescriptor)b;
    return comparator.compare(msa.getKey().getData(), 0,
        msa.getKey().getLength(), msb.getKey().getData(), 0,
        msb.getKey().getLength()) < 0;
  }

  /** Pops every remaining segment and runs its cleanup. */
  public void close() throws IOException {
    SegmentDescriptor ms; // close inputs
    while ((ms = (SegmentDescriptor)pop()) != null) {
      ms.cleanup();
    }
    minSegment = null;
  }

  public DataOutputBuffer getKey() throws IOException {
    return rawKey;
  }

  public ValueBytes getValue() throws IOException {
    return rawValue;
  }

  public boolean next() throws IOException {
    if (size() == 0)
      return false;
    if (minSegment != null) {
      //minSegment is non-null for all invocations of next except the first
      //one. For the first invocation, the priority queue is ready for use
      //but for the subsequent invocations, first adjust the queue
      adjustPriorityQueue(minSegment);
      if (size() == 0) {
        minSegment = null;
        return false;
      }
    }
    minSegment = (SegmentDescriptor)top();
    long startPos = minSegment.in.getPosition(); // Current position in stream
    //save the raw key reference
    rawKey = minSegment.getKey();
    //load the raw value. Re-use the existing rawValue buffer
    if (rawValue == null) {
      rawValue = minSegment.in.createValueBytes();
    }
    minSegment.nextRawValue(rawValue);
    long endPos = minSegment.in.getPosition(); // End position after reading value
    updateProgress(endPos - startPos);
    return true;
  }

  public Progress getProgress() {
    return mergeProgress;
  }

  /** Advances ms to its next key and restores heap order. If ms is
   * exhausted, it is removed from the heap and cleaned up. */
  private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
    long startPos = ms.in.getPosition(); // Current position in stream
    boolean hasNext = ms.nextRawKey();
    long endPos = ms.in.getPosition(); // End position after reading key
    updateProgress(endPos - startPos);
    if (hasNext) {
      adjustTop();
    } else {
      pop();
      ms.cleanup();
    }
  }

  /** Adds bytesProcessed to the running total. The Progress value is updated
   * once progPerByte has been set, which happens in the final pass.
   * NOTE(review): bytes read during intermediate passes also accumulate into
   * totalBytesProcessed, so final progress can overshoot when several
   * passes are needed. */
  private void updateProgress(long bytesProcessed) {
    totalBytesProcessed += bytesProcessed;
    if (progPerByte > 0) {
      mergeProgress.set(totalBytesProcessed * progPerByte);
    }
  }

  /** This is the single level merge that is called multiple times
   * depending on the factor size and the number of segments
   * @return RawKeyValueIterator
   * @throws IOException
   */
  public RawKeyValueIterator merge() throws IOException {
    //create the MergeStreams from the sorted map created in the constructor
    //and dump the final output to a file
    int numSegments = sortedSegmentSizes.size();
    int origFactor = factor;
    int passNo = 1;
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    do {
      //get the factor for this pass of merge
      factor = getPassFactor(passNo, numSegments);
      List<SegmentDescriptor> segmentsToMerge =
        new ArrayList<SegmentDescriptor>();
      int segmentsConsidered = 0;
      int numSegmentsToConsider = factor;
      while (true) {
        //extract the smallest 'factor' number of segment pointers from the
        //TreeMap. Call cleanup on the empty segments (no key/value data)
        SegmentDescriptor[] mStream =
          getSegmentDescriptors(numSegmentsToConsider);
        for (int i = 0; i < mStream.length; i++) {
          if (mStream[i].nextRawKey()) {
            segmentsToMerge.add(mStream[i]);
            segmentsConsidered++;
            // Count the bytes read so far by calling nextRawKey(). The
            // reader's position is an absolute file offset (the reader is
            // opened at segmentOffset), so subtract the segment's start.
            // Otherwise segments that begin mid-file, such as those read
            // back from the sort's indexed ".0" output, inflate the byte
            // count by their offset.
            updateProgress(mStream[i].in.getPosition() - mStream[i].segmentOffset);
          }
          else {
            mStream[i].cleanup();
            numSegments--; //we ignore this segment for the merge
          }
        }
        //if we have the desired number of segments
        //or looked at all available segments, we break
        if (segmentsConsidered == factor ||
            sortedSegmentSizes.size() == 0) {
          break;
        }
        numSegmentsToConsider = factor - segmentsConsidered;
      }
      //feed the streams to the priority queue
      initialize(segmentsToMerge.size());
      clear();
      for (int i = 0; i < segmentsToMerge.size(); i++) {
        put(segmentsToMerge.get(i));
      }
      //if we have lesser number of segments remaining, then just return the
      //iterator, else do another single level merge
      if (numSegments <= factor) {
        //calculate the length of the remaining segments. Required for
        //calculating the merge progress
        long totalBytes = 0;
        for (int i = 0; i < segmentsToMerge.size(); i++) {
          totalBytes += segmentsToMerge.get(i).segmentLength;
        }
        if (totalBytes != 0) //being paranoid
          progPerByte = 1.0f / (float)totalBytes;
        //reset factor to what it originally was
        factor = origFactor;
        return this;
      } else {
        //we want to spread the creation of temp files on multiple disks if
        //available under the space constraints
        long approxOutputSize = 0;
        for (SegmentDescriptor s : segmentsToMerge) {
          approxOutputSize += s.segmentLength +
                              ChecksumFileSystem.getApproxChkSumLength(
                              s.segmentLength);
        }
        Path tmpFilename =
          new Path(tmpDir, "intermediate").suffix("." + passNo);
        Path outputFile = lDirAlloc.getLocalPathForWrite(
                                            tmpFilename.toString(),
                                            approxOutputSize, conf);
        LOG.debug("writing intermediate results to " + outputFile);
        Writer writer = cloneFileAttributes(
                                            fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
                                            fs.makeQualified(outputFile), null);
        writer.sync = null; //disable sync for temp files
        writeFile(this, writer);
        writer.close();
        //we finished one single level merge; now clean up the priority
        //queue
        this.close();
        SegmentDescriptor tempSegment =
          new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
        //put the segment back in the TreeMap
        sortedSegmentSizes.put(tempSegment, null);
        numSegments = sortedSegmentSizes.size();
        passNo++;
      }
      //we are worried about only the first pass merge factor. So reset the
      //factor to what it originally was
      factor = origFactor;
    } while(true);
  }

  /**
   * Returns the merge fan-in for the given pass (HADOOP-591). Only the first
   * pass may use fewer than {@code factor} segments. It merges just enough
   * that the remaining segment count n satisfies (n - 1) % (factor - 1) == 0,
   * so every later pass is a full {@code factor}-way merge.
   */
  public int getPassFactor(int passNo, int numSegments) {
    if (passNo > 1 || numSegments <= factor || factor == 1)
      return factor;
    int mod = (numSegments - 1) % (factor - 1);
    if (mod == 0)
      return factor;
    return mod + 1;
  }

  /** Return (& remove) the requested number of segment descriptors from the
   * sorted map. These are the smallest ones; fewer are returned if fewer
   * remain.
   */
  public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
    if (numDescriptors > sortedSegmentSizes.size())
      numDescriptors = sortedSegmentSizes.size();
    SegmentDescriptor[] descriptors =
      new SegmentDescriptor[numDescriptors];
    Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
    int i = 0;
    while (i < numDescriptors) {
      descriptors[i++] = iter.next();
      iter.remove();
    }
    return descriptors;
  }
} // SequenceFile.Sorter.MergeQueue
/** This class defines a merge segment. This class can be subclassed to
 * provide a customized cleanup method implementation. In this
 * implementation, cleanup closes the file handle and deletes the file,
 * unless the input has been marked to be preserved.
 */
public class SegmentDescriptor implements Comparable {
  long segmentOffset; //the start of the segment in the file
  long segmentLength; //the length of the segment
  Path segmentPathName; //the path name of the file containing the segment
  boolean ignoreSync = true; //set to true for temp files
  private Reader in = null;
  private DataOutputBuffer rawKey = null; //this will hold the current key
  private boolean preserveInput = false; //delete input segment files?

  /** Constructs a segment
   * @param segmentOffset the offset of the segment in the file
   * @param segmentLength the length of the segment
   * @param segmentPathName the path name of the file containing the segment
   */
  public SegmentDescriptor (long segmentOffset, long segmentLength,
                            Path segmentPathName) {
    this.segmentOffset = segmentOffset;
    this.segmentLength = segmentLength;
    this.segmentPathName = segmentPathName;
  }

  /** Do the sync checks */
  public void doSync() {ignoreSync = false;}

  /** Whether to delete the files when no longer needed */
  public void preserveInput(boolean preserve) {
    preserveInput = preserve;
  }

  public boolean shouldPreserveInput() {
    return preserveInput;
  }

  /** Orders by length, then offset, then path name. This order is used to
   * pick the smallest segments first. */
  public int compareTo(Object o) {
    SegmentDescriptor that = (SegmentDescriptor)o;
    if (this.segmentLength != that.segmentLength) {
      return (this.segmentLength < that.segmentLength ? -1 : 1);
    }
    if (this.segmentOffset != that.segmentOffset) {
      return (this.segmentOffset < that.segmentOffset ? -1 : 1);
    }
    return (this.segmentPathName.toString()).
      compareTo(that.segmentPathName.toString());
  }

  public boolean equals(Object o) {
    if (!(o instanceof SegmentDescriptor)) {
      return false;
    }
    SegmentDescriptor that = (SegmentDescriptor)o;
    if (this.segmentLength == that.segmentLength &&
        this.segmentOffset == that.segmentOffset &&
        this.segmentPathName.toString().equals(
          that.segmentPathName.toString())) {
      return true;
    }
    return false;
  }

  public int hashCode() {
    return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
  }

  /** Fills up the rawKey object with the key returned by the Reader. The
   * reader is opened lazily on the first call.
   * @return true if there is a key returned; false, otherwise
   * @throws IOException if the segment cannot be opened or read, or its key
   *         or value class does not match the sorter's
   */
  public boolean nextRawKey() throws IOException {
    if (in == null) {
      int bufferSize = conf.getInt("io.file.buffer.size", 4096);
      if (fs.getUri().getScheme().startsWith("ramfs")) {
        bufferSize = conf.getInt("io.bytes.per.checksum", 512);
      }
      Reader reader = new Reader(fs, segmentPathName,
                                 bufferSize, segmentOffset,
                                 segmentLength, conf, false);

      //sometimes we ignore syncs especially for temp merge files
      if (ignoreSync) reader.sync = null;

      // The reader is not yet stored in this.in, so close it ourselves on
      // a class mismatch; otherwise nothing would ever release it.
      if (reader.getKeyClass() != keyClass) {
        reader.close();
        throw new IOException("wrong key class: " + reader.getKeyClass() +
                              " is not " + keyClass);
      }
      if (reader.getValueClass() != valClass) {
        reader.close();
        throw new IOException("wrong value class: "+reader.getValueClass()+
                              " is not " + valClass);
      }
      this.in = reader;
      rawKey = new DataOutputBuffer();
    }
    rawKey.reset();
    int keyLength =
      in.nextRawKey(rawKey);
    return (keyLength >= 0);
  }

  /** Fills up the passed rawValue with the value corresponding to the key
   * read earlier
   * @param rawValue
   * @return the length of the value
   * @throws IOException
   */
  public int nextRawValue(ValueBytes rawValue) throws IOException {
    int valLength = in.nextRawValue(rawValue);
    return valLength;
  }

  /** Returns the stored rawKey */
  public DataOutputBuffer getKey() {
    return rawKey;
  }

  /** Closes the underlying reader if it is open. Calling it again is safe. */
  private void close() throws IOException {
    if (this.in != null) {
      this.in.close();
      this.in = null;
    }
  }

  /** The default cleanup. Subclasses can override this with a custom
   * cleanup. It closes the reader and deletes the segment file unless the
   * input is preserved.
   */
  public void cleanup() throws IOException {
    close();
    if (!preserveInput) {
      fs.delete(segmentPathName, true);
    }
  }
} // SequenceFile.Sorter.SegmentDescriptor
/** One of several segments stored back to back in a single file, for
 * example the sort's indexed ".0" output. Its cleanup closes this segment's
 * reader. Unless inputs are preserved, it also notifies the parent
 * container, which deletes the shared file only after every contained
 * segment has been cleaned up.
 */
private class LinkedSegmentsDescriptor extends SegmentDescriptor {

  SegmentContainer parentContainer = null;

  /** Constructs a segment
   * @param segmentOffset the offset of the segment in the file
   * @param segmentLength the length of the segment
   * @param segmentPathName the path name of the file containing the segment
   * @param parent the parent SegmentContainer that holds the segment
   */
  public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength,
                                   Path segmentPathName, SegmentContainer parent) {
    super(segmentOffset, segmentLength, segmentPathName);
    this.parentContainer = parent;
  }

  /** Closes this segment's reader. The shared file is not deleted here;
   * deletion is delegated to the parent container when inputs are not
   * preserved.
   */
  public void cleanup() throws IOException {
    super.close();
    if (!super.shouldPreserveInput()) {
      parentContainer.cleanup();
    }
  }
} //SequenceFile.Sorter.LinkedSegmentsDescriptor
/** The class that defines a container for segments to be merged. It is
 * primarily required to delete temp files as soon as all the contained
 * segments have been looked at. */
private class SegmentContainer {
  private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
  private int numSegmentsContained; //# of segments contained
  private Path inName; //input file from where segments are created

  //the list of segments read from the file
  private ArrayList <SegmentDescriptor> segments =
    new ArrayList <SegmentDescriptor>();

  /** This constructor is there primarily to serve the sort routine that
   * generates a single output file with an associated index file. It reads
   * (offset, length) VLong pairs from indexIn and then deletes indexIn.
   * @throws IOException if the index cannot be read or deleted
   */
  public SegmentContainer(Path inName, Path indexIn) throws IOException {
    //get the segments from indexIn
    FSDataInputStream fsIndexIn = fs.open(indexIn);
    try {
      long end = fs.getLength(indexIn);
      while (fsIndexIn.getPos() < end) {
        long segmentOffset = WritableUtils.readVLong(fsIndexIn);
        long segmentLength = WritableUtils.readVLong(fsIndexIn);
        Path segmentName = inName;
        segments.add(new LinkedSegmentsDescriptor(segmentOffset,
                                                  segmentLength, segmentName, this));
      }
    } finally {
      // Close the index stream even if reading a truncated index fails.
      fsIndexIn.close();
    }
    fs.delete(indexIn, true);
    numSegmentsContained = segments.size();
    this.inName = inName;
  }

  public List <SegmentDescriptor> getSegmentList() {
    return segments;
  }

  /** Records one segment cleanup and deletes the shared input file once
   * every contained segment has been cleaned up. */
  public void cleanup() throws IOException {
    numSegmentsCleanedUp++;
    if (numSegmentsCleanedUp == numSegmentsContained) {
      fs.delete(inName, true);
    }
  }
} //SequenceFile.Sorter.SegmentContainer
} // SequenceFile.Sorter
} // SequenceFile
2.MapFile映射文件
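MapFile是一种按key有序、带索引的key-value映射文件(注意:它并不是用Java的Map类型来存储数据)。其中的数据是已按key排好序的SequenceFile数据,例如经SequenceFile.Sorter排序后的输出;写入时key必须按非递减顺序追加。一个MapFile实际上是一个目录,分为两部分:一是索引文件index,二是数据文件data,二者都是SequenceFile。该类通过内部类Writer和Reader完成写入和读取,这两个类中也使用了MapFile中定义的常量和方法。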
MapFile有两个直接的子类:ArrayFile和SetFile。ArrayFile是一种稠密的MapFile,其key是从0开始连续递增的整数(LongWritable),value可以是任意Writable;SetFile只有key,所有的value都是NullWritable。
package org.apache.hadoop.io;
import java.io.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
public class MapFile {
private static final Log LOG = LogFactory.getLog(MapFile.class);// logger for MapFile (not a configuration object)
/** The name of the index file. */
public static final String INDEX_FILE_NAME = "index";// name of the index file inside a map directory
/** The name of the data file. */
public static final String DATA_FILE_NAME = "data";// name of the data file inside a map directory
protected MapFile() {} // no public ctor
/** Writes a new map. A map is a directory holding a "data" SequenceFile of
 * key/value pairs and a block-compressed "index" SequenceFile. The index
 * maps every {@code indexInterval}-th key to that record's position in the
 * data file. Keys must be appended in non-decreasing order.
 */
public static class Writer implements java.io.Closeable {
  private SequenceFile.Writer data;  // writer for the data file
  private SequenceFile.Writer index; // writer for the index file

  final private static String INDEX_INTERVAL = "io.map.index.interval"; // config key for the index interval

  private int indexInterval = 128;   // write one index entry every indexInterval records

  private long size;                 // number of key/value pairs appended to the data file
  private LongWritable position = new LongWritable(); // reusable holder for the data-file offset written to the index

  // the following fields are used only for checking key order
  private WritableComparator comparator;
  private DataInputBuffer inBuf = new DataInputBuffer();
  private DataOutputBuffer outBuf = new DataOutputBuffer();
  private WritableComparable lastKey;

  /** Create the named map for keys of the named class. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                Class<? extends WritableComparable> keyClass, Class valClass)
    throws IOException {
    this(conf, fs, dirName,
         WritableComparator.get(keyClass), valClass,
         SequenceFile.getCompressionType(conf));
  }

  /** Create the named map for keys of the named class. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                Class<? extends WritableComparable> keyClass, Class valClass,
                CompressionType compress, Progressable progress)
    throws IOException {
    this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
         compress, progress);
  }

  /** Create the named map for keys of the named class. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                Class<? extends WritableComparable> keyClass, Class valClass,
                CompressionType compress, CompressionCodec codec,
                Progressable progress)
    throws IOException {
    this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
         compress, codec, progress);
  }

  /** Create the named map for keys of the named class. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                Class<? extends WritableComparable> keyClass, Class valClass,
                CompressionType compress)
    throws IOException {
    this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
  }

  /** Create the named map using the named key comparator. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                WritableComparator comparator, Class valClass)
    throws IOException {
    this(conf, fs, dirName, comparator, valClass,
         SequenceFile.getCompressionType(conf));
  }

  /** Create the named map using the named key comparator. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                WritableComparator comparator, Class valClass,
                SequenceFile.CompressionType compress)
    throws IOException {
    this(conf, fs, dirName, comparator, valClass, compress, null);
  }

  /** Create the named map using the named key comparator. */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                WritableComparator comparator, Class valClass,
                SequenceFile.CompressionType compress,
                Progressable progress)
    throws IOException {
    this(conf, fs, dirName, comparator, valClass,
         compress, new DefaultCodec(), progress);
  }

  /** Create the named map using the named key comparator. It creates the
   * directory and opens both the data and the index writer.
   * @throws IOException if the directory cannot be created or either file
   *         cannot be opened
   */
  public Writer(Configuration conf, FileSystem fs, String dirName,
                WritableComparator comparator, Class valClass,
                SequenceFile.CompressionType compress, CompressionCodec codec,
                Progressable progress)
    throws IOException {

    this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

    this.comparator = comparator;
    this.lastKey = comparator.newKey();

    Path dir = new Path(dirName);
    if (!fs.mkdirs(dir)) {
      throw new IOException("Mkdirs failed to create directory " + dir.toString());
    }
    Path dataFile = new Path(dir, DATA_FILE_NAME);
    Path indexFile = new Path(dir, INDEX_FILE_NAME);

    Class keyClass = comparator.getKeyClass();
    this.data =
      SequenceFile.createWriter
      (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
    try {
      this.index =
        SequenceFile.createWriter
        (fs, conf, indexFile, keyClass, LongWritable.class,
         CompressionType.BLOCK, progress);
    } catch (IOException e) {
      // Don't leak the already-open data writer when the index can't be created.
      try {
        data.close();
      } catch (IOException closeFailure) {
        LOG.warn("Failed to close " + dataFile
                 + " after index creation failed", closeFailure);
      }
      throw e;
    }
  }

  /** The number of entries that are added before an index entry is added.*/
  public int getIndexInterval() { return indexInterval; }

  /** Sets the index interval.
   * @see #getIndexInterval()
   */
  public void setIndexInterval(int interval) { indexInterval = interval; }

  /** Sets the index interval and stores it in conf
   * @see #getIndexInterval()
   */
  public static void setIndexInterval(Configuration conf, int interval) {
    conf.setInt(INDEX_INTERVAL, interval);
  }

  /** Close the map. The index writer is closed even if closing the data
   * writer fails. */
  public synchronized void close() throws IOException {
    try {
      data.close();
    } finally {
      index.close();
    }
  }

  /** Append a key/value pair to the map. The key must be greater or equal
   * to the previous key added to the map. When the number of records
   * written so far is a multiple of indexInterval, an index entry mapping
   * the key to the current end of the data file is written first.
   * NOTE(review): an indexInterval of 0 makes the modulo below throw
   * ArithmeticException.
   * @throws IOException if the key is out of order or writing fails
   */
  public synchronized void append(WritableComparable key, Writable val)
    throws IOException {

    checkKey(key);

    if (size % indexInterval == 0) {            // add an index entry
      position.set(data.getLength());           // point to current eof
      index.append(key, position);
    }

    data.append(key, val);                      // append key/value to data
    size++;
  }

  /** Rejects keys smaller than the previous one, then stores a copy of key
   * as lastKey. The copy is made by serializing and deserializing, because
   * the caller may reuse the key object. */
  private void checkKey(WritableComparable key) throws IOException {
    // check that keys are well-ordered
    if (size != 0 && comparator.compare(lastKey, key) > 0)
      throw new IOException("key out of order: "+key+" after "+lastKey);

    // update lastKey with a copy of key by writing and reading
    outBuf.reset();
    key.write(outBuf);                          // write new key

    inBuf.reset(outBuf.getData(), outBuf.getLength());
    lastKey.readFields(inBuf);                  // read into lastKey
  }

}
/** Provide access to an existing map.
* 提供一个通道使可以独处map*/
public static class Reader implements java.io.Closeable {
/** Number of index entries to skip between each entry. Zero by default.
 * Setting this to values larger than zero can facilitate opening large map
 * files using less memory. */
private int INDEX_SKIP = 0;// index entries skipped between kept ones; > 0 lets large maps open with less memory
private WritableComparator comparator;// comparator used to order keys
private WritableComparable nextKey;// the next key to be read (per the original comment)
private long seekPosition = -1;
private int seekIndex = -1;// with seekPosition: position state used when seeking/reading
private long firstPosition;// data-file position right after opening; reset() seeks back here
// the data, on disk: readers for the data file and the index file
private SequenceFile.Reader data;
private SequenceFile.Reader index;
// whether the index Reader was closed
private boolean indexClosed = false;// set once readIndex() has closed the index reader
// the index, in memory
private int count = -1;// number of index entries held in memory (-1 until the index is read)
private WritableComparable[] keys;// in-memory index keys
private long[] positions;// data-file positions matching the in-memory index keys
/** Returns the class of keys in this file, as reported by the data file's
 * reader. */
public Class<?> getKeyClass() {
  return data.getKeyClass();
}
/** Returns the class of values in this file, as reported by the data
 * file's reader. */
public Class<?> getValueClass() {
  return data.getValueClass();
}
/** Construct a map reader for the named map. It uses the comparator that
 * WritableComparator provides for the key class, and honours
 * {@code io.map.index.skip} from {@code conf}. */
public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
  this(fs, dirName, null, conf);
  // Must come after delegation: this(...) has to be the first statement.
  this.INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
}
/** Construct a map reader for the named map using the named comparator.*/
public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
throws IOException {
this(fs, dirName, comparator, conf, true);
}
/**
 * Hook to allow subclasses to defer opening streams until further
 * initialization is complete. When {@code open} is false nothing is opened
 * here; presumably the subclass calls {@code open(...)} itself later.
 * @see #createDataFileReader(FileSystem, Path, Configuration)
 */
protected Reader(FileSystem fs, String dirName,
    WritableComparator comparator, Configuration conf, boolean open)
  throws IOException {
  if (!open) {
    return;
  }
  open(fs, dirName, comparator, conf);
}
/**
 * Opens the map's data and index files under {@code dirName}. The index is
 * only opened here; readIndex() loads it into memory on demand.
 *
 * <p>Also reads {@code io.map.index.skip} from {@code conf}, so that every
 * constructor path honours it, not only the three-argument constructor. If
 * anything after opening the data file fails, the data reader is closed
 * instead of leaked.
 *
 * @param fs file system holding the map
 * @param dirName the map directory
 * @param comparator key comparator, or null to use the key class's default
 * @param conf configuration
 * @throws IOException if either file cannot be opened
 */
protected synchronized void open(FileSystem fs, String dirName,
    WritableComparator comparator, Configuration conf) throws IOException {
  Path dir = new Path(dirName);
  Path dataFile = new Path(dir, DATA_FILE_NAME);
  Path indexFile = new Path(dir, INDEX_FILE_NAME);

  INDEX_SKIP = conf.getInt("io.map.index.skip", 0);

  // open the data
  this.data = createDataFileReader(fs, dataFile, conf);
  boolean opened = false;
  try {
    this.firstPosition = data.getPosition();

    if (comparator == null)
      this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
    else
      this.comparator = comparator;

    // open the index
    this.index = new SequenceFile.Reader(fs, indexFile, conf);
    opened = true;
  } finally {
    if (!opened) {
      // Release the data reader; the original exception keeps propagating.
      try {
        data.close();
      } catch (IOException closeFailure) {
        LOG.warn("Failed to close " + dataFile + " after open failed", closeFailure);
      }
    }
  }
}
/**
 * Creates the reader for the map's data file. Override this method to
 * specialize the type of {@link SequenceFile.Reader} returned.
 */
protected SequenceFile.Reader createDataFileReader(FileSystem fs,
    Path dataFile, Configuration conf) throws IOException {
  SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, dataFile, conf);
  return dataReader;
}
/**
 * Loads the index file into memory. Later calls return immediately.
 *
 * <p>Index keys must be in comparator order; otherwise an IOException is
 * thrown. While INDEX_SKIP is positive, that many entries are dropped before
 * each retained one. The counter starts at INDEX_SKIP, so the leading entries
 * are dropped too. An unexpected EOF is logged and the entries loaded so far
 * are kept. The index reader is closed in every case.
 */
private void readIndex() throws IOException {
  if (keys != null) {
    return; // already in memory
  }
  count = 0;
  keys = new WritableComparable[1024];
  positions = new long[1024];
  try {
    final LongWritable entryPosition = new LongWritable();
    WritableComparable previous = null;
    int toSkip = INDEX_SKIP;
    for (WritableComparable current = comparator.newKey();
        index.next(current, entryPosition);
        current = comparator.newKey()) {
      // check order to make sure comparator is compatible
      if (previous != null && comparator.compare(previous, current) > 0) {
        throw new IOException("key out of order: " + current + " after " + previous);
      }
      previous = current;
      if (toSkip > 0) {
        toSkip--;
        continue; // drop this entry
      }
      toSkip = INDEX_SKIP;
      if (count == keys.length) {
        // grow both arrays by half
        final int grownLength = (keys.length * 3) / 2;
        keys = Arrays.copyOf(keys, grownLength);
        positions = Arrays.copyOf(positions, grownLength);
      }
      keys[count] = current;
      positions[count] = entryPosition.get();
      count++;
    }
  } catch (EOFException e) {
    LOG.warn("Unexpected EOF reading " + index +
        " at entry #" + count + ". Ignoring.");
  } finally {
    indexClosed = true;
    index.close();
  }
}
/**
 * Re-positions the data reader just before the first key of the map, i.e. at
 * the position recorded when the data file was opened.
 */
public synchronized void reset() throws IOException {
  final long start = firstPosition;
  data.seek(start);
}
/**
 * Returns the in-memory index key at approximately the middle of the file.
 *
 * <p>Only keys retained in the in-memory index are considered, so the result
 * is the middle index entry rather than the exact median record.
 *
 * @return the middle index key, or {@code null} if the index holds no entries
 * @throws IOException if the index cannot be read
 */
public synchronized WritableComparable midKey() throws IOException {
  readIndex();
  // An empty index has no middle entry. This cannot be detected with a sign
  // check on (count - 1) / 2: Java integer division truncates toward zero,
  // so for count == 0 that expression is 0, not negative.
  if (count == 0) {
    return null;
  }
  return keys[(count - 1) / 2]; // middle of the index
}
/**
 * Reads the last key of the file into {@code key}.
 *
 * <p>The scan starts at the last in-memory index entry, or at the beginning
 * of the data when the index is empty, and runs to end of file. The reader's
 * original position is restored afterwards, even if an exception is thrown.
 *
 * @param key key to read into
 */
public synchronized void finalKey(WritableComparable key)
    throws IOException {
  final long saved = data.getPosition();
  try {
    readIndex(); // make sure index is valid
    if (count <= 0) {
      reset();
    } else {
      data.seek(positions[count - 1]);
    }
    boolean more = data.next(key);
    while (more) {
      more = data.next(key);
    }
  } finally {
    data.seek(saved);
  }
}
/**
 * Positions the reader at {@code key}, or at the record that sorts just after
 * it when the key is absent.
 *
 * @param key key to look for
 * @return true if {@code key} itself was found
 */
public synchronized boolean seek(WritableComparable key) throws IOException {
  int outcome = seekInternal(key);
  return outcome == 0;
}
/**
 * Positions the reader at {@code key}, or at the record that sorts just after
 * it when the key is absent.
 *
 * @return 0 if an exact match was found, a negative value if positioned at
 *         the next record, or 1 if there are no more records in the file
 */
private synchronized int seekInternal(WritableComparable key)
    throws IOException {
  final boolean before = false;
  return seekInternal(key, before);
}
/**
* Positions the reader at the named key, or if none such exists, at the
* key that falls just before or just after dependent on how the
* <code>before</code> parameter is set.
*
* <p>The in-memory index (loaded on demand by readIndex()) selects the last
* indexed position whose key is not after <code>key</code>; the data file is
* then scanned forward from there, reading candidate keys into
* <code>nextKey</code>.
*
* @param key the key to look for
* @param before - IF true, and <code>key</code> does not exist, position
* file at entry that falls just before <code>key</code>. Otherwise,
* position file at record that sorts just after.
* @return 0 - exact match found
* < 0 - positioned at next record (with <code>before</code> set this only
*       happens when the first record scanned already sorts after
*       <code>key</code>)
* 1 - no more records in file, or (with <code>before</code> set) the
*     reader was rewound to the record just before <code>key</code>
*/
private synchronized int seekInternal(WritableComparable key,
final boolean before)
throws IOException {
readIndex(); // make sure index is read
// Fast path: if a previous seek chose an index block and key still lies
// between that seek's result (nextKey) and the next index entry, reuse
// seekIndex/seekPosition instead of searching the index again.
if (seekIndex != -1 // seeked before
&& seekIndex+1 < count
&& comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
&& comparator.compare(key, nextKey)
>= 0) { // but after last seeked
// do nothing
} else {
seekIndex = binarySearch(key);
// binarySearch returns -(insertionPoint + 1) for a missing key, so
// -seekIndex-2 == insertionPoint - 1: the last index entry sorting before
// key, or -1 if there is none.
if (seekIndex < 0) // decode insertion point
seekIndex = -seekIndex-2;
if (seekIndex == -1) // belongs before first entry
seekPosition = firstPosition; // use beginning of file
else
seekPosition = positions[seekIndex]; // else use index
}
// Scan forward from the start of the chosen block.
data.seek(seekPosition);
if (nextKey == null)
nextKey = comparator.newKey();
// If we're looking for the key before, we need to keep track
// of the position we got the current key as well as the position
// of the key before it.
long prevPosition = -1;
long curPosition = seekPosition;
while (data.next(nextKey)) {
int c = comparator.compare(key, nextKey);
if (c <= 0) { // at or beyond desired
if (before && c != 0) {
if (prevPosition == -1) {
// We're on the first record of this index block
// and we've already passed the search key. Therefore
// we must be at the beginning of the file, so seek
// to the beginning of this block and return c
data.seek(curPosition);
} else {
// We have a previous record to back up to
data.seek(prevPosition);
data.next(nextKey);
// now that we've rewound, the search key must be greater than this key
return 1;
}
}
return c;
}
// Only the "before" mode needs to back up, so only then track the start
// offsets of the record just read and the one preceding it.
if (before) {
prevPosition = curPosition;
curPosition = data.getPosition();
}
}
// Ran off the end of the data file without reaching key.
return 1;
}
/**
 * Binary-searches the first {@code count} in-memory index keys.
 *
 * @param key key to look for
 * @return the slot holding {@code key} if present; otherwise
 *         {@code -(insertionPoint + 1)}
 */
private int binarySearch(WritableComparable key) {
  int lo = 0;
  int hi = count - 1;
  while (hi >= lo) {
    final int probe = (lo + hi) >>> 1; // overflow-safe midpoint
    final int order = comparator.compare(keys[probe], key);
    if (order == 0) {
      return probe; // key found
    }
    if (order < 0) {
      lo = probe + 1;
    } else {
      hi = probe - 1;
    }
  }
  return -(lo + 1); // key not found
}
/**
 * Reads the next key/value pair of the map into {@code key} and {@code val}.
 *
 * @return true if a pair was read, false at the end of the map
 */
public synchronized boolean next(WritableComparable key, Writable val)
    throws IOException {
  final boolean gotPair = data.next(key, val);
  return gotPair;
}
/**
 * Looks up the value stored under {@code key}.
 *
 * @param key key to look up
 * @param val instance the value is read into
 * @return {@code val} holding the value, or null if the key is absent
 */
public synchronized Writable get(WritableComparable key, Writable val)
    throws IOException {
  if (!seek(key)) {
    return null;
  }
  data.getCurrentValue(val);
  return val;
}
/**
 * Finds the record that is the closest match to the specified key: the key
 * itself or, if it does not exist, the first entry after it.
 *
 * @param key key that we're trying to find
 * @param val instance the value of the matched record is read into
 * @return the key that was the closest match, or null if at end of file
 */
public synchronized WritableComparable getClosest(WritableComparable key,
    Writable val) throws IOException {
  final boolean before = false;
  return getClosest(key, val, before);
}
/**
 * Finds the record that is the closest match to the specified key.
 *
 * <p>The returned key is the reader's internal key buffer. It is overwritten
 * by the next seek or lookup, so callers that need to keep it must copy it.
 *
 * @param key key that we're trying to find
 * @param val instance the value of the matched record is read into
 * @param before if true and {@code key} does not exist, return the entry
 *        that falls just before {@code key}; otherwise return the record
 *        that sorts just after
 * @return the key that was the closest match, or null if the search ran off
 *         the beginning or end of the file
 */
public synchronized WritableComparable getClosest(WritableComparable key,
    Writable val, final boolean before) throws IOException {
  final int c = seekInternal(key, before);
  // A non-exact result on the wrong side of the query key means we hit the
  // beginning or end of the file.
  final boolean wrongSide = before ? c < 0 : c > 0;
  if (wrongSide) {
    return null;
  }
  data.getCurrentValue(val);
  return nextKey;
}
/**
 * Closes the map: the index reader (unless readIndex() already closed it)
 * and the data reader.
 *
 * <p>The data reader is closed even when closing the index reader fails. The
 * index is marked closed before it is closed, so a repeated {@code close()}
 * does not close it a second time. A reader constructed with
 * {@code open == false} and never opened can also be closed safely.
 *
 * @throws IOException if closing either underlying reader fails
 */
public synchronized void close() throws IOException {
  try {
    if (!indexClosed && index != null) {
      indexClosed = true;
      index.close();
    }
  } finally {
    if (data != null) {
      data.close();
    }
  }
}
}
/**
 * Renames an existing map directory.
 *
 * @param fs file system holding the map
 * @param oldName current directory of the map
 * @param newName new directory for the map
 * @throws IOException if the file system reports that the rename failed
 */
public static void rename(FileSystem fs, String oldName, String newName)
    throws IOException {
  final Path from = new Path(oldName);
  final Path to = new Path(newName);
  final boolean renamed = fs.rename(from, to);
  if (renamed) {
    return;
  }
  throw new IOException("Could not rename " + from + " to " + to);
}
/**
 * Deletes the named map: its data file, then its index file, then the map
 * directory itself (each recursively). The results of the individual
 * delete calls are not checked.
 */
public static void delete(FileSystem fs, String name) throws IOException {
  final Path dir = new Path(name);
  final Path[] targets = {
      new Path(dir, DATA_FILE_NAME),
      new Path(dir, INDEX_FILE_NAME),
      dir,
  };
  for (Path target : targets) {
    fs.delete(target, true);
  }
}
/**
 * This method attempts to fix a corrupt MapFile by re-creating its index.
 *
 * <p>Every 128th record of the data file is added to a new index file, keyed
 * by that record's key and pointing at the record's position. Reading stops
 * at the first failure of any kind (e.g. a truncated data file), and the
 * number of records read up to that point is returned. The data reader and
 * the index writer are always closed, including when the key or value class
 * check fails.
 *
 * @param fs filesystem
 * @param dir directory containing the MapFile data and index
 * @param keyClass key class (has to be a subclass of Writable)
 * @param valueClass value class (has to be a subclass of Writable)
 * @param dryrun do not perform any changes, just report what needs to be done
 * @param conf configuration used to open the files and instantiate keys/values
 * @return number of valid entries in this MapFile, or -1 if no fixing was needed
 * @throws Exception if the data file is missing, or its key/value classes do
 *         not match {@code keyClass}/{@code valueClass}
 */
public static long fix(FileSystem fs, Path dir,
    Class<? extends Writable> keyClass,
    Class<? extends Writable> valueClass, boolean dryrun,
    Configuration conf) throws Exception {
  String dr = (dryrun ? "[DRY RUN ] " : "");
  Path data = new Path(dir, DATA_FILE_NAME);
  Path index = new Path(dir, INDEX_FILE_NAME);
  int indexInterval = 128;
  if (!fs.exists(data)) {
    // there's nothing we can do to fix this!
    throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
  }
  if (fs.exists(index)) {
    // no fixing needed
    return -1;
  }
  SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
  try {
    if (!dataReader.getKeyClass().equals(keyClass)) {
      throw new Exception(dr + "Wrong key class in " + dir + ", expected " + keyClass.getName() +
          ", got " + dataReader.getKeyClass().getName());
    }
    if (!dataReader.getValueClass().equals(valueClass)) {
      throw new Exception(dr + "Wrong value class in " + dir + ", expected " + valueClass.getName() +
          ", got " + dataReader.getValueClass().getName());
    }
    long cnt = 0L;
    Writable key = ReflectionUtils.newInstance(keyClass, conf);
    Writable value = ReflectionUtils.newInstance(valueClass, conf);
    // stays null in a dry run, so nothing is written
    SequenceFile.Writer indexWriter = null;
    if (!dryrun) {
      indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
    }
    try {
      // position of the record about to be read (start of the previous next())
      long pos = 0L;
      LongWritable position = new LongWritable();
      while (dataReader.next(key, value)) {
        cnt++;
        if (cnt % indexInterval == 0) {
          position.set(pos);
          if (indexWriter != null) {
            indexWriter.append(key, position);
          }
        }
        pos = dataReader.getPosition();
      }
    } catch (Throwable t) {
      // Best effort by design: a truncated data file ends the scan, and the
      // count of records read so far is returned.
      // NOTE(review): this also hides failures of indexWriter.append, which
      // would leave a shorter (but still ordered) index.
    } finally {
      if (indexWriter != null) {
        indexWriter.close();
      }
    }
    return cnt;
  } finally {
    dataReader.close();
  }
}
/**
 * Copies the map in {@code inFile} to a new map {@code outFile} on the local
 * file system, entry by entry.
 *
 * <p>Both the reader and the writer are closed even if copying fails.
 *
 * @param args {@code inFile outFile}
 */
public static void main(String[] args) throws Exception {
  String usage = "Usage: MapFile inFile outFile";
  if (args.length != 2) {
    System.err.println(usage);
    System.exit(-1);
  }
  String in = args[0];
  String out = args[1];
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
  try {
    MapFile.Writer writer =
        new MapFile.Writer(conf, fs, out,
            reader.getKeyClass().asSubclass(WritableComparable.class),
            reader.getValueClass());
    try {
      WritableComparable key =
          ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
      Writable value =
          ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
      while (reader.next(key, value)) { // copy all entries
        writer.append(key, value);
      }
    } finally {
      writer.close();
    }
  } finally {
    reader.close();
  }
}
}