Preface
Serialization
Spark ships with two built-in data serializers, JavaSerializer and KryoSerializer, both of which extend the abstract class Serializer. In Spark SQL, SparkSqlSerializer in turn extends KryoSerializer.
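Since no class diagram is reproduced here, the inheritance relationship can be sketched in code. The classes below are illustrative stand-ins that only mirror the names of the Spark classes, not their implementations:

```java
public class SerializerHierarchy {
    // Illustrative stand-ins, not Spark source:
    static abstract class Serializer { }                        // org.apache.spark.serializer.Serializer
    static class JavaSerializer extends Serializer { }         // built on java.io.ObjectOutputStream
    static class KryoSerializer extends Serializer { }         // built on the Kryo library
    static class SparkSqlSerializer extends KryoSerializer { } // Spark SQL's specialization

    public static void main(String[] args) {
        Serializer s = new SparkSqlSerializer();
        // prints true: SparkSqlSerializer is-a KryoSerializer, which is-a Serializer
        System.out.println(s instanceof KryoSerializer);
    }
}
```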
Spark's serializer is first created in SparkEnv: based on the spark.serializer configuration, SparkEnv instantiates the serializer, passes that instance to a new SerializerManager, and the SerializerManager in turn becomes a constructor argument of BlockManager. The relevant code:
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

// closureSerializer: org.apache.spark.serializer.Serializer
val closureSerializer = new JavaSerializer(conf)
How closureSerializer is used:
A Spark job is processed by the DAGScheduler, which builds a DAG of stages and generates many Tasks; each Task must eventually be shipped over the network to a different Executor. Tasks typically depend on files and JAR packages, and these dependencies add to distribution time, so Spark serializes each Task before shipping it, including the references to its dependent files and JARs. In early Spark versions this serializer could be chosen via the spark.closure.serializer setting; since Spark 2.0 that option has been removed and the closure serializer is hard-coded to JavaSerializer, as the SparkEnv snippet above shows.
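What closure serialization amounts to can be shown with plain java.io serialization. The Task class and its fields here are hypothetical, standing in for the task payload Spark ships to executors:

```java
import java.io.*;

public class ClosureSerDemo {
    // Hypothetical "task" carrying captured state, standing in for the
    // (rdd, func) pairs that Spark's closure serializer ships to executors.
    static class Task implements Serializable {
        final int partition;
        final String jarPath;
        Task(int partition, String jarPath) { this.partition = partition; this.jarPath = jarPath; }
    }

    // Serialize an object graph to bytes, as JavaSerializer does internally.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Rebuild the object on the "executor" side.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new Task(3, "app.jar"));
        Task back = (Task) deserialize(bytes);
        System.out.println(back.partition + " " + back.jarPath); // the captured state round-trips
    }
}
```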
In SparkEnv, a JavaSerializer instance is constructed with SparkConf as its argument.
Usage in DAGScheduler:
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
Stepping into that method, it simply constructs a JavaSerializerInstance:
override def newInstance(): SerializerInstance = {
  val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
  new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
}
What does DAGScheduler do with this JavaSerializerInstance, and where? Tracing the source leads to the submitMissingTasks method:
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // ... (unrelated code omitted)
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // ... (unrelated code omitted)
}
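The NotSerializableException branch in the catch block above is easy to reproduce with plain Java serialization. Holder and trySerialize are hypothetical names for illustration:

```java
import java.io.*;

public class AbortOnNotSerializable {
    // Holder itself implements Serializable but captures a non-serializable
    // field -- the classic cause of "Task not serializable" in Spark jobs.
    static class Holder implements Serializable {
        final Thread notSerializable = new Thread(); // java.lang.Thread is not Serializable
    }

    // Mirrors the catch branches above: turn the failure into an "abort" message.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "serialized";
        } catch (NotSerializableException e) {
            return "Task not serializable: " + e;
        } catch (IOException e) {
            return "Task serialization failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new Holder()));
    }
}
```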
Matching on the stage type: for a ShuffleMapStage, it calls

JavaUtils.bufferToArray(
  closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
That is, JavaSerializerInstance.serialize() is invoked. This method first creates a ByteBufferOutputStream instance bos, then passes bos to JavaSerializerInstance.serializeStream(), which simply constructs a JavaSerializationStream out. Calling out.writeObject(t) then pushes the object t, here the pair (stage.rdd, stage.shuffleDep), through ObjectOutputStream.writeObject() into the ByteBufferOutputStream. Next, ByteBufferOutputStream.toByteBuffer() is called, which returns a HeapByteBuffer instance. Finally, JavaUtils.bufferToArray(HeapByteBuffer) turns that buffer into a byte array.
The JavaSerializerInstance.serialize() method:
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = serializeStream(bos)
  out.writeObject(t)
  out.close()
  bos.toByteBuffer
}
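Under the hood this is ordinary java.io serialization. A minimal stand-in for the pipeline, using java.io.ByteArrayOutputStream in place of Spark's ByteBufferOutputStream (an assumption for illustration):

```java
import java.io.*;
import java.nio.ByteBuffer;

public class SerializePipeline {
    // Stand-in for JavaSerializerInstance.serialize(): write the object through
    // an ObjectOutputStream, then expose the resulting bytes as a ByteBuffer.
    static ByteBuffer serialize(Object t) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(); // plays the role of ByteBufferOutputStream
        ObjectOutputStream out = new ObjectOutputStream(bos);    // plays the role of JavaSerializationStream
        out.writeObject(t);
        out.close();
        return ByteBuffer.wrap(bos.toByteArray());               // plays the role of bos.toByteBuffer
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer buf = serialize("hello");
        // prints true: the buffer is heap-backed (a HeapByteBuffer) and non-empty
        System.out.println(buf.hasArray() && buf.remaining() > 0);
    }
}
```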
Next, the JavaSerializerInstance.serializeStream() method:
override def serializeStream(s: OutputStream): SerializationStream = {
  new JavaSerializationStream(s, counterReset, extraDebugInfo)
}
Then ByteBufferOutputStream.toByteBuffer(), which delegates to ByteBuffer.wrap():
def toByteBuffer: ByteBuffer = {
  require(closed, "can only call toByteBuffer() after ByteBufferOutputStream has been closed")
  ByteBuffer.wrap(buf, 0, count)
}
Finally, the ByteBuffer.wrap() method:
public static ByteBuffer wrap(byte[] array, int offset, int length) {
    try {
        return new HeapByteBuffer(array, offset, length);
    } catch (IllegalArgumentException x) {
        throw new IndexOutOfBoundsException();
    }
}
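Note that wrap() does not copy: the returned HeapByteBuffer is a view over the caller's array, which is why toByteBuffer() insists the stream is closed first. A small demonstration (readAfterMutate is a hypothetical helper):

```java
import java.nio.ByteBuffer;

public class WrapNoCopy {
    // Wrap an array, then mutate the array and read back through the buffer.
    static byte readAfterMutate() {
        byte[] data = {1, 2, 3, 4};
        ByteBuffer buf = ByteBuffer.wrap(data, 0, data.length); // a view, not a copy
        data[0] = 42;                                           // mutate the original array
        return buf.get(0);                                      // the buffer sees the change
    }

    public static void main(String[] args) {
        System.out.println(readAfterMutate()); // prints 42
    }
}
```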