ObjectWritable 是一个通用的封装类,适用于某个字段需要承载多种类型(基本类型、String、枚举、数组、Writable 等)的场景。
ObjectWritable 有三个主要的成员变量:被封装的对象实例 instance、该对象声明类型的 Class 对象 declaredClass(不一定与 instance 的运行时类相同)以及 Configuration 对象 conf。
private Class declaredClass; private Object instance; private Configuration conf;
ObjectWritable 中有一个静态内部类 NullInstance,从类名就可以看出,它用来表示 instance 为 null 的情况。NullInstance 有两个构造方法:一个是无参构造方法,它调用 super(null),即不设置 Configuration;另一个接收两个参数 declaredClass 和 conf,保存声明类型 declaredClass,并把 conf 传给父类 Configured。
private static class NullInstance extends Configured implements Writable { private Class<?> declaredClass; public NullInstance() { super(null); } public NullInstance(Class declaredClass, Configuration conf) { super(conf); this.declaredClass = declaredClass; }
NullInstance 有两个主要方法:write 和 readFields。write 方法仅将类名进行序列化;readFields 方法首先从流中读取一个 String(类名),然后在基本类型映射 PRIMITIVE_NAMES 中查找对应的类型;如果找不到,则通过 conf 的 getClassByName 按类名加载该类,若类不存在则抛出 RuntimeException。
/**
 * Restores the declared class recorded for a serialized null value.
 *
 * <p>The class name is read with {@code UTF8.readString}. Primitive type
 * names are resolved through {@code PRIMITIVE_NAMES}; any other name is
 * loaded through {@code getConf().getClassByName}.
 *
 * @param in the stream to read the class name from
 * @throws IOException if reading from {@code in} fails
 * @throws RuntimeException if the named class cannot be found; the original
 *     {@link ClassNotFoundException} is kept as the cause
 */
@Override
public void readFields(DataInput in) throws IOException {
  String className = UTF8.readString(in);
  declaredClass = PRIMITIVE_NAMES.get(className);
  if (declaredClass == null) {
    try {
      declaredClass = getConf().getClassByName(className);
    } catch (ClassNotFoundException e) {
      // Chain the original exception so its stack trace is not lost.
      throw new RuntimeException(e.toString(), e);
    }
  }
}
回到 ObjectWritable,下面介绍它的两个核心方法:writeObject 和 readObject。
writeObject 顾名思义就是将对象序列化。writeObject 有 2 个重载方法,最终都会调用 writeObject(DataOutput out, Object instance, Class declaredClass, Configuration conf, boolean allowCompactArrays)。该方法主要分为两部分:第一部分将声明类型的类名写入输出流(在此之前,会把 null 替换为 NullInstance,并在允许压缩数组时把基本类型数组包装为 ArrayPrimitiveWritable.Internal);第二部分根据不同的类型,将数据序列化。
/** Write a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. */ public static void writeObject(DataOutput out, Object instance, Class declaredClass, Configuration conf) throws IOException { writeObject(out, instance, declaredClass, conf, false); } /** * Write a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. * * @param allowCompactArrays - set true for RPC and internal or intra-cluster * usages. Set false for inter-cluster, File, and other persisted output * usages, to preserve the ability to interchange files with other clusters * that may not be running the same version of software. Sometime in ~2013 * we can consider removing this parameter and always using the compact format. */ public static void writeObject(DataOutput out, Object instance, Class declaredClass, Configuration conf, boolean allowCompactArrays) throws IOException { if (instance == null) { // 如果实例为空,则new NullInstance作为实例 instance = new NullInstance(declaredClass, conf); declaredClass = Writable.class; } // Special case: must come before writing out the declaredClass. // If this is an eligible array of primitives, // wrap it in an ArrayPrimitiveWritable$Internal wrapper class. 
//如果declaredClass是个数组,且允许压缩数组(allowCompactArrays=true) if (allowCompactArrays && declaredClass.isArray() && instance.getClass().getName().equals(declaredClass.getName()) && instance.getClass().getComponentType().isPrimitive()) { instance = new ArrayPrimitiveWritable.Internal(instance); declaredClass = ArrayPrimitiveWritable.Internal.class; } //序列化类名 UTF8.writeString(out, declaredClass.getName()); // always write declared //allowCompactArrays=false,declaredClass是个数组 if (declaredClass.isArray()) { // non-primitive or non-compact array int length = Array.getLength(instance); out.writeInt(length); for (int i = 0; i < length; i++) { writeObject(out, Array.get(instance, i), declaredClass.getComponentType(), conf, allowCompactArrays); } } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) { ((ArrayPrimitiveWritable.Internal) instance).write(out); } else if (declaredClass == String.class) { // String UTF8.writeString(out, (String)instance); } else if (declaredClass.isPrimitive()) { // primitive type if (declaredClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean)instance).booleanValue()); } else if (declaredClass == Character.TYPE) { // char out.writeChar(((Character)instance).charValue()); } else if (declaredClass == Byte.TYPE) { // byte out.writeByte(((Byte)instance).byteValue()); } else if (declaredClass == Short.TYPE) { // short out.writeShort(((Short)instance).shortValue()); } else if (declaredClass == Integer.TYPE) { // int out.writeInt(((Integer)instance).intValue()); } else if (declaredClass == Long.TYPE) { // long out.writeLong(((Long)instance).longValue()); } else if (declaredClass == Float.TYPE) { // float out.writeFloat(((Float)instance).floatValue()); } else if (declaredClass == Double.TYPE) { // double out.writeDouble(((Double)instance).doubleValue()); } else if (declaredClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } else if (declaredClass.isEnum()) { // enum 
UTF8.writeString(out, ((Enum)instance).name()); } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable UTF8.writeString(out, instance.getClass().getName()); ((Writable)instance).write(out); } else if (Message.class.isAssignableFrom(declaredClass)) { ((Message)instance).writeDelimitedTo( DataOutputOutputStream.constructOutputStream(out)); } else { throw new IOException("Can't write: "+instance+" as "+declaredClass); } }
readObject 负责反序列化。和 writeObject 一样,它有 2 个重载方法,最终的实现方法是 readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)。它同样分为两部分:首先读取类名,然后根据不同的类型调用不同的读取方法读取数据。
/** Read a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. */ @SuppressWarnings("unchecked") public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf) throws IOException { //读取类型 String className = UTF8.readString(in); //从PRIMITIVE_NAMES匹配基本数据类型 Class<?> declaredClass = PRIMITIVE_NAMES.get(className); if (declaredClass == null) { //从conf中找类型 declaredClass = loadClass(conf, className); } Object instance; //如果是基础类型,根据不同类型进行不同的读取 if (declaredClass.isPrimitive()) { // primitive types if (declaredClass == Boolean.TYPE) { // boolean instance = Boolean.valueOf(in.readBoolean()); } else if (declaredClass == Character.TYPE) { // char instance = Character.valueOf(in.readChar()); } else if (declaredClass == Byte.TYPE) { // byte instance = Byte.valueOf(in.readByte()); } else if (declaredClass == Short.TYPE) { // short instance = Short.valueOf(in.readShort()); } else if (declaredClass == Integer.TYPE) { // int instance = Integer.valueOf(in.readInt()); } else if (declaredClass == Long.TYPE) { // long instance = Long.valueOf(in.readLong()); } else if (declaredClass == Float.TYPE) { // float instance = Float.valueOf(in.readFloat()); } else if (declaredClass == Double.TYPE) { // double instance = Double.valueOf(in.readDouble()); } else if (declaredClass == Void.TYPE) { // void instance = null; } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } else if (declaredClass.isArray()) { // array int length = in.readInt(); instance = Array.newInstance(declaredClass.getComponentType(), length); for (int i = 0; i < length; i++) { Array.set(instance, i, readObject(in, conf)); } } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) { // Read and unwrap ArrayPrimitiveWritable$Internal array. // Always allow the read, even if write is disabled by allowCompactArrays. 
ArrayPrimitiveWritable.Internal temp = new ArrayPrimitiveWritable.Internal(); temp.readFields(in); instance = temp.get(); declaredClass = instance.getClass(); } else if (declaredClass == String.class) { // String instance = UTF8.readString(in); } else if (declaredClass.isEnum()) { // enum instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in)); } else if (Message.class.isAssignableFrom(declaredClass)) { instance = tryInstantiateProtobuf(declaredClass, in); } else { // Writable Class instanceClass = null; String str = UTF8.readString(in); instanceClass = loadClass(conf, str); Writable writable = WritableFactories.newInstance(instanceClass, conf); writable.readFields(in); instance = writable; if (instanceClass == NullInstance.class) { // null declaredClass = ((NullInstance)instance).declaredClass; instance = null; } } if (objectWritable != null) { // store values objectWritable.declaredClass = declaredClass; objectWritable.instance = instance; } return instance; }