1. StreamingContext Initialization
StreamingContext is the entry point for most streaming functionality; for example, it provides methods for creating DStreams from a variety of data sources. When a StreamingContext is created, the following key components are created with it:
1.1 Creating the DStreamGraph and setting the batch interval at which DStreams are turned into RDDs
private[streaming] val graph: DStreamGraph = {
// If a checkpoint is present, restore the graph from the checkpoint
if (isCheckpointPresent) {
_cp.graph.setContext(this)
_cp.graph.restoreCheckpointData()
_cp.graph
} else {
require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
// Otherwise create a new DStreamGraph
val newGraph = new DStreamGraph()
// and set its batch duration
newGraph.setBatchDuration(_batchDur)
newGraph
}
}
1.2 Creating the JobScheduler
/**
* JobGenerator generates one job per batch interval, and JobScheduler then schedules and submits those jobs.
* Under the hood this still runs on Spark's core execution engine: DAGScheduler, TaskScheduler, Workers, Executors and Tasks.
* An operator such as reduceByKey still causes a shuffle, and data is still read and written through the BlockManager
* attached to each Executor; persisted data is still managed by the CacheManager.
*/
private[streaming] val scheduler = new JobScheduler(this)
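To put these pieces in context, here is a minimal, hypothetical sketch of the user-facing side: constructing a StreamingContext with a batch duration, which is the value that ends up in DStreamGraph.setBatchDuration above. The application name and master URL are illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build a SparkConf and a StreamingContext with a 5-second batch interval.
// Constructing the context runs the initialization shown above: it creates the
// DStreamGraph (with this batch duration) and the JobScheduler.
val conf = new SparkConf().setAppName("StreamingInitDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))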
2. DStream Creation and Transformation
Once the StreamingContext has finished initializing and created key components such as the DStreamGraph and the JobScheduler, the application calls methods such as StreamingContext.socketTextStream to create an input DStream, applies a series of transformations to it, and finally invokes an output operation, which is what triggers a job for each batch.
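As an illustration (not part of the source being analyzed), a typical pipeline might look like the sketch below, reusing the hypothetical ssc from the earlier sketch: socketTextStream creates the input DStream, the transformations are lazy, and print() is the output operation that causes a job to run per batch.

// Hypothetical word-count pipeline against a local socket source.
val lines = ssc.socketTextStream("localhost", 9999)  // creates a SocketInputDStream
val counts = lines.flatMap(_.split(" "))             // lazy transformations ...
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.print()                                       // output operation: registers a ForEachDStream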
When a DStream object is created, its parent class InputDStream is initialized first; in InputDStream the stream registers itself with the DStreamGraph:
ssc.graph.addInputStream(this)
Subclasses of InputDStream that receive data (the ReceiverInputDStream subclasses) provide a getReceiver() method that returns the Receiver used to ingest data; for SocketInputDStream it looks like this:
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
Note that, just like RDD transformations, DStream transformations are lazily evaluated. Only when an output operator is encountered (such as print) is the DStream wrapped in a ForEachDStream, whose register method adds it to the DStreamGraph's outputStreams list.
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
What sets ForEachDStream apart from the other DStreams is that it overrides the generateJob method.
/**
* Every DStream output operation ends up going through ForEachDStream's generateJob() method,
* which is what ultimately triggers the submission of a job.
*/
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
3. The Startup Process
After the initialization described above, one method must still be called: StreamingContext's start() method, which starts the Spark Streaming application. This method creates two more components related to the StreamingContext, ReceiverTracker and JobGenerator, and also launches, inside an Executor on some Worker node of the Spark cluster, the Receiver belonging to each input DStream of the application.
The most important thing StreamingContext.start() does is call JobScheduler's start() method.
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
// The most important step: call JobScheduler.start()
scheduler.start()
}
state = StreamingContextState.ACTIVE
scheduler.listenerBus.post(
StreamingListenerStreamingStarted(System.currentTimeMillis()))
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
logDebug("Adding shutdown hook") // force eager creation of logger
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
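On the application side, this is the method invoked by the familiar driver boilerplate, sketched below with the hypothetical ssc from the earlier sketch; awaitTermination() then blocks until the context is stopped or an error is propagated.

ssc.start()              // runs the code above: starts JobScheduler, ReceiverTracker, JobGenerator
ssc.awaitTermination()   // block the driver thread until stop() is called or an exception is thrown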
Let us take a closer look at how the JobScheduler is created and run.
The JobScheduler schedules the jobs to be run on Spark. It uses the JobGenerator to generate the jobs and then submits and runs them in parallel using a thread pool.
Creating the JobScheduler
- The JobScheduler is created by the StreamingContext, which also triggers the call to its start() method.
- When the JobScheduler is instantiated, it creates a thread pool (jobExecutor) and a jobGenerator.
Specifically:
jobExecutor is used to submit jobs; the number of threads in the pool is the job concurrency, controlled by "spark.streaming.concurrentJobs" (default 1); a configuration sketch follows below.
jobGenerator is an instance of the JobGenerator class; it generates jobs from the DStreams.
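As a hedged configuration sketch (the property is set on the SparkConf before the StreamingContext is created; the application name and value shown are only examples), increasing the job concurrency looks like this:

import org.apache.spark.SparkConf

// Allow up to two batch jobs to be submitted to the jobExecutor pool at once.
// Running batches concurrently can relax ordering guarantees between batches.
val conf = new SparkConf()
  .setAppName("ConcurrentJobsDemo")
  .set("spark.streaming.concurrentJobs", "2")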
Running the JobScheduler
When the start() method executes, it creates and starts the following services:
- eventLoop: an EventLoop[JobSchedulerEvent] object used to receive and process events. Callers register events onto its queue by calling post. When the EventLoop starts, it spawns a daemon thread that processes the events in the queue. EventLoop is an abstract class; when JobScheduler instantiates it, it implements onReceive so that every received event is handled by processEvent(event). (A simplified sketch of this event-loop pattern follows the start() code below.)
- receiverTracker: a ReceiverTracker object that manages the execution of the receivers belonging to the ReceiverInputDStreams. It must be created only after all input streams have been added to the DStreamGraph, because it extracts the InputDStreams from the graph when it is instantiated, and extracts their Receivers when it is started.
- jobGenerator: created when the JobScheduler is instantiated; it is started here.
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start()
// Create the ReceiverTracker
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
case _ => null
}
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
// Start the ReceiverTracker
receiverTracker.start()
// Start the JobGenerator
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
}
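The EventLoop used above is a Spark-internal utility, but the pattern it implements is simple. The following self-contained sketch (a simplified class of my own, not Spark's code) shows the same idea: post() enqueues an event, and a daemon thread drains the queue and hands each event to an onReceive handler, which is how JobScheduler routes JobSchedulerEvents to processEvent.

import java.util.concurrent.LinkedBlockingQueue

// Simplified stand-in for Spark's internal EventLoop (illustration only).
class SimpleEventLoop[E](name: String)(onReceive: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (true) onReceive(queue.take())   // block until an event arrives, then handle it
      } catch {
        case _: InterruptedException => // stop() interrupts the thread to shut the loop down
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)  // callers enqueue events here
  def stop(): Unit = thread.interrupt()
}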
4. How Receivers Are Started
As mentioned above, when JobScheduler's start method runs it creates a ReceiverTracker and then calls the ReceiverTracker's start method. Let us now walk through the ReceiverTracker.start() source.
/** Create a ReceiverTrackerEndpoint and start launching the Receivers. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map { nis =>
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
}
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
// Ask the endpoint to start all the Receivers
endpoint.send(StartAllReceivers(receivers))
}
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
// Record the Receiver's preferred location
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
// Launch the Receiver on an Executor
startReceiver(receiver, executors)
}
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
}
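To see what the supervisor is actually supervising, here is a minimal sketch of a user-defined receiver built on the public Receiver API (the class and its behavior are hypothetical examples, not Spark code): when supervisor.start() runs on the executor it eventually calls onStart(), and every record handed to store() flows to the ReceiverSupervisor, which turns it into blocks via the executor's BlockManager.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits the same string once per second.
class ConstantReceiver(message: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive on a separate thread so that onStart() returns quickly.
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(message)       // hand one record to the ReceiverSupervisor
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the receiving thread checks isStopped() and exits on its own.
  }
}

Such a receiver would be plugged in with ssc.receiverStream(new ConstantReceiver("hello")), which creates the ReceiverInputDStream whose getReceiver() result is what launchReceivers() collects above.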
References:
https://blog.csdn.net/Anbang713/article/details/82049637