spark源码分析-ContextCleaner缓存清理

ContextCleaner是用于清理spark执行过程中内存,主要用于清理任务执行过程中生成的缓存RDD、Broadcast、Accumulator、Shuffle数据,防止造成内存压力。
ContextCleaner会在SparkContext创建过程中,被实例化:

_cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
_cleaner.foreach(_.start())

调用ContextCleaner的start()方法启动清理器,该类启动两个线程一个用于将清理不被使用的对象,另一个线程定时调用System.gc()方法,来清理JVM内存中不被使用的对象,该方法不会直接触发full gc,它只会建议JVM进行full gc,具体full gc何时进行由JVM自行决定。

/** Start the cleaner. */
  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Spark Context Cleaner")
    cleaningThread.start()
    periodicGCService.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

看一下清理线程方法keepCleaning()方法,该方法会持续从referenceQueue获取并删除不被引用的对象,然后调用指定方法清理指定类型数据。

  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

/** Keep cleaning RDD, shuffle, and broadcast state. */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

ContextCleaner如何知道哪些数据对象是不再被使用的呢?这个就要介绍一下java的四种引用类型了:
1. 强引用:被强引用关联的对象不会被垃圾收集器回收,使用 new 一个新对象的方式来创建强引用

Object obj = new Object();
  1. 软引用:被软引用关联的对象,只有在内存不够的情况下才会被回收。使用SoftReference类来创建软引用。、
Object obj = new Object();
SoftReference<Object> sf = new SoftReference<Object>(obj);
  1. 弱引用:被弱引用关联的对象一定会被垃圾收集器回收,也就是说它只能存活到下一次垃圾收集发生之前。使用WeakReference类来实现弱引用。
Object obj = new Object();
WeakReference<Object> wf = new WeakReference<Object>(obj);
  1. 虚引用:又称为幽灵引用或者幻影引用。一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用取得一个对象实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。使用PhantomReference类来实现虚引用。
Object obj = new Object();
PhantomReference<Object> pf = new PhantomReference<Object>(obj);

可以看到使用弱引用可以实现ContextCleaner的功能,向ContextCleaner注册需要清理的数据对象时,会将该数据对象使用弱引用关联。如果该数据对象不再被除CleanupTaskWeakReference对象引用外,而且不被引用的对象可以通过referenceQueue获取到。

 /** Register an object for cleanup. */
  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
  }

ContextCleaner主要通过使用java的软引用实现,将需要被清理的数据对象相关信息对象用软引用进行关联,当该引用没有被除软引用关联外,ContextCleaner可以获取到该信息对象,然后调用对应数据对象的清理方法将对象从Spark存储中删除,最后调用System.gc()方法建议JVM进行一次full gc清理这些需要被清理的数据对象。

猜你喜欢

转载自blog.csdn.net/Shie_3/article/details/81051133