ContextCleaner is Spark's cleaner for data that is no longer needed, such as RDDs and broadcast variables. It relies primarily on Java's WeakReference mechanism to detect when such data has become unreachable and can be cleaned up.
ContextCleaner is built around two threads and two collections.
private val referenceBuffer =
  Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
private val referenceQueue = new ReferenceQueue[AnyRef]
Any data registered with ContextCleaner for later cleanup is wrapped in a CleanupTaskWeakReference, shown below.
private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)
task describes the concrete cleanup action. It is a case class, and its type identifies the kind of event, which in turn determines the cleanup method to run.
referent is the registered data itself; it is passed directly as a constructor argument of WeakReference, together with referenceQueue, one of the two collections mentioned above. Once the registered data is referenced only by this single weak reference and by nothing else, it becomes eligible for garbage collection; when the GC reclaims it, the WeakReference wrapper is placed on this referenceQueue, where it can later be retrieved.
The other container mentioned above, referenceBuffer, is used to store the CleanupTaskWeakReference instances, so that the concrete cleanup steps can be determined from each task's data type. Holding the wrappers here also keeps the WeakReference objects themselves strongly reachable until they have been processed.
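Because this whole mechanism is plain java.lang.ref machinery, it can be demonstrated without Spark. The sketch below mimics CleanupTaskWeakReference in Java (the names TaggedWeakReference and run are invented for illustration): once the referent's last strong reference is dropped and a GC runs, the wrapper appears on the ReferenceQueue still carrying its task tag.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class WeakRefDemo {
    // Hypothetical stand-in for CleanupTaskWeakReference: a WeakReference
    // that carries a cleanup "task" tag alongside the referenced object.
    static class TaggedWeakReference extends WeakReference<Object> {
        final String task;
        TaggedWeakReference(String task, Object referent, ReferenceQueue<Object> queue) {
            super(referent, queue);
            this.task = task;
        }
    }

    public static String run() throws InterruptedException {
        ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
        Object data = new Object(); // pretend this is an RDD
        TaggedWeakReference ref = new TaggedWeakReference("CleanRDD(1)", data, referenceQueue);

        data = null; // drop the last strong reference: only the weak ref remains
        TaggedWeakReference got = null;
        // GC timing is not guaranteed, so request it a few times and poll the queue.
        for (int i = 0; i < 50 && got == null; i++) {
            System.gc();
            got = (TaggedWeakReference) referenceQueue.remove(100);
        }
        return got != null ? got.task : "not enqueued";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Note that the queue never hands back the collected object itself (it is already gone); it hands back the wrapper, which is exactly why the task tag must live on the wrapper.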
Registration of a CleanupTaskWeakReference looks like this:
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
  registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
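registerForCleanup thus boils down to adding the wrapper to a thread-safe set. A minimal Java sketch of the same Collections.newSetFromMap(new ConcurrentHashMap<>()) construction (the class name BufferDemo and the string elements are made up):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class BufferDemo {
    public static int run() {
        // Same construction as referenceBuffer: a concurrent Set view backed
        // by a ConcurrentHashMap, safe for the registering threads and the
        // cleaning thread to mutate concurrently.
        Set<String> referenceBuffer = Collections.newSetFromMap(new ConcurrentHashMap<>());
        referenceBuffer.add("CleanRDD(1)");
        referenceBuffer.add("CleanBroadcast(2)");
        referenceBuffer.remove("CleanRDD(1)"); // done in keepCleaning() after dequeue
        return referenceBuffer.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1
    }
}
```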
The two threads are as follows:
private val cleaningThread = new Thread() { override def run() { keepCleaning() } }

private val periodicGCService: ScheduledExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

def start(): Unit = {
  cleaningThread.setDaemon(true)
  cleaningThread.setName("Spark Context Cleaner")
  cleaningThread.start()
  periodicGCService.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = System.gc()
  }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
Both threads are started by ContextCleaner. The thread in the single-threaded scheduled pool has a simple job: it periodically calls System.gc() to trigger garbage collection, since without a GC cycle weakly referenced data would never actually be reclaimed and enqueued.
The other thread, cleaningThread, is marked as a daemon thread in start() and then started; it runs the keepCleaning() method.
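The periodic-GC half can be sketched in plain Java with the same ScheduledExecutorService API. In this hypothetical demo the scheduled task just counts ticks instead of calling System.gc(), so the effect is observable:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicDemo {
    public static int run() throws InterruptedException {
        // Stand-in for periodicGCService: a single daemon thread firing at a
        // fixed rate (Spark's task body is simply System.gc()).
        ScheduledExecutorService svc = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-demo");
            t.setDaemon(true); // daemon, like Spark's context-cleaner-periodic-gc thread
            return t;
        });
        AtomicInteger ticks = new AtomicInteger();
        svc.scheduleAtFixedRate(ticks::incrementAndGet, 10, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(200); // let it fire several times
        svc.shutdown();
        return ticks.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run() > 0);
    }
}
```

Making both threads daemons matters: they must never keep the JVM alive once the application's own threads have finished.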
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
Here the loop continuously takes references from the referenceQueue mentioned above, where objects reclaimed by the GC end up. Each retrieved wrapper is removed from referenceBuffer, and its task case class determines the concrete cleanup steps to execute. For example, for an RDD the task is a CleanRDD, and doCleanupRDD() is called with the rddId to release that RDD.
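The shape of the loop — poll with a timeout, then branch on the task type — can be sketched in Java. The task classes, queue contents, and result strings below are hypothetical stand-ins for Spark's case classes and doCleanup* methods:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CleanerLoopDemo {
    // Invented stand-ins for Spark's CleanupTask case classes.
    static class CleanRDD { final int rddId; CleanRDD(int id) { rddId = id; } }
    static class CleanBroadcast { final long id; CleanBroadcast(long id) { this.id = id; } }

    public static List<String> run() throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        queue.add(new CleanRDD(1));
        queue.add(new CleanBroadcast(7L));

        List<String> cleaned = new ArrayList<>();
        // Same shape as keepCleaning(): poll with a timeout, then dispatch on
        // the task type; a null poll result just means "nothing to clean yet".
        Object task;
        while ((task = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
            if (task instanceof CleanRDD) {
                cleaned.add("doCleanupRDD(" + ((CleanRDD) task).rddId + ")");
            } else if (task instanceof CleanBroadcast) {
                cleaned.add("doCleanupBroadcast(" + ((CleanBroadcast) task).id + ")");
            }
        }
        return cleaned;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The timeout on the poll is the same idea as REF_QUEUE_POLL_TIMEOUT: it lets the loop wake up regularly to re-check its stop condition instead of blocking forever.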
def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
  try {
    logDebug("Cleaning RDD " + rddId)
    sc.unpersistRDD(rddId, blocking)
    listeners.asScala.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case e: Exception => logError("Error cleaning RDD " + rddId, e)
  }
}
Cleaning up an RDD takes two steps: first the cached data is removed via sc.unpersistRDD(), which goes through the BlockManager, and then each listener is notified of the cleanup through rddCleaned().
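The notification step follows the ordinary observer pattern; a minimal Java sketch (the interface and names here are invented for illustration, not Spark's actual listener API):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerDemo {
    // Hypothetical mirror of a cleaner-listener callback.
    interface CleanerListener { void rddCleaned(int rddId); }

    public static String run() {
        List<CleanerListener> listeners = new CopyOnWriteArrayList<>();
        StringBuilder log = new StringBuilder();
        listeners.add(rddId -> log.append("Cleaned RDD ").append(rddId));
        // Step 1 (removing the blocks via sc.unpersistRDD) is Spark-specific;
        // step 2 just fans the event out to every registered listener.
        listeners.forEach(l -> l.rddCleaned(42));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```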