Spark: Serialization and Compression


Preface

Serialization

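Spark has two built-in data serializer classes, JavaSerializer and KryoSerializer. Both extend the abstract class Serializer. In Spark SQL, SparkSqlSerializer in turn extends KryoSerializer. The relationship is:

Serializer (abstract)
  +-- JavaSerializer
  +-- KryoSerializer
        +-- SparkSqlSerializer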

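Spark creates its serializers in the SparkEnv class. SparkEnv reads the spark.serializer setting and instantiates the configured serializer (JavaSerializer by default). It passes that instance to the SerializerManager constructor, and the SerializerManager is in turn passed to the BlockManager. The code is as follows: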

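// Instantiate the class named by spark.serializer, defaulting to JavaSerializer
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
// closureSerializer : org.apache.spark.serializer.Serializer
// The closure serializer is always a JavaSerializer, whatever spark.serializer says
val closureSerializer = new JavaSerializer(conf)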
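The idea behind instantiateClassFromConf is to read a class name from configuration, fall back to a default, and create the instance reflectively. A minimal plain-Java sketch of that idea follows. All class names are hypothetical stand-ins, not Spark classes. It assumes each serializer has a no-arg constructor; Spark's real helper also tries constructors that take a SparkConf.

```java
import java.util.Map;

// Hypothetical stand-ins for Spark's Serializer hierarchy (not Spark classes).
abstract class ToySerializer {
    abstract String name();
}

class ToyJavaSerializer extends ToySerializer {
    @Override
    String name() { return "java"; }
}

class ToyKryoSerializer extends ToySerializer {
    @Override
    String name() { return "kryo"; }
}

final class ToySerializerFactory {
    // Read the class name stored under `key` (falling back to `defaultClassName`)
    // and create an instance through its no-arg constructor.
    static <T> T instantiateFromConf(Map<String, String> conf, String key,
                                     String defaultClassName, Class<T> expected) {
        String className = conf.getOrDefault(key, defaultClassName);
        try {
            Class<?> cls = Class.forName(className);
            return expected.cast(cls.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate " + className, e);
        }
    }
}
```

With an empty configuration map you get the default ToyJavaSerializer. If the map sets "spark.serializer" to ToyKryoSerializer's class name, you get a ToyKryoSerializer instead.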

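How closureSerializer is used:
DAGScheduler turns a Spark job into many Tasks organized as a DAG, and those Tasks must eventually be sent over the network to different Executors. Tasks typically depend on files and jars, which add to the cost of distributing them. Before dispatching a Task, Spark therefore serializes it, along with the list of files and jars it depends on. This serializer used to be configurable through the spark.closure.serializer property; in the code above it is hard-coded to JavaSerializer.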
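A minimal plain-Java sketch of closure shipping follows. It is not Spark code: a closure built on the "driver" captures a value, is turned into bytes with standard Java serialization, and is rebuilt and run on the other side. SerializableFunction and ClosureShipping are hypothetical names. Both sides run in one JVM here, whereas Spark sends the bytes over the network.

```java
import java.io.*;
import java.util.function.Function;

// A function type that is also Serializable, so lambdas targeting it can be shipped as bytes.
interface SerializableFunction extends Function<Integer, Integer>, Serializable {}

final class ClosureShipping {
    // "Driver side": turn the closure into bytes.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // "Executor side": rebuild the closure from bytes.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Build a closure that captures `offset`; the captured value travels inside the bytes.
    static SerializableFunction addOffset(int offset) {
        return x -> x + offset;
    }
}
```

The captured value travels with the closure. Anything the closure references must therefore be serializable too.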

In SparkEnv, a JavaSerializer instance is created with the SparkConf as its argument.
In DAGScheduler it is used like this:

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

Stepping into newInstance(), all it does is create a JavaSerializerInstance:

override def newInstance(): SerializerInstance = {
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
    new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
  }
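newInstance() decides which ClassLoader to use: the configured default, or else the thread's context loader. That choice matters when bytes are turned back into objects. A minimal plain-Java sketch follows, assuming a deserializing stream that resolves class names through the chosen loader. As far as I can tell, Spark's deserialization stream overrides resolveClass in a similar way, but the classes here are hypothetical.

```java
import java.io.*;
import java.util.Optional;

// Sketch: an ObjectInputStream that resolves classes through a chosen loader.
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Load (without initializing) the class named in the stream via our loader.
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup, which also handles primitive type names.
            return super.resolveClass(desc);
        }
    }
}

final class LoaderChoice {
    // Mirrors defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader).
    static ClassLoader choose(Optional<ClassLoader> defaultClassLoader) {
        return defaultClassLoader.orElseGet(() -> Thread.currentThread().getContextClassLoader());
    }

    // Serialize `obj`, then read it back using the chosen loader.
    static Object roundTrip(Object obj, Optional<ClassLoader> defaultClassLoader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            ByteArrayInputStream in = new ByteArrayInputStream(bytes.toByteArray());
            try (ObjectInputStream objIn =
                     new LoaderAwareObjectInputStream(in, choose(defaultClassLoader))) {
                return objIn.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```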

What does DAGScheduler do with this JavaSerializerInstance, and where? Tracing through the source leads to the submitMissingTasks method:

      /** Called when stage's parents are available and we can now do its task. */
      private def submitMissingTasks(stage: Stage, jobId: Int) {
        // ... unrelated code omitted
        var taskBinary: Broadcast[Array[Byte]] = null
        try {
          // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
          // For ResultTask, serialize and broadcast (rdd, func).
          val taskBinaryBytes: Array[Byte] = stage match {
            case stage: ShuffleMapStage =>
              JavaUtils.bufferToArray(
                closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
            case stage: ResultStage =>
              JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
          }
    
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString, Some(e))
            runningStages -= stage
    
            // Abort execution
            return
          case NonFatal(e) =>
            abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
            return
        }
        // ... unrelated code omitted
      }
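The control flow above can be sketched in plain Java. The code picks what to serialize based on the stage type, serializes it, and "aborts" the stage with a message if serialization fails. This is a minimal sketch under stated assumptions: the Toy* stage classes and TaskBinaryBuilder are hypothetical, an Object[] pair stands in for the Scala tuple, and returning null plus a log entry stands in for abortStage and the early return.

```java
import java.io.*;

// Hypothetical stand-ins for Spark's stage types.
abstract class ToyStage {
    final Object rdd;
    ToyStage(Object rdd) { this.rdd = rdd; }
}

final class ToyShuffleMapStage extends ToyStage {
    final Object shuffleDep;
    ToyShuffleMapStage(Object rdd, Object shuffleDep) { super(rdd); this.shuffleDep = shuffleDep; }
}

final class ToyResultStage extends ToyStage {
    final Object func;
    ToyResultStage(Object rdd, Object func) { super(rdd); this.func = func; }
}

final class TaskBinaryBuilder {
    // Returns the serialized task binary, or null after recording why the stage was aborted.
    static byte[] build(ToyStage stage, StringBuilder abortLog) {
        Object[] payload;
        if (stage instanceof ToyShuffleMapStage) {
            ToyShuffleMapStage s = (ToyShuffleMapStage) stage;
            payload = new Object[] { s.rdd, s.shuffleDep };   // (rdd, shuffleDep)
        } else if (stage instanceof ToyResultStage) {
            ToyResultStage s = (ToyResultStage) stage;
            payload = new Object[] { s.rdd, s.func };         // (rdd, func)
        } else {
            throw new IllegalArgumentException("unknown stage type");
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (NotSerializableException e) {      // must precede the broader IOException
            abortLog.append("Task not serializable: ").append(e);
            return null;
        } catch (IOException e) {
            abortLog.append("Task serialization failed: ").append(e);
            return null;
        }
        return bytes.toByteArray();
    }
}
```

A stage whose dependency holds a plain `new Object()` is rejected with "Task not serializable". A stage carrying only strings serializes normally.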

The code matches on the stage type. For a ShuffleMapStage it calls:

      JavaUtils.bufferToArray(
                    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))

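That is, it calls JavaSerializerInstance.serialize(), which works as follows:
1. It creates a ByteBufferOutputStream instance, bos.
2. It passes bos to JavaSerializerInstance.serializeStream(), which simply creates a JavaSerializationStream instance, out.
3. It calls out.writeObject(t). This delegates to ObjectOutputStream.writeObject() and writes the object t, here (stage.rdd, stage.shuffleDep), into bos.
4. It closes out, which also closes bos, as toByteBuffer() requires.
5. It calls ByteBufferOutputStream.toByteBuffer(), which returns a HeapByteBuffer.
Finally, JavaUtils.bufferToArray() turns that HeapByteBuffer into a byte array.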
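The same pipeline can be condensed into plain Java: write through an ObjectOutputStream, close it, wrap the bytes in a ByteBuffer, and then get a byte[] back out. This is a sketch, not Spark's code. ByteArrayOutputStream stands in for ByteBufferOutputStream, a bare ObjectOutputStream stands in for JavaSerializationStream, and the bufferToArray helper is a hypothetical equivalent of JavaUtils.bufferToArray. It reuses the backing array when that array is exactly the payload, and copies otherwise.

```java
import java.io.*;
import java.nio.ByteBuffer;

final class SerializePipeline {
    // Mirrors serialize(): write t through an ObjectOutputStream, close it, wrap the bytes.
    static ByteBuffer serialize(Object t) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();      // stands in for ByteBufferOutputStream
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {  // stands in for JavaSerializationStream
            out.writeObject(t);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // toByteArray() copies; Spark's ByteBufferOutputStream wraps its own buffer instead.
        return ByteBuffer.wrap(bos.toByteArray());
    }

    // bufferToArray-style helper: reuse the backing array when it is exactly the payload, else copy.
    static byte[] bufferToArray(ByteBuffer buffer) {
        if (buffer.hasArray() && buffer.arrayOffset() == 0
                && buffer.array().length == buffer.remaining()) {
            return buffer.array();
        }
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);  // copies the readable region and advances the position
        return bytes;
    }

    // Reverse direction, to check the round trip.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```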

The JavaSerializerInstance.serialize() method:
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteBufferOutputStream()
    val out = serializeStream(bos)
    out.writeObject(t)
    out.close()
    bos.toByteBuffer
  }

Next, the JavaSerializerInstance.serializeStream() method:

 override def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s, counterReset, extraDebugInfo)
  }
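JavaSerializationStream is constructed with a counterReset value. In Spark this comes from spark.serializer.objectStreamReset (default 100), and it controls how often ObjectOutputStream.reset() is called. Without a reset, the stream keeps a reference to every object it has written, so repeated objects become cheap back-references but can never be garbage-collected. A minimal plain-Java sketch of that behaviour follows; ResettingStream is a hypothetical name, not Spark's class.

```java
import java.io.*;

// Sketch: a serialization stream that resets ObjectOutputStream every `counterReset` writes.
final class ResettingStream implements Closeable {
    private final ObjectOutputStream objOut;
    private final int counterReset;   // <= 0 disables periodic resets
    private int counter = 0;

    ResettingStream(OutputStream out, int counterReset) throws IOException {
        this.objOut = new ObjectOutputStream(out);
        this.counterReset = counterReset;
    }

    void writeObject(Object t) throws IOException {
        objOut.writeObject(t);
        counter++;
        if (counterReset > 0 && counter >= counterReset) {
            // Forget every object written so far, so the handle table cannot grow without bound.
            objOut.reset();
            counter = 0;
        }
    }

    @Override
    public void close() throws IOException {
        objOut.close();
    }

    // Demo helper: write `obj` twice, read both copies back, report whether they are the same instance.
    static boolean secondReadIsSameInstance(Object obj, int counterReset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ResettingStream s = new ResettingStream(bytes, counterReset)) {
            s.writeObject(obj);
            s.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject() == in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without resets, the second write is only a back-reference, so both reads return the same instance. With a reset after every write, the second write is a full copy and reads back as a distinct object.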

Next, ByteBufferOutputStream.toByteBuffer(), which calls ByteBuffer.wrap():

  def toByteBuffer: ByteBuffer = {
    require(closed, "can only call toByteBuffer() after ByteBufferOutputStream has been closed")
    ByteBuffer.wrap(buf, 0, count)
  }
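toByteBuffer() wraps the internal buf array and its count rather than copying them, and refuses to run before the stream is closed. A minimal Java sketch of the same pattern follows. It assumes a subclass of java.io.ByteArrayOutputStream, whose buf and count fields are protected. ExposingByteArrayOutputStream is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch: a ByteArrayOutputStream that hands out its internal buffer without copying.
final class ExposingByteArrayOutputStream extends ByteArrayOutputStream {
    private boolean closed = false;

    ExposingByteArrayOutputStream(int capacity) {
        super(capacity);
    }

    @Override
    public void close() {
        // ByteArrayOutputStream.close() is a no-op, so recording the flag is enough.
        closed = true;
    }

    // Wrap the first `count` bytes of the internal array `buf`; no bytes are copied.
    ByteBuffer toByteBuffer() {
        if (!closed) {
            throw new IllegalStateException(
                "can only call toByteBuffer() after the stream has been closed");
        }
        return ByteBuffer.wrap(buf, 0, count);
    }
}
```

Because the returned buffer shares the internal array, its capacity is the array's full size. Only the first count bytes are readable.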

And finally, ByteBuffer.wrap() itself:

    public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)
    {
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }
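HeapByteBuffer is a JDK-internal class. From the outside you only see a non-direct ByteBuffer backed by the array you passed in. The ByteBuffer.wrap documentation specifies the result: capacity is array.length, position is offset, limit is offset + length, and no bytes are copied. As the source above shows, an invalid range surfaces as IndexOutOfBoundsException. A small Java sketch of those properties follows; WrapSemantics is a hypothetical helper name.

```java
import java.nio.ByteBuffer;

final class WrapSemantics {
    // Wrap a slice of `array`: no copy, position = offset, limit = offset + length,
    // capacity = array.length.
    static ByteBuffer slice(byte[] array, int offset, int length) {
        return ByteBuffer.wrap(array, offset, length);
    }

    // True if wrap() rejects the range; per the JDK source above, the constructor's
    // IllegalArgumentException is rethrown as IndexOutOfBoundsException.
    static boolean rejects(byte[] array, int offset, int length) {
        try {
            ByteBuffer.wrap(array, offset, length);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }
}
```

Since the buffer shares the array, writing to the array after wrapping is visible through the buffer. This is why the copy-free toByteBuffer() insists the stream is closed first.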


Reposted from blog.csdn.net/qq_29573903/article/details/82977001