当spark开启反压之后,将会在PIDRateEstimator中根据上一批任务的处理延时情况计算下一批接收数据的大小。
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
计算需要四个参数,该批数据处理结束时间time,该批处理元素个数elements,以及处理时间和调度延迟时间。
val delaySinceUpdate = (time - latestTime).toDouble / 1000
首先计算本次结束时间相比上一批数据处理结束相差的时间delaySinceUpdate,单位秒。
val processingRate = numElements.toDouble / processingDelay * 1000
之后用总处理数量除以总处理时间得到该批的单秒处理速率processingRate,单位数据量/秒。
val error = latestRate - processingRate
error则为上次处理速率与本次速率之差,可以理解为上次预估处理性能与本次实际处理性能的误差,单位数据量/秒,默认权重1。
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
histoticalError则为调度延迟乘本次处理速率的乘积(由于调度延迟导致缺少处理的数据量)除以任务批次时间,也就是单批数据由于调度延迟导致的单秒缺少的数据处理量,单位数据量/秒,默认权重0.2。
val dError = (error - latestError) / delaySinceUpdate
dError则为当前误差与上一次误差的差除以前后两次完成时间间隔,单位数据量/秒的平方,默认权重0。
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
最后得到的新速率则为上次速率根据权重乘上述误差类型之后的结果,存在一个默认不会低于的的最小速率。