- Spark in Production: Spark's built-in RPC communication mechanism and the RpcEnv infrastructure
- Spark in Production: An analysis of Spark's event listener bus flow
- Spark in Production: The underlying architecture of Spark's storage subsystem
- Spark in Production: The execution flow of Spark's underlying MessageLoop threads
- Spark in Production: The Stage-division algorithm and task-scheduling details of Spark's two-level scheduling system
- Spark in Production: Spark delay scheduling and the scheduling Pool architecture
- Spark in Production: A detailed look at AppendOnlyMap, the task-level in-memory aggregation and sorting structure
- Spark in Production: The design of the ExternalSorter in the Spark Shuffle process
- Spark in Production: The design of the ShuffleExternalSorter in the Spark Shuffle process
- Spark in Production: The design of SortShuffleWriter, a ShuffleManager in-memory buffering writer
- Spark in Production: The design of UnsafeShuffleWriter, a ShuffleManager in-memory buffering writer
- Spark in Production: The design of BypassMergeSortShuffleWriter, a ShuffleManager in-memory buffering writer
- Spark in Production: A deep dive into BlockStoreShuffleReader, a core component of Spark Shuffle
- Spark in Production: A deep dive into the internals of the SortShuffleManager
- Spark in Production: The StreamingContext startup flow and the DStream template source code
- Spark in Production: The ReceiverTracker startup process and the receiver RDD job-submission mechanism
- Spark in Production: How Spark Streaming data is periodically converted from currentBuffer into Blocks
- Spark in Production: A deep dive into the periodic data-processing logic of Spark Streaming's JobGenerator
- [Spark in Production: A deep dive into the iteration of the Spark Streaming Graph processing chain]
1 The past life of JobGenerator
1.1 ReceiverTracker, JobGenerator's partner in adversity
1.2 JobGenerator, ReceiverTracker's partner in adversity
JobGenerator keeps producing Jobs periodically; these Jobs are ultimately executed on the Executors.
1.3 ReceiverTracker and receivedBlockTracker: a love-hate relationship
- We can see that receivedBlockTracker is contained inside ReceiverTracker. Most importantly, receivedBlockTracker maintains a streamIdToUnallocatedBlockQueues map internally, which is used to track the blocks reported by the Executors.
```scala
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
```
- The important metadata storage structure inside receivedBlockTracker:
```scala
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
```
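If memory serves, ReceivedBlockQueue is nothing more than a type alias defined in ReceivedBlockTracker for a mutable queue of block metadata:

```scala
// Declared in ReceivedBlockTracker (Spark 2.x): one plain FIFO queue of block metadata per stream.
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
```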
1.4 How StreamingContext brings the two together
JobScheduler contains two heavyweight core members: jobGenerator and receiverTracker. They are initialized as follows.
Note: the constructor argument passed to JobGenerator is the JobScheduler itself (this).
```scala
private val jobGenerator = new JobGenerator(this)

receiverTracker = new ReceiverTracker(ssc)
```
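For context, the two are wired together when JobScheduler.start() is invoked from StreamingContext.start(). Below is a trimmed-down sketch of that method, paraphrased from the Spark 2.x sources with several details (rate controllers, executor allocation) omitted:

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  receiverTracker.start() // launch receivers on the executors and start tracking reported blocks
  jobGenerator.start()    // start the timer-driven job generation loop
  logInfo("Started JobScheduler")
}
```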
2 The present life of JobGenerator
- RecurringTimer is an important member of JobGenerator; it is responsible for firing at the user-defined batch interval (the time window).
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```
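To make the timer's role concrete, here is a hypothetical, stripped-down recurring timer (not Spark's actual RecurringTimer implementation) showing the essence of the mechanism: every period the callback fires, and in JobGenerator's case that callback posts a GenerateJobs event onto the event loop.

```scala
// Hypothetical minimal recurring timer; Spark's RecurringTimer is more careful about clock
// alignment, restart semantics and graceful stop, but the core loop is the same idea.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(s"SimpleRecurringTimer-$name") {
    override def run(): Unit = {
      // Align the first tick to the next multiple of the period.
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime) // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
        nextTime += periodMs
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}
```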
- JobGenerator's start is triggered through StreamingContext, and on a fresh start (no checkpoint present) it ends up calling startFirstTime.
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
- JobGenerator eventually starts ssc.graph and the timer, so the whole processing pipeline is now up and running.
```scala
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
2.1 The four core processing steps of JobGenerator
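The four steps are all strung together inside JobGenerator.generateJobs(time), which the GenerateJobs event handler invokes on every timer tick. A sketch of that method, paraphrased from the Spark 2.x sources (exact details may differ between versions), is shown below; the following subsections walk through each step in turn.

```scala
private def generateJobs(time: Time): Unit = {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically; see SPARK-6847.
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // Step 1: allocate received blocks to this batch
    graph.generateJobs(time)                                 // Step 2: generate jobs using the allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) // Step 3: fetch input metadata
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))    // Step 4: submit the JobSet
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```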
2.2 Step 1: allocateBlocksToBatch
- JobGenerator holds a reference to the jobScheduler, and the jobScheduler holds a reference to the receiverTracker:
```scala
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
```
- The receiverTracker in turn holds a reference to the receivedBlockTracker.
- allocateBlocksToBatch dequeues from streamIdToUnallocatedBlockQueues, for each streamId, all the blocks collected so far (blocks are produced every 200 ms by default) and places them into timeToAllocatedBlocks.
```scala
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

    // First, following the user-defined batch interval, dequeue all blocks
    // from streamIdToUnallocatedBlockQueues.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap                                                               // <= key line

    // Then hand the blocks that were previously unallocated to the batch
    // identified by the user-specified time window (batchTime).
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)               // <= key line
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed again,
    //    so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
```
- timeToAllocatedBlocks is a core member of receivedBlockTracker (which itself lives inside receiverTracker). Walking the call chain back up to the top, timeToAllocatedBlocks is what the generatedRDDs are ultimately built from.
- streamIdToUnallocatedBlockQueues: the blocks that have not yet been assigned to any batch.
- timeToAllocatedBlocks: the blocks that have already been assigned to a batch (see the toy sketch right after this list).
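To visualize the handoff between the two structures, here is a self-contained toy sketch (hypothetical simplified types, not Spark code) of what allocateBlocksToBatch conceptually does: drain the per-stream unallocated queues and file the result under the batch time.

```scala
import scala.collection.mutable

// Hypothetical simplified model of the two tracking structures in ReceivedBlockTracker.
object BlockAllocationSketch {
  case class BlockInfo(blockId: String, numRecords: Long)
  case class AllocatedBlocks(streamIdToBlocks: Map[Int, Seq[BlockInfo]])

  // Blocks reported by executors but not yet tied to a batch, keyed by stream id.
  val streamIdToUnallocatedBlockQueues = mutable.HashMap[Int, mutable.Queue[BlockInfo]]()
  // Blocks that have been assigned to a batch, keyed by batch time (ms).
  val timeToAllocatedBlocks = mutable.HashMap[Long, AllocatedBlocks]()

  def allocateBlocksToBatch(batchTime: Long, streamIds: Seq[Int]): Unit = {
    val streamIdToBlocks = streamIds.map { streamId =>
      val queue = streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, mutable.Queue.empty)
      (streamId, queue.dequeueAll(_ => true).toList) // drain everything collected so far
    }.toMap
    timeToAllocatedBlocks.put(batchTime, AllocatedBlocks(streamIdToBlocks))
  }

  def main(args: Array[String]): Unit = {
    streamIdToUnallocatedBlockQueues(0) =
      mutable.Queue(BlockInfo("input-0-1", 100), BlockInfo("input-0-2", 80))
    allocateBlocksToBatch(batchTime = 1000L, streamIds = Seq(0))
    println(timeToAllocatedBlocks(1000L)) // both blocks now belong to batch 1000
  }
}
```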
- Below is DStream's template code, whose sole purpose is to generate RDDs. The getOrCompute method is defined only on DStream (the base class), so once the RDD for a given level of the chain has been generated, it is put into generatedRDDs.
- If the RDD is not already in generatedRDDs, compute is invoked; compute in turn calls the parent's getOrCompute, which again calls compute, and so on, recursing all the way back to the compute of the InputDStream.
```scala
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)   // <= key line
      }
      rddOption
    } else {
      None
    }
  }
}
```
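A tiny hypothetical pipeline helps to see why this recursion always bottoms out at the input stream; the host, port and transformations below are made up purely for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GetOrComputeChainExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GetOrComputeChainExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999) // SocketInputDStream (a ReceiverInputDStream)
    val words = lines.flatMap(_.split(" "))             // FlatMappedDStream
    words.print()                                       // registers a ForEachDStream as an output stream

    // For each batch time t, the ForEachDStream calls words.getOrCompute(t), which calls
    // lines.getOrCompute(t), which finally calls the ReceiverInputDStream's compute(t) --
    // and that is where the blocks allocated to batch t are turned into a BlockRDD.
    ssc.start()
    ssc.awaitTermination()
  }
}
```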
- The compute method of MapPartitionedDStream:
```scala
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
```
- The compute method of ReceiverInputDStream:
```scala
/** Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch (this is where the data comes out of timeToAllocatedBlocks)
      val receiverTracker = ssc.scheduler.receiverTracker                                   // <= key line
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // <= key line

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD from the blocks taken out of timeToAllocatedBlocks, so that the
      // rest of the call chain can consume it through generatedRDDs
      createBlockRDD(validTime, blockInfos)                                                 // <= key line
    }
  }
  Some(blockRDD)
}
```
So the purpose of allocateBlocksToBatch is to move the blocks belonging to the corresponding batch window into timeToAllocatedBlocks, ready for the rest of the call chain to consume.
2.3 Step 2: graph.generateJobs
- A core role of DStreamGraph is to hold the registered outputStreams. So when exactly are they registered?
- Through the action functions: print -> foreachRDD -> ForEachDStream -> register -> ssc.graph.addOutputStream(this)
- DStreamGraph.generateJobs eventually calls outputStream.generateJob(time):
```scala
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
- outputStream.generateJob defines a jobFunc and wraps it into a new Job(time, jobFunc):
```scala
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
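The snippet above is DStream's default generateJob, which merely runs an empty function over the RDD. The output operators registered through foreachRDD override it in ForEachDStream so that the user's function is actually applied; roughly, paraphrasing the Spark 2.x source (details may vary by version):

```scala
// ForEachDStream.generateJob, paraphrased: wrap the user-supplied foreachFunc around the
// parent's RDD for this batch, instead of running an empty job.
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```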
2.4 Step 3: jobScheduler.inputInfoTracker.getInfo(time)
- This step simply fetches the metadata of the blocks belonging to the batch.
```scala
// Map to track all the InputInfo related to specific batch time and input stream.
private val batchTimeToInputInfos =
  new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

case class StreamInputInfo(
    inputStreamId: Int,
    numRecords: Long,
    metadata: Map[String, Any] = Map.empty)
```
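getInfo then simply looks the batch time up in this map; a sketch of the lookup, paraphrased from the Spark sources (signature details may differ by version):

```scala
// InputInfoTracker.getInfo, paraphrased: return the per-stream input metadata recorded
// for this batch, or an empty map if nothing was reported.
def getInfo(batchTime: Time): Map[Int, StreamInputInfo] = synchronized {
  batchTimeToInputInfos.get(batchTime).map(_.toMap).getOrElse(Map.empty)
}
```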
2.5 Step 4: jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
- JobGenerator holds a reference to the JobScheduler, which finally submits the jobs and starts driving the computation on the Executors.
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
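Each Job is handed to the jobExecutor thread pool (sized by spark.streaming.concurrentJobs, default 1) wrapped in a JobHandler. In rough outline, simplified and paraphrased from the Spark sources, the handler announces the job on the scheduler's event loop, runs the job function (which is where sparkContext.runJob finally fires and work lands on the Executors), and then reports completion:

```scala
// Simplified sketch of JobScheduler's JobHandler; the real class also manages call sites,
// local properties and error handling.
private class JobHandler(job: Job) extends Runnable {
  override def run(): Unit = {
    eventLoop.post(JobStarted(job, clock.getTimeMillis()))
    job.run() // invokes the jobFunc built in generateJob
    eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}
```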
3 Summary
The author spent a great deal of time putting this article together. Please do not simply copy it, and reposting is not permitted; you are welcome to learn from it, and feel free to leave a comment if you have questions.
Qin Kaixin, Shenzhen, 2018