在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等
- 首先通过SparkContext方法,进行TaskScheduler的初始化机制,通过createTaskScheduler方法,createTaskScheduler()里面会创建三个东西,首先是TaskSchedulerImpl(它其实就是TaskScheduler),然后创建SparkDeploySchedulerBackend(它在底层会受TaskSchedulerImp的控制,实际上负责与Master的注册,Executor的反注册,Task发送到Executor等操作),然后调用TaskSchedulerImpl的init()方法,创建SchedulerPool调度池 ,它有不同的优先策略,比如收FIFO先进先出。
- 在创建完TaskSchedulerImpl和SparkDeploySchedulerBackend之后,是执行TaskSchedulerImpl的start()方法,这个方法内部实际上会调用SparkDeploySchedulerBackend的start()方法,在这个start()方法里会创建AppClient,AppClient里会启动一个线程,也就是ClientActor,ClientActor会调用两个方法,registerWithMaster(),会去调用tryRegisterAllMaster()。这两个方法会向master发送一个东西叫做RegisterApplication(case class,里面封装了application的信息),就会发送到spark集群的Master上面去,后面回去找worker,然后启动executor,然后executor启动后会反向注册到SparkDeploySchedulerBackend上面去。这就是TaskScheduler的初始化
- DAGScheduler实际是使用DAGSchedulerEventProcessActor这个组件进行通信(线程)
- SparkUI。4040端口,线上application运行的状态,启动一个jetty服务器,来提供web服务,从而显示网页。
源码分析
第一步:进入SparkContext类
// Create and start the scheduler
// 主构造方法,SparkContext调用时默认就调用此方法
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
// 启动taskScheduler,也就是对应的TaskSchedulerImpl的start方法
_taskScheduler.start()
第二步:点击createTaskScheduler
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
* 匹配不同的提交启动模式,本地启动模式、Standalone、集群spark-on-yarn
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
......
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
......
// 这是spark提交方式中的standalone方式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
......
}
}
}
第三步:点击TaskSchedulerImpl
/**
*
* 1、底层通过操作一个SchedulerBackend,针对不同的种类的cluster(standalone,yarn,mesos),进行调度task
* 2、它也可以使用一个LocalBackend,并将isLocal设置为true,来进行本地模式下进行工作
* 3、它负责处理一些通用的逻辑,比如说多个job的调度顺序(例如FIFO),启动推测任务执行(比如某些任务执行的速度慢,剔除掉,从其他地方执行)
* 4、客户端首先会调用他的initialize()和start()方法,然后通过runTasks()方法来提交task
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
......
/**
* 创建调度池,可以是FIFO,也可以是Fair,FIFO是先进先出的调度算法,Fair是公平调度算法
* FIFO:根据任务的个数和分配资源的比值,选择较小的一个
* FAIR:队列中的Job平分资源
*/
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0) // 调度池
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
}
......
override def start() {
// 最核心的在于把SparkDeploySchedulerBacked的方法调用
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
第四步:点击第二步中的SparkDeploySchedulerBackend
override def start() {
super.start()
// 基本参数的准备 jvm之类的
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
......
//描述了当前应用程序的配置信息,包括此application需要多大的cpu core,每个slave上需要多少内存
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
//创建AppClient
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//启动client
client.start()
waitForRegistration()
}
第五步:点击第四步中的AppClient
/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
* an app description, and a listener for cluster events, and calls back the listener when various
* events occur.
*
* 1、这是一个接口
* 2、它负责为application和spark集群进行通信
* 3、它负责接收一个spark master的url,以及applicationDescription,和一个集群事件的监听器,以及各种事件发生时监听器的回调函数
*
* @param masterUrls Each url should look like spark://host:port.
*/
private[spark] class AppClient(
rpcEnv: RpcEnv,
masterUrls: Array[String],
appDescription: ApplicationDescription,
listener: AppClientListener,
conf: SparkConf)
extends Logging {
......
//是AppClient的内部类
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
with Logging {
......
override def onStart(): Unit = {
try {
//注册master
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
......
private def registerWithMaster(nthRetry: Int) {
// 核心在于tryRegisterAllMasters
registerMasterFutures = tryRegisterAllMasters()
registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
if (registered) {
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
......
/**
* Register with all masters asynchronously and returns an array `Future`s for cancellation.
* master支持两种主备切换机制,一种的hdfs的,另外一种是基于zookeeper的(动态ha,热切换的)
*/
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
//发送RegisterApplication这个case cass,把appDescription发送给master,进行向master注册
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
第六步:点击第一步中的DAGScheduler
/**
* 实现了面向stage的调度机制的高层次的调度层 ,它会为每个job计算一个stage的DAG(有向无环图),
* 追踪RDD和Stage的输出是否被物化了,也就是说写入磁盘或者是内存中,并且寻找一个最小消耗、最优调度机制
* 运行job,它会将stage作为taskset提交到底层的TaskSchedulerImpl中,并在集群中运行他们
*
* 除了处理stage的DAG,它还负责决定运行每一个task的最佳位置,基于当前的缓存状态,并将这些最佳的位置提交
* 给底层的TaskSchedulerImpl,此外,他还会处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage
* 可能就会被重新提交。一个stage内部的失败,如果不是由于shuffle文件丢失所导致的,会被TaskScheduler处理,
* 它会多次重试每一个task,直到最后,实在不行了,才会去取消整个stage,也就是说app挂掉
*/
private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
......
第七步:点击第一步中的_ ui
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000
def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
}
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}