Spark-Core Source Code Study Notes
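This series records my re-reading of the Spark source code. The goal is to untangle how Spark distributes and runs a program, tracing the key source code closely enough to understand why it works the way it does. Peripheral code is described in words only and not drilled into, so that it does not obscure the main line.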
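The previous articles started from cluster startup and walked through the registration and call flow of components such as Master, Worker, Driver, Executor and RpcEnv. Starting with this article, we begin from the submission of an application and trace how Spark divides and schedules its work, to build an overall picture of how Spark runs.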
The spark-submit script
Setting any specific application aside for now, let's first look at the spark-submit script, which every submission has to pass through:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
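It hands everything straight to spark-class, passing in *.deploy.SparkSubmit. You may remember spark-class from earlier, where it was given *.deploy.Master and *.deploy.Worker. Internally spark-class calls *.launcher.Main, which acts as a single entry point: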
public static void main(String[] argsArray) throws Exception {
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
cmd = buildCommand(builder, env, printLaunchCommand);
}
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
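The print loop above writes every command token followed by a NUL character, presumably so that the calling shell script can split the command back into tokens safely even when a token contains spaces. Below is a minimal sketch of that round trip; the class name NulDelimitedCommand and the sample command are hypothetical and not part of Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Spark class: illustrates NUL-terminated token output.
final class NulDelimitedCommand {

    // Append a NUL after every token, mirroring the print loop in launcher.Main.
    static String encode(List<String> tokens) {
        StringBuilder sb = new StringBuilder();
        for (String token : tokens) {
            sb.append(token).append('\0');
        }
        return sb.toString();
    }

    // Split on NUL; every token is terminated, so nothing follows the last NUL.
    static List<String> decode(String encoded) {
        List<String> tokens = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < encoded.length(); i++) {
            if (encoded.charAt(i) == '\0') {
                tokens.add(encoded.substring(start, i));
                start = i + 1;
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Sample command; the path containing a space is made up for illustration.
        List<String> cmd = Arrays.asList(
            "java", "-cp", "/opt/my jars/*", "org.apache.spark.deploy.SparkSubmit");
        System.out.println(decode(encode(cmd)).equals(cmd));
    }
}
```

A space-separated format would break the third token in two, which is the kind of problem a NUL terminator avoids.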
The difference from the earlier flow lies in the SparkSubmitCommandBuilder constructor:
SparkSubmitCommandBuilder(List<String> args) {
...
switch (args.get(0)) {
// The first argument is matched here; this mainly serves the Spark shells and the bundled examples
case PYSPARK_SHELL:
...
case SPARKR_SHELL:
...
case RUN_EXAMPLE:
isExample = true;
appResource = SparkLauncher.NO_RESOURCE;
// args.get(0) is "run-example" here, so it is dropped from the argument list
submitArgs = args.subList(1, args.size());
this.isExample = isExample;
// Match the remaining arguments and copy them into the builder's fields; not expanded here
OptionParser parser = new OptionParser(true);
parser.parse(submitArgs);
}
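After the command is printed out in the same way as for Master (readers unfamiliar with this can refer to the first article in this series), *.deploy.SparkSubmit is eventually invoked with the argument list. Let's go straight to that class's main method.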
deploy.SparkSubmit
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>
...
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {...}
}
}
// By default this simply calls SparkSubmit.doSubmit; the override above is the place to add any custom handling.
submit.doSubmit(args)
}
def doSubmit(args: Array[String]): Unit = {
// Wrap the incoming arguments and fill in many default values; the properties are tedious and not worth digging into
val appArgs = parseArguments(args)
// action = Option(action).getOrElse(SUBMIT)
// so action defaults to SUBMIT
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
@tailrec // asks the compiler to verify that the recursive call is in tail position
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// The environment checks are omitted here; every branch eventually reaches runMain, so we go straight there
runMain(args, uninitLog)
}
First, the doc comment on runMain:
/**
* Run the main method of the child class using the submit arguments.
* In short: invoke the main method of the class given in the submit arguments.
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
* In short: two steps, first prepare the environment, then invoke main.
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
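* In short: in cluster deploy mode, or for Python applications, the main class invoked here is a wrapper rather than the class the user supplied.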
*/
Now into the runMain method:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// prepareSubmitEnvironment sets up the run environment; it is long, so we only skim it further below
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// Creates a new class loader
// ChildFirstURLClassLoader<-MutableURLClassLoader<-URLClassLoader<-SecureClassLoader<-ClassLoader
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
// Make the loader just created the context ClassLoader of the current thread
Thread.currentThread.setContextClassLoader(loader)
// Add the dependency jars
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
// Note: the childMainClass returned above is used as the main class here
mainClass = Utils.classForName(childMainClass)
}
val app: SparkApplication =
if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
// A Scala object that extends App directly has no explicitly written main method, so this case gets special handling (a warning)
if (classOf[scala.App].isAssignableFrom(Utils.classForName(s"$childMainClass$$"))) {
logWarning("Subclasses of scala.App may not work correctly. " +
"Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
try {
// Call start; SparkApplication is a trait, so look at the start method of its implementation JavaMainApplication
app.start(childArgs.toArray, sparkConf)
}
}
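runMain picks between two loaders and then keeps adding jars to the chosen one after it has been constructed. The sketch below illustrates both ideas in plain Java under simplifying assumptions: a URLClassLoader subclass that widens addURL to public, plus an optional child-first lookup. The class MutableLoaderSketch is hypothetical, and the child-first logic is a simplification rather than a copy of Spark's ChildFirstURLClassLoader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

// Hypothetical sketch, not Spark's implementation.
final class MutableLoaderSketch extends URLClassLoader {
    private final boolean childFirst;

    MutableLoaderSketch(URL[] urls, ClassLoader parent, boolean childFirst) {
        super(urls, parent);
        this.childFirst = childFirst;
    }

    // URLClassLoader.addURL is protected; widening it allows jars to be added later.
    @Override
    public void addURL(URL url) {
        super.addURL(url);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!childFirst) {
            return super.loadClass(name, resolve); // default parent-first delegation
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name);               // look in our own URLs first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false);  // then fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        MutableLoaderSketch loader = new MutableLoaderSketch(new URL[0], parent, false);
        Thread.currentThread().setContextClassLoader(loader);
        // "app.jar" is a made-up path; the file does not need to exist to register the URL.
        loader.addURL(Paths.get("app.jar").toUri().toURL());
        System.out.println(loader.getURLs().length);
    }
}
```

Child-first loading lets classes from the user's jars shadow same-named classes visible to the parent, which is the point of the DRIVER_USER_CLASS_PATH_FIRST switch.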
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// Look up the static main method of the given class via reflection
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
// Actually invoke main; this is the main of childMainClass above, and only prepareSubmitEnvironment can tell us which class that is
mainMethod.invoke(null, args)
}
}
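The reflective call in JavaMainApplication can be reproduced in plain Java. The following is a minimal sketch, assuming a hypothetical DemoApp class standing in for the user application; the static-modifier guard is included here for illustration and is not shown in the excerpt above.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical sketch of invoking a static main method via reflection.
final class ReflectiveMainSketch {

    // Stand-in for a user application; records its arguments so the call is observable.
    public static final class DemoApp {
        static String lastArgs = "";

        public static void main(String[] args) {
            lastArgs = String.join(",", args);
        }
    }

    static void start(Class<?> klass, String[] args) {
        try {
            // String[].class plays the role of new Array[String](0).getClass in the Scala code.
            Method mainMethod = klass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException("main must be static");
            }
            // The receiver is null for a static method; the cast stops varargs
            // from spreading the array into separate parameters.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        start(DemoApp.class, new String[] {"input.txt", "output"});
        System.out.println(DemoApp.lastArgs);
    }
}
```

The (Object) cast is the detail that is easy to miss: without it, Java treats the String[] as the varargs array of invoke and the call fails with a wrong-number-of-arguments error.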
A look at the prepareSubmitEnvironment method
What we need to find out now is where the returned childMainClass points. Let's skim prepareSubmitEnvironment, paying particular attention to childMainClass:
/* Prepare the environment for submitting an application. */
private[deploy] def prepareSubmitEnvironment(...):(Seq[String], Seq[String], SparkConf, String) = {
// Return values: the four elements of the returned tuple
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf() // initializes the SparkConf
var childMainClass = "" // the variable to watch
// Set the cluster manager: determined from the master argument
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ => ...
}
// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => ...
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
// Copy the user-supplied properties into SparkConf
args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
// In client mode, download remote files.
if (deployMode == CLIENT) {...}
// We will download them to local disk prior to add to YARN's distributed cache.
// For yarn client mode, since we already download them with above code, so we only need to
// figure out the local path and replace the remote one.
if (clusterManager == YARN) {...}
// Special flag to avoid deprecation warnings at the client
sys.props("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
// A list of configuration rules, covering the different cluster managers and deploy modes
val options = List[OptionAssigner](
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
...,
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.excludes"),
...,
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
...,
// Other options
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_CORES.key),
// An internal option used only for spark-shell to add user jars to repl's classloader,
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
// remote jars, so adding a new option to only specify local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
)
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
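// In client mode, the application's own mainClass is assigned to childMainClass directly, and the dependency jars are added to the classpath
// Aside: this confirms that in client mode the driver runs on the current node, so no node needs to be allocated for it; it never enters waitingDrivers to wait for schedule() to place it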
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
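// In YARN cluster mode (note the isYarnCluster guard), the primary resource and the extra jars are also added to the classpath of the submitting process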
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
}
if (deployMode == CLIENT) {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
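// Outside YARN cluster mode, the application jar is appended to spark.jars automatically; in YARN cluster mode the jar has already been distributed to the nodes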
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set(JARS, jars)
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
// This is the StandaloneCluster mode we care about here
if (args.isStandaloneCluster) {
// REST submission is not followed here. It is the default path; if it fails, useRest is reset to false, SUBMIT runs again, and the else branch is taken
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
// This is the childMainClass we are after
// STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
// ClientApp is defined alongside org.apache.spark.deploy.Client
// Inheritance: ClientApp extends SparkApplication
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
// Note: our own application's mainClass is put into the argument list here; this matters, because it is how the application eventually gets started
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {...}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sparkConf.remove(DRIVER_HOST_ADDRESS)
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sparkConf.getOption(config).foreach { oldValue =>
sparkConf.set(config, Utils.resolveURIs(oldValue))
}
}
(childArgs, childClasspath, sparkConf, childMainClass)
}
prepareSubmitEnvironment is long, and only the main line has been kept above; consult the source for the full details.
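To make the decision logic in prepareSubmitEnvironment easier to see in isolation, here is a small Java sketch of three pieces: resolving the cluster manager from the master string, checking whether a rule with combined flags such as STANDALONE | MESOS applies, and choosing childMainClass for the two cases followed in this article. The numeric flag values are illustrative assumptions, the class SubmitEnvSketch is hypothetical, and the REST and YARN branches are deliberately left out.

```java
// Hypothetical sketch; flag values are assumptions chosen so that each is a distinct bit.
final class SubmitEnvSketch {
    static final int YARN = 1, STANDALONE = 2, MESOS = 4, LOCAL = 8;
    static final int ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL;
    static final int CLIENT = 1, CLUSTER = 2;
    static final int ALL_DEPLOY_MODES = CLIENT | CLUSTER;

    // Mirrors the match on args.master shown above.
    static int clusterManager(String master) {
        if (master.equals("yarn") || master.equals("yarn-client") || master.equals("yarn-cluster")) {
            return YARN;
        }
        if (master.startsWith("spark")) return STANDALONE;
        if (master.startsWith("mesos")) return MESOS;
        if (master.startsWith("local")) return LOCAL;
        throw new IllegalArgumentException("Unsupported master: " + master);
    }

    // Mirrors the match on args.deployMode; null means client.
    static int deployMode(String mode) {
        if (mode == null || mode.equals("client")) return CLIENT;
        if (mode.equals("cluster")) return CLUSTER;
        throw new IllegalArgumentException("Unsupported deploy mode: " + mode);
    }

    // A rule applies when its masks contain the current manager and the current mode.
    static boolean applies(int ruleManagers, int ruleModes, int manager, int mode) {
        return (ruleManagers & manager) != 0 && (ruleModes & mode) != 0;
    }

    // Only the client path and the legacy (non-REST) standalone cluster path are modelled.
    static String childMainClass(String master, String mode, String userMainClass) {
        int manager = clusterManager(master);
        int dm = deployMode(mode);
        if (dm == CLIENT) {
            return userMainClass;
        }
        if (manager == STANDALONE) {
            return "org.apache.spark.deploy.ClientApp";
        }
        throw new UnsupportedOperationException("not covered by this sketch");
    }

    public static void main(String[] args) {
        // Host name and application class are made up for illustration.
        System.out.println(childMainClass("spark://host:7077", "cluster", "com.example.WordCount"));
        System.out.println(childMainClass("local[2]", null, "com.example.WordCount"));
    }
}
```

Encoding managers and modes as bit flags is what lets a single rule list cover every combination: each rule carries a mask, and one bitwise AND per dimension decides whether it applies.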
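Now, back in runMain: childMainClass is ClientApp, which extends SparkApplication, so the first branch is taken: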
val app: SparkApplication =
if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {...}
try {
// Call start; SparkApplication is a trait, so this time look at the start method of its implementation ClientApp
app.start(childArgs.toArray, sparkConf)
}
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// Resolve the masters' RPC endpoint references from the given master URLs
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
// Register a ClientEndpoint locally; look at its onStart method next
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
}
private class ClientEndpoint(...) extends ThreadSafeRpcEndpoint with Logging {
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" => // the "launch" argument added earlier
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
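// Build the Command holder; note the difference between the two mainClass values
// Also note the two fixed placeholder strings at the head of the Seq, {{WORKER_URL}} and {{USER_JAR}}; they line up with the pattern match used when the driver starts, discussed below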
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
// Build the DriverDescription holder
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
// Sends a RequestSubmitDriver message carrying driverDescription through the Master references obtained earlier
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
case "kill" => ...
}
}
}
Over on the Master side, here is how the RequestSubmitDriver message is handled:
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else { // the master is ALIVE; the non-ALIVE branch above is not followed further
logInfo("Driver submitted " + description.command.mainClass)
// Build a DriverInfo from the description; a simple wrapper class
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
// This should look familiar: the waitingDrivers queue described along with the schedule method is filled here
waitingDrivers += driver
drivers.add(driver)
// schedule() runs whenever resources change; as covered in the earlier article, it fits the newly added driver onto a worker wherever there is room
schedule()
// Reply with a SubmitDriverResponse message
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
When ClientEndpoint receives the SubmitDriverResponse message, it does some simple bookkeeping:
override def receive: PartialFunction[Any, Unit] = {
case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
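At this point we need to go back to the article on Driver startup, that is, the one covering the schedule method. At the end of the Driver startup process, the driver is launched through a ProcessBuilder. This can simply be regarded as invoking DriverDescription.command.mainClass from above, namely org.apache.spark.deploy.worker.DriverWrapper, so let's go straight into DriverWrapper: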
object DriverWrapper extends Logging {
def main(args: Array[String]) {
args.toList match {
/*IMPORTANT: Spark 1.3 provides a stable application submission gateway */
// This pattern match lines up with the Seq built above; mainClass here is our own application's main class
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
...
// Delegate to supplied main class
// Look up the main method of the given main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
// From here on, execution is inside our application
mainMethod.invoke(null, extraArgs.toArray[String])
rpcEnv.shutdown()
...
}
At this point we have formally entered our own application.
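The hand-off between the Command built in ClientEndpoint and the pattern match in DriverWrapper relies on argument positions: two placeholders first, then the user's main class, then everything else. The sketch below shows that contract in Java, assuming the placeholders are replaced with real values on the worker side before the process is launched. The class DriverArgsSketch and all sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the positional contract between Command and DriverWrapper.
final class DriverArgsSketch {

    // Replace the two fixed placeholders with concrete values; other tokens pass through.
    static List<String> substitute(List<String> template, String workerUrl, String userJar) {
        List<String> out = new ArrayList<>();
        for (String arg : template) {
            switch (arg) {
                case "{{WORKER_URL}}": out.add(workerUrl); break;
                case "{{USER_JAR}}":   out.add(userJar);   break;
                default:               out.add(arg);
            }
        }
        return out;
    }

    // Position 2 corresponds to mainClass in `workerUrl :: userJar :: mainClass :: extraArgs`.
    static String mainClass(List<String> args) {
        requireAtLeastThree(args);
        return args.get(2);
    }

    // Everything after the third element corresponds to extraArgs.
    static String[] extraArgs(List<String> args) {
        requireAtLeastThree(args);
        return args.subList(3, args.size()).toArray(new String[0]);
    }

    private static void requireAtLeastThree(List<String> args) {
        if (args.size() < 3) {
            throw new IllegalArgumentException("expected <workerUrl> <userJar> <mainClass> [options]");
        }
    }

    public static void main(String[] args) {
        // All values below are made up for illustration.
        List<String> template = Arrays.asList(
            "{{WORKER_URL}}", "{{USER_JAR}}", "com.example.WordCount", "input.txt");
        List<String> actual = substitute(template, "spark://Worker@host:1234", "/tmp/app.jar");
        System.out.println(mainClass(actual) + " " + Arrays.toString(extraArgs(actual)));
    }
}
```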
Where the main program starts
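Here we use the introductory example shipped with Spark, ~\examples\src\main\java\JavaWordCount.java. Note, however, that we package this source as our own application rather than running the bundled spark-examples_2.11-2.3.1.jar directly. (The difference is that the SparkSubmitCommandBuilder constructor mentioned at the start of this article checks whether the first argument is RUN_EXAMPLE; see the beginning if you have forgotten.)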
//package org.apache.spark.examples;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
// Use a single space as the delimiter
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
// Since Spark 2.0, SparkSession has been the unified entry point; internally it
// still creates the appropriate SparkContext, as described in detail below
SparkSession spark = SparkSession
.builder()
.appName("JavaWordCount")
.getOrCreate();
JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
// An action operator, which triggers job submission
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
spark.stop();
}
}
Let's first look at SparkSession.builder().appName("JavaWordCount").getOrCreate();
@Stable // marks an API that is stable within a major version; incompatibilities may only appear across major versions
object SparkSession extends Logging {
/* Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].*/
// Simply returns a new Builder instance
def builder(): Builder = new Builder
/**
* Builder for [[SparkSession]].
*/
@Stable
// Nested inside the SparkSession companion object
class Builder extends Logging {...}
}
So the subsequent .appName("JavaWordCount").getOrCreate(); calls are all methods of the Builder class. Let's look at appName; the other methods are similar:
def appName(name: String): Builder = config("spark.app.name", name)
def config(key: String, value: String): Builder = synchronized {
options += key -> value
this
}
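The methods are simple and need little explanation. Just note that config returns this, the Builder object itself, which is what makes the chained calls possible.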
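The chaining trick is independent of Spark. Here is a minimal Java sketch of the same pattern, with a hypothetical FluentBuilderSketch class whose setters all funnel into one synchronized config method that returns this; the snapshot method is an addition for inspection only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a fluent builder; not Spark's Builder class.
final class FluentBuilderSketch {
    private final Map<String, String> options = new HashMap<>();

    // Convenience setters delegate to config, just as appName does above.
    FluentBuilderSketch appName(String name) {
        return config("spark.app.name", name);
    }

    FluentBuilderSketch master(String master) {
        return config("spark.master", master);
    }

    // Returning `this` is what allows calls to be chained.
    synchronized FluentBuilderSketch config(String key, String value) {
        options.put(key, value);
        return this;
    }

    // Defensive copy so callers cannot mutate the builder's state.
    synchronized Map<String, String> snapshot() {
        return new HashMap<>(options);
    }

    public static void main(String[] args) {
        Map<String, String> opts = new FluentBuilderSketch()
            .appName("JavaWordCount")
            .master("local[2]")
            .snapshot();
        System.out.println(opts.get("spark.app.name"));
    }
}
```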
getOrCreate: the final instantiation
The last call in the Builder chain is the getOrCreate() method:
def getOrCreate(): SparkSession = synchronized {
// SparkSession should only be created and accessed on the driver.
assertOnDriver()
// Get the session from current thread's active session.
var session = activeThreadSession.get()
// If the current thread already has an active session whose SparkContext is not stopped, just apply the options to its conf
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
// If the current thread has no usable SparkSession, fall back to the global default session shared within this JVM
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
// If the current thread does not have an active session, get it from the global session.
// private val defaultSession = new AtomicReference[SparkSession]
// AtomicReference stores its value in a field declared as private volatile V value, where V is the type parameter (SparkSession here),
// so get() returns the most recently set SparkSession, or null if none has been set yet
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
// The code below is what runs on a first submission, when no global session exists yet
// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
// Instantiate SparkConf and add the properties; options holds the parameters set through the builder earlier
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
// set a random app name if not given.
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(java.util.UUID.randomUUID().toString)
}
// Actually instantiate SparkContext, a key component covered in detail later. Note: once SparkContext is created, later changes to this SparkConf no longer affect it
SparkContext.getOrCreate(sparkConf)
}
// Instantiate the SparkSession and record it as both the default session and the active session, so later code can reuse it
session = new SparkSession(sparkContext, None, None, extensions)
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
setDefaultSession(session)
setActiveSession(session)
// A SparkListener that cleans up when the application ends
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
}
})
}
return session
}
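The lookup order in getOrCreate (thread-local active session, then the global default, then a fresh instance) can be sketched without Spark. The Java version below is a simplified illustration: SessionRegistrySketch and its Session class are hypothetical, a plain ThreadLocal stands in for activeThreadSession, and configuration handling is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the active/default lookup order; not Spark code.
final class SessionRegistrySketch {

    static final class Session {
        final String name;
        private volatile boolean stopped;

        Session(String name) { this.name = name; }
        void stop() { stopped = true; }
        boolean isStopped() { return stopped; }
    }

    private static final ThreadLocal<Session> ACTIVE = new ThreadLocal<>();
    private static final AtomicReference<Session> DEFAULT = new AtomicReference<>();

    static Session getOrCreate(String name) {
        // 1. Prefer the session bound to the current thread, if it is still usable.
        Session session = ACTIVE.get();
        if (session != null && !session.isStopped()) {
            return session;
        }
        // 2. Otherwise fall back to the process-wide default, under a global lock.
        synchronized (SessionRegistrySketch.class) {
            session = DEFAULT.get();
            if (session != null && !session.isStopped()) {
                return session;
            }
            // 3. Nothing reusable: create one and register it in both places.
            session = new Session(name);
            DEFAULT.set(session);
            ACTIVE.set(session);
            return session;
        }
    }

    // Clears the calling thread's active session and the global default.
    static void reset() {
        ACTIVE.remove();
        DEFAULT.set(null);
    }

    public static void main(String[] args) {
        Session first = getOrCreate("first");
        Session second = getOrCreate("second");
        System.out.println(first == second); // the existing session is reused
    }
}
```

The global lock around steps 2 and 3 is what guarantees that two threads racing through a first call end up sharing one default session instead of creating two.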
SparkSession also wraps many other methods, createDataFrame being the most commonly used, but we will not expand on them here. Back in JavaWordCount, the next line is spark.read().textFile(args[0]).javaRDD();
def read: DataFrameReader = new DataFrameReader(self)
def textFile(paths: String*): Dataset[String] = {
assertNoSpecifiedSchema("textFile")
text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
/* Loads text files and returns a `DataFrame`
whose schema starts with a string column named "value",
and followed by partitioned columns if there are any.*/
def text(paths: String*): DataFrame = format("text").load(paths : _*)
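// load is fairly involved internally; it ends by calling Dataset.ofRows(), which builds the DataFrame from a logical plan, and the file contents themselves are only read once an action runs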
Each line of the file ends up as one row of the Dataset. Next come the three transformation operators flatMap, mapToPair and reduceByKey, and finally the action operator collect triggers the subsequent work.
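For readers who want to check what the three transformations compute without a cluster, the same word count can be written with plain Java streams. This is only a local analogy under stated assumptions: it uses java.util.stream rather than Spark, nothing is distributed or lazy in the Spark sense, and the class LocalWordCountSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical local analogy of the JavaWordCount pipeline; not Spark code.
final class LocalWordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
            // Analogue of flatMap: each line becomes zero or more words.
            .flatMap(line -> Arrays.stream(SPACE.split(line)))
            // Analogue of mapToPair plus reduceByKey: pair each word with 1,
            // then merge the values of equal keys by summing them.
            .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // Analogue of collect: the result is materialised and printed here.
        Map<String, Integer> counts = count(Arrays.asList("a b a", "b c"));
        counts.forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```

As in the original example, splitting on a single space means that consecutive spaces or an empty line produce empty-string words, which are counted like any other key.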
Summary
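So far we have only a rough outline of how an application starts. The instantiation of SparkContext and what the collect operator triggers are the real heart of Spark; the next two articles cover each of them in detail.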