Spark-Core Source Code Study Notes 2: Application Submission (Submit) and Driver Registration Flow, Using JavaWordCount as an Example

Spark-Core Source Code Study Notes

This series is a record of revisiting and studying the Spark source code. The goal is to untangle the mechanism and flow by which Spark distributes and runs programs, tracing the key parts of the source so that we understand not just what happens but why. Side branches are only described in words rather than drilled into, to avoid obscuring the main thread.
The previous articles started from the startup of a Spark cluster and covered the registration and invocation flow of components such as Master, Worker, Driver, Executor and RpcEnv. From this article onwards, we start from the submission of an application and trace how Spark divides and schedules work, building up an overall picture of how Spark runs.

The spark-submit Script

First, setting aside any concrete application, let's look at the content of the spark-submit script, which every submission has to go through:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

It simply delegates to spark-class, passing in *.deploy.SparkSubmit. You may remember spark-class from before, where *.deploy.Master and *.deploy.Worker were passed in; internally spark-class calls *.launcher.Main as the unified entry point:

public static void main(String[] argsArray) throws Exception {
  // (abridged) className is the first element taken from argsArray; args, env and cmd are set up around it
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
    cmd = buildCommand(builder, env, printLaunchCommand);
  }
  // The prepared command is printed back to spark-class as '\0'-separated tokens
  List<String> bashCmd = prepareBashCommand(cmd, env);
  for (String c : bashCmd) {
    System.out.print(c);
    System.out.print('\0');
  }
}

What differs from the earlier Master/Worker path is the SparkSubmitCommandBuilder constructor:

SparkSubmitCommandBuilder(List<String> args) {
  ...
  switch (args.get(0)) {
    // The first argument is matched here, mainly to handle the Spark shells and the bundled examples
    case PYSPARK_SHELL:
      ...
    case SPARKR_SHELL:
      ...
    case RUN_EXAMPLE:
      isExample = true;
      appResource = SparkLauncher.NO_RESOURCE;
      // args.get(0) here is 'run-example', so it is dropped from the argument list
      submitArgs = args.subList(1, args.size());
  }
  this.isExample = isExample;
  // The remaining arguments are matched, copied and wrapped; not expanded here
  OptionParser parser = new OptionParser(true);
  parser.parse(submitArgs);
}

After the command is printed out in the same way as for Master (see the first article in this series if this is unfamiliar), *.deploy.SparkSubmit is ultimately invoked with the argument list. Let's go straight into that class's main method.

deploy.SparkSubmit

override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
      self =>
      ...
      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {...}
      }
  }
  // By default this calls SparkSubmit's doSubmit; it can of course be overridden above (as shown) to add custom behavior.
  submit.doSubmit(args)
}
def doSubmit(args: Array[String]): Unit = {
  // Wraps the arguments and supplies many default property values; the details are tedious and not worth diving into
  val appArgs = parseArguments(args)
  // action = Option(action).getOrElse(SUBMIT)
  // the default action is SUBMIT
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
@tailrec // tail-recursion annotation
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Some environment checks are omitted here; every branch eventually ends up in runMain, so let's go straight there
  runMain(args, uninitLog)
}

First, the doc comment on runMain:

/**
   * Run the main method of the child class using the submit arguments.
   * Runs the main method of the class given in the submit arguments.
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   * Two steps: first prepare the launch environment, then invoke the main method.
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   * In cluster deploy mode or for Python applications, this main class is not the one the user provided.
   */

Into runMain:

 private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // prepareSubmitEnvironment builds the launch environment; it is quite long, so it is only skimmed below
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Creates a new class loader
    // ChildFirstURLClassLoader<-MutableURLClassLoader<-URLClassLoader<-SecureClassLoader<-ClassLoader
    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    // Set the current thread's context ClassLoader to the loader just created
    Thread.currentThread.setContextClassLoader(loader)
	// Add the dependencies to the classpath
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
	var mainClass: Class[_] = null
    try {
      // Note: the childMainClass returned above is loaded here as mainClass
      mainClass = Utils.classForName(childMainClass)
    }
    val app: SparkApplication = 
    if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      // A Scala class that extends scala.App directly has no explicit main method, so it needs special handling
        if (classOf[scala.App].isAssignableFrom(Utils.classForName(s"$childMainClass$$"))) {
          logWarning("Subclasses of scala.App may not work correctly. " +
            "Use a main() method instead.")
        }
      new JavaMainApplication(mainClass)
    }
    try {
      // Call start; SparkApplication is a trait, so we can look at the start method of its implementation JavaMainApplication
      app.start(childArgs.toArray, sparkConf)
    } 
}
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Obtain the static main method of the supplied class via reflection
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    // Formally invoke main; this is the main of the childMainClass above, whose concrete target only becomes clear once we look at prepareSubmitEnvironment
    mainMethod.invoke(null, args)
  }
}
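
As a quick aside, here is a minimal, self-contained Scala sketch of the same reflection mechanism. HelloApp is a hypothetical stand-in for childMainClass; the lookup and invocation mirror what JavaMainApplication.start does, but this is only an illustration, not Spark code.

// A hypothetical "user application" standing in for childMainClass
object HelloApp {
  def main(args: Array[String]): Unit = println("Hello, " + args.mkString(" "))
}

object ReflectiveMainDemo {
  def main(args: Array[String]): Unit = {
    // Load the class by name, as Utils.classForName does with childMainClass
    val klass = Class.forName("HelloApp")
    // Look up the static main(String[]) entry point...
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    // ...and invoke it; the receiver is null because main is static
    mainMethod.invoke(null, Array("driver", "args"))
  }
}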

The prepareSubmitEnvironment Method at a Glance

What we urgently need to know is what the returned childMainClass points to, so let's skim prepareSubmitEnvironment with a focus on that value:

/* Prepare the environment for submitting an application. */
private[deploy] def prepareSubmitEnvironment(...):(Seq[String], Seq[String], SparkConf, String) = {
  // Return values: the tuple of the four results
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = new SparkConf()  // a SparkConf is initialized here
  var childMainClass = "" // the variable to watch
  // Set the cluster manager: determine the cluster manager from the master argument
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("local") => LOCAL
    case _ => ...
  }
  // Set the deploy mode; default is client mode: determine how the program is deployed
  var deployMode: Int = args.deployMode match {
    case "client" | null => CLIENT
    case "cluster" => CLUSTER
    case _ => ... 
  }
  val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
  val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
// Fill in SparkConf properties from the arguments
  args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
  val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
  // In client mode, download remote files.
  if (deployMode == CLIENT) {...}
  // We will download them to local disk prior to add to YARN's distributed cache.
  // For yarn client mode, since we already download them with above code, so we only need to
  // figure out the local path and replace the remote one.
  if (clusterManager == YARN) {...}
  // Special flag to avoid deprecation warnings at the client
  sys.props("SPARK_SUBMIT") = "true"
  // A list of rules to map each argument to system properties or command-line options in
  // each deploy mode; we iterate through these below
  // A List of configuration mappings, covering the different deploy modes
  val options = List[OptionAssigner](
    // All cluster managers
    OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
    ...,	  
    // Propagate attributes for dependency resolution at the driver side
    OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.excludes"),
	...,
    // Yarn only
    OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
    ...,
    // Other options
    OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
    OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
      confKey = DRIVER_MEMORY.key),
    OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
      confKey = DRIVER_CORES.key),
    // An internal option used only for spark-shell to add user jars to repl's classloader,
    // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
    // remote jars, so adding a new option to only specify local jars for spark-shell internally.
    OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
  )
  // In client mode, launch the application main class directly
  // In addition, add the main application jar and any added jars (if any) to the classpath
  // In client mode, the mainClass of our own application is assigned to childMainClass directly, and the dependency jars are added to the classpath
  // As an aside, this confirms that in client mode the driver runs on the submitting node, so no node needs to be assigned to it, i.e. it is never added to waitingDrivers to wait for schedule() to place it
  if (deployMode == CLIENT) {
    childMainClass = args.mainClass
    if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
      childClasspath += localPrimaryResource
    }
    if (localJars != null) { childClasspath ++= localJars.split(",") }
  }
  // In yarn-cluster mode, add the application jar and any added jars to the classpath in case the YARN client needs them
  if (isYarnCluster) {
    if (isUserJar(args.primaryResource)) {
      childClasspath += args.primaryResource
    }
    if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  }
  if (deployMode == CLIENT) {
    if (args.childArgs != null) { childArgs ++= args.childArgs }
  }
  // In yarn-cluster mode the application jar has already been distributed to each node, so spark.jars is only filled in for the other modes
  if (!isYarnCluster && !args.isPython && !args.isR) {
    var jars = sparkConf.get(JARS)
    if (isUserJar(args.primaryResource)) {
      jars = jars ++ Seq(args.primaryResource)
    }
    sparkConf.set(JARS, jars)
  }
  // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
  // All Spark parameters are expected to be passed to the client through system properties.
  // This is the standalone cluster mode we are following in this article
  if (args.isStandaloneCluster) {
    // The REST path, not covered here; it is actually the default, but if it fails, useRest is set to false, SUBMIT runs again, and the else branch is taken
    if (args.useRest) {
      childMainClass = REST_CLUSTER_SUBMIT_CLASS
      childArgs += (args.primaryResource, args.mainClass)
    } else {
      // In legacy standalone cluster mode, use Client as a wrapper around the user class
      // This is the childMainClass we are after
      // STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
      // ClientApp lives in org.apache.spark.deploy.Client
      // Inheritance: ClientApp extends SparkApplication
      childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
      if (args.supervise) { childArgs += "--supervise" }
      Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
      Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
      childArgs += "launch"
      // Note: our own application's mainClass is appended to the argument list here; this matters later for how the application is launched
      childArgs += (args.master, args.primaryResource, args.mainClass)
    }
    if (args.childArgs != null) {
      childArgs ++= args.childArgs
    }
  }
  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {...}
  // Load any properties specified through --conf and the default properties file
  for ((k, v) <- args.sparkProperties) {
    sparkConf.setIfMissing(k, v)
  }
  // Ignore invalid spark.driver.host in cluster modes.
  if (deployMode == CLUSTER) {
    sparkConf.remove(DRIVER_HOST_ADDRESS)
  }
  // Resolve paths in certain spark properties
  val pathConfigs = Seq(
    "spark.jars",
    "spark.files",
    "spark.yarn.dist.files",
    "spark.yarn.dist.archives",
    "spark.yarn.dist.jars")
  pathConfigs.foreach { config =>
    // Replace old URIs with resolved URIs, if they exist
    sparkConf.getOption(config).foreach { oldValue =>
      sparkConf.set(config, Utils.resolveURIs(oldValue))
    }
  }
  (childArgs, childClasspath, sparkConf, childMainClass)
}
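
To summarize the branches we care about, here is a much-simplified decision table. It is a sketch rather than Spark's actual code; the ClientApp name follows the constant quoted above, while the REST class name is quoted from memory and should be double-checked against the source for your version.

object ChildMainClassSketch {
  sealed trait Mode
  case object Client extends Mode
  case object Cluster extends Mode

  def childMainClass(master: String, mode: Mode, useRest: Boolean, userMainClass: String): String =
    (master, mode) match {
      // Client mode: the user's own main class runs directly in the submitting JVM
      case (_, Client) => userMainClass
      // Standalone cluster mode: the REST client or the legacy ClientApp wraps the user class
      case (m, Cluster) if m.startsWith("spark") =>
        if (useRest) "org.apache.spark.deploy.rest.RestSubmissionClientApp" // assumed value of REST_CLUSTER_SUBMIT_CLASS
        else "org.apache.spark.deploy.ClientApp"                            // STANDALONE_CLUSTER_SUBMIT_CLASS, as quoted above
      // Other cluster managers (e.g. yarn-cluster) use their own wrapper applications
      case _ => "<cluster-manager specific wrapper>"
    }

  def main(args: Array[String]): Unit =
    // The standalone cluster, non-REST path: the case this article follows
    println(childMainClass("spark://master:7077", Cluster, useRest = false, "JavaWordCount"))
}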

The prepareSubmitEnvironment method is quite long; only its backbone is kept above, and the full source can be consulted for more detail.
Now, back in runMain:

val app: SparkApplication = 
    if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {...}
try {
      // Call start; SparkApplication is a trait, and in standalone cluster mode its implementation is ClientApp, so we look at ClientApp.start
      app.start(childArgs.toArray, sparkConf)
} 
private[spark] class ClientApp extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)
    val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
	// Resolve the Master RPC endpoint refs from the master URLs passed in
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    // Register a local ClientEndpoint; let's look at its onStart method
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    rpcEnv.awaitTermination()
  }
}
private class ClientEndpoint(...) extends ThreadSafeRpcEndpoint with Logging {
 override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" => //前面封装的lanch参数

      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
	  // Build the Command template class; note the difference between the two mainClass values
	  // Also note the two fixed placeholder strings WORKER_URL and USER_JAR at the front of the Seq; they are used for pattern matching when the driver is launched later, as mentioned below
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)
	  // Build the DriverDescription template class
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command) 
	  // Send a RequestSubmitDriver message carrying driverDescription to the Master ref obtained above, and handle the reply asynchronously
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
    case "kill" => ...     
  }
 }
}  

Now over to the Master side, to see how the RequestSubmitDriver message is handled:

case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else { // the non-ALIVE case is not worth considering here
    logInfo("Driver submitted " + description.command.mainClass)
    // Instantiate a DriverInfo from the arguments; a simple wrapper class
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    // This should look familiar: waitingDrivers, seen earlier in the discussion of the schedule method, is populated right here
    waitingDrivers += driver
    drivers.add(driver)
    // Triggered whenever resources change; as described in the earlier article, it squeezes the newly added driver onto some worker
    schedule()
    // Reply with a SubmitDriverResponse message
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
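
To make that bookkeeping concrete, here is a toy model, not Spark's code, of what happens to the driver from this point on: it sits in waitingDrivers until schedule() finds a worker with enough free memory and cores (the real schedule() also shuffles the workers and handles retries, as discussed in the earlier article).

import scala.collection.mutable.ArrayBuffer

object WaitingDriversSketch {
  case class DriverInfo(id: String, memoryMb: Int, cores: Int)
  case class WorkerInfo(id: String, var freeMemoryMb: Int, var freeCores: Int)

  val waitingDrivers = ArrayBuffer[DriverInfo]()

  // Hand each waiting driver to the first worker that still has room for it
  def schedule(workers: Seq[WorkerInfo]): Unit =
    for (driver <- waitingDrivers.toList) {
      workers.find(w => w.freeMemoryMb >= driver.memoryMb && w.freeCores >= driver.cores).foreach { w =>
        w.freeMemoryMb -= driver.memoryMb
        w.freeCores -= driver.cores
        waitingDrivers -= driver
        println(s"launching ${driver.id} on ${w.id}")
      }
    }

  def main(args: Array[String]): Unit = {
    waitingDrivers += DriverInfo("driver-20190401-0001", memoryMb = 1024, cores = 1) // hypothetical id
    schedule(Seq(WorkerInfo("worker-1", freeMemoryMb = 4096, freeCores = 4)))
  }
}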

When ClientEndpoint receives the SubmitDriverResponse message, it does some simple bookkeeping:

override def receive: PartialFunction[Any, Unit] = {
  case SubmitDriverResponse(master, success, driverId, message) =>
    logInfo(message)
    if (success) {
      activeMasterEndpoint = master
      pollAndReportStatus(driverId.get)
    } else if (!Utils.responseFromBackup(message)) {
      System.exit(-1)
    }

At this point we need to go back to the section on driver startup, i.e. the schedule method: the final step of launching a driver goes through a ProcessBuilder, which can roughly be seen as invoking DriverDescription.command.mainClass from above, i.e. org.apache.spark.deploy.worker.DriverWrapper. Let's go straight into DriverWrapper:

object DriverWrapper extends Logging {
  def main(args: Array[String]) {
    args.toList match {
      /*IMPORTANT: Spark 1.3 provides a stable application submission gateway */
	  // The pattern match here consumes the Seq built above; mainClass at this point is our own application's class
      case workerUrl :: userJar :: mainClass :: extraArgs =>
        val conf = new SparkConf()
        val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
		...
        // Delegate to supplied main class
		// Look up the main entry point of the supplied class
        val clazz = Utils.classForName(mainClass)
        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
		// From here, we formally enter our application
        mainMethod.invoke(null, extraArgs.toArray[String])
        rpcEnv.shutdown()
	...
}
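
The list pattern above is worth pausing on. Below is a tiny standalone illustration of how DriverWrapper peels off the first two values (which the Worker substituted for the {{WORKER_URL}} and {{USER_JAR}} placeholders) before reaching our mainClass; all of the values are made up for the example.

object DriverArgsDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical argument list, shaped like the one DriverWrapper receives
    val sample = List("spark://Worker@node1:34567", "/tmp/app.jar", "JavaWordCount", "hdfs:///input.txt")
    sample match {
      case workerUrl :: userJar :: mainClass :: extraArgs =>
        println(s"workerUrl=$workerUrl userJar=$userJar mainClass=$mainClass extraArgs=$extraArgs")
      case _ =>
        println("Usage: DriverWrapper <workerUrl> <userJar> <mainClass> [options]")
    }
  }
}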

From here, execution formally enters our own application.

The Start of the Main Program

Here we use Spark's official introductory example, ~\examples\src\main\java\JavaWordCount.java. Note, however, that we take this source and package it as our own application, rather than running the bundled spark-examples_2.11-2.3.1.jar directly. (The difference is that the SparkSubmitCommandBuilder constructor mentioned at the beginning checks whether the first argument is RUN_EXAMPLE; revisit the opening section if that is hazy.)

//package org.apache.spark.examples;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
  // The delimiter is a single space
  private static final Pattern SPACE = Pattern.compile(" ");
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
	// Since Spark 2.0, SparkSession has been the unified entry point; internally
	// it still creates the appropriate SparkContext for the situation, as detailed below
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();

    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());

    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
	// An action operator; it triggers the job submission
    List<Tuple2<String, Integer>> output = counts.collect();
    
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    spark.stop();
  }
}

Let's first look at SparkSession.builder().appName("JavaWordCount").getOrCreate():

@Stable // The Stable annotation means the API is stable within a major version; incompatibilities may only appear across major versions
object SparkSession extends Logging {
  /* Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].*/
  // simply returns a new Builder instance
  def builder(): Builder = new Builder
    /**
   * Builder for [[SparkSession]].
   */
  @Stable
  // an inner class of the SparkSession object
  class Builder extends Logging {...}
}

The subsequent .appName("JavaWordCount") and .getOrCreate() are therefore all method calls on the Builder class. Let's look at appName; the other setters follow the same pattern:

def appName(name: String): Builder = config("spark.app.name", name)
def config(key: String, value: String): Builder = synchronized {
  options += key -> value
  this
}

The methods are simple and need no explanation; just note that config returns this, i.e. the Builder object itself, which is what makes the chained calls possible, as the sketch below illustrates.
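
Here is a minimal sketch of the same fluent-builder idea, with hypothetical names, just to show why returning this makes the chain in JavaWordCount work:

class MiniBuilder {
  private val options = scala.collection.mutable.HashMap[String, String]()
  // Each setter records an option and returns this, so calls can be chained
  def config(key: String, value: String): MiniBuilder = synchronized { options += key -> value; this }
  def appName(name: String): MiniBuilder = config("spark.app.name", name)
  def build(): Map[String, String] = options.toMap
}

object MiniBuilderDemo {
  def main(args: Array[String]): Unit =
    println(new MiniBuilder().appName("JavaWordCount").config("spark.master", "local[*]").build())
}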

The Final Instantiation in getOrCreate

The last call in the Builder chain is getOrCreate():

def getOrCreate(): SparkSession = synchronized {
  // SparkSession should only be created and accessed on the driver.
  assertOnDriver()
  // Get the session from current thread's active session.
  var session = activeThreadSession.get()
  // If the current thread already holds a live session, just apply the conf options to it and return it
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }
  // When the current thread has no usable SparkSession, fall back to the process-wide default, i.e. the global session
  // Global synchronization so we will only set the default session once.
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // private val defaultSession = new AtomicReference[SparkSession]
    // AtomicReference holds a 'private volatile V value', where V is the type parameter SparkSession; thanks to volatile,
    // the value returned by get() is the SparkSession instance that was last set (or null if none has been set yet)
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }
    // The code below is what runs on a first submission, i.e. when the global state is still empty
    // No active nor global default session. Create a new one.
    val sparkContext = userSuppliedContext.getOrElse {
      // Instantiate a SparkConf and apply the options set on the builder earlier
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }
      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }
	  // Formally instantiate the SparkContext, a key component detailed in a later article. Note: once the SparkContext exists, the SparkConf can no longer be changed
      SparkContext.getOrCreate(sparkConf)
    }
	// Instantiate the SparkSession and record it as both the default and the active session, so later code can reuse it
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    setDefaultSession(session)
    setActiveSession(session)
    // A SparkListener used to clean up when the application ends
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }
  return session
}
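
Stripped of the SQL configuration and listener details, the lookup order is simply: thread-local active session, then process-wide default, then create a new one. The following is a minimal sketch of just that caching pattern (not Spark's code; Session here is a placeholder class):

import java.util.concurrent.atomic.AtomicReference

object SessionCacheSketch {
  final class Session(val name: String)

  // Mirrors the roles of activeThreadSession and defaultSession in the real SparkSession object
  private val activeThreadSession = new InheritableThreadLocal[Session]
  private val defaultSession = new AtomicReference[Session]

  def getOrCreate(name: String): Session = {
    val active = activeThreadSession.get()
    if (active != null) active                       // 1. reuse the session bound to this thread
    else this.synchronized {
      val default = defaultSession.get()
      if (default != null) default                   // 2. reuse the process-wide default session
      else {
        val created = new Session(name)              // 3. otherwise build a new one and publish it
        defaultSession.set(created)
        activeThreadSession.set(created)
        created
      }
    }
  }

  def main(args: Array[String]): Unit =
    // The second call reuses the session created by the first one
    println(getOrCreate("JavaWordCount") eq getOrCreate("ignored"))
}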

SparkSession also wraps many methods, createDataFrame being among the most commonly used, but we will not expand on them here. Returning to JavaWordCount, the next call is spark.read().textFile(args[0]).javaRDD():

def read: DataFrameReader = new DataFrameReader(self)
def textFile(paths: String*): Dataset[String] = {
    assertNoSpecifiedSchema("textFile")
    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
/* Loads text files and returns a `DataFrame` 
   whose schema starts with a string column named "value",
   and followed by partitioned columns if there are any.*/
def text(paths: String*): DataFrame = format("text").load(paths : _*)
// load is fairly involved internally; it ultimately reads the file contents through Dataset.ofRows()

In the end, each line of the file is returned as one row of the Dataset. After that come the three transformations flatMap, mapToPair and reduceByKey, and finally the collect action triggers everything that follows; the sketch below makes that boundary explicit.
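
For reference, here is the same pipeline written in Scala against a local master. It is only an illustrative sketch, not part of the submission flow traced above, but it shows the transformation/action boundary: no job runs until collect() is called.

import org.apache.spark.sql.SparkSession

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ScalaWordCount").getOrCreate()
    val lines = spark.read.textFile(args(0)).rdd            // Dataset[String] -> RDD[String]
    val counts = lines
      .flatMap(_.split(" "))                                // transformation: nothing runs yet
      .map(word => (word, 1))                               // transformation
      .reduceByKey(_ + _)                                   // transformation (adds a shuffle dependency)
    counts.collect().foreach { case (w, c) => println(s"$w: $c") } // action: triggers the job
    spark.stop()
  }
}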

Summary

So far we only have a rough outline of how an application starts. The instantiation of SparkContext and what the collect action actually triggers are the real heart of Spark; the next two articles will each cover one of them in detail.

References:

Apache Spark source code

Reposted from blog.csdn.net/u011372108/article/details/89248353