排列三源码出售与spark内存

一.背景
在实际生产工作中,一般都会采用spark on yarn的方式运行管理spark任务。排列三源码出售[企俄:2152876294] 网址diguaym.com这时候难免就会遇到提交任务时该如何去写配置参数,比如公司给你分配了一个yarn的队列10core,200G内存,你该如何在这个整体资源限制下提交任务?为了回答这个问题,本文阐述了可spark的内存管理机制,供大家参阅。

二.spark静态内存分配机制
spark内存管理机制基于MemoryManager类,首先剖析一下Memorymanager类的源码:

@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)br/>@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
br/>@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
br/>@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
Executor分配的内存整体分配成两部分,分别指向storage(用于cache数据)和execution(用于计算数据),通过构建MemoryPool(内存池)来分配内存。内存池在构建的过程中可以选择两种模式:ON_HEAP(堆内)和OFF_HEAP,默认为堆内分配,即所有对象的创建都在JVM内实现,但是在堆内创建过多的对象实际上会带来GC的压力,GC响应时间过长也会导致OOM的情况出现。同时数据写入磁盘的时候实际上也会通过堆外内存过渡,所以这部分操作也会有时间的损耗。如果通过OFF_HEAP的方式实现,则这部分内存直接由内核管理,减轻了JVM本身的压力。Spark从1.6开始引入了钨丝计划(Tungsten),如果是On-heap模式下,会去寻找对象,然后对象中通过offset来具体定位地址,而如果是Off-heap的话,会直接定位对象数据。进一步增强了OFF_HEAP的速度优势。

MemoryManager是一个顶级抽象类,spark1.6前内存管理模型的实现是基于StaticMemoryManager类实现,同样我们先看一下部分源码:

private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory memoryFraction safetyFraction).toLong
}

/**

  • Return the total amount of memory available for the execution region, in bytes.
    */
    private def getMaxExecutionMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

    if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
    s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
    s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
    s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
    s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (systemMaxMemory memoryFraction safetyFraction).toLong
    }
    如代码所示,其中实现了两个方法getMaxStorageMemory和getMaxExecutionMemory分别对应了前面提到的Storage和Execution部分的内存分配。Runtime.getRuntime.maxMemory得到的是java虚拟机能够在运行时从操作系统获取最大的内存,即我们能获取的总内存值。总内存默认有60%分配给StorageMemory部分,StorageMemory默认保留10%作为安全阈,剩余90%用户缓存。在这90%的内存资源中,默认有20%分配给unRollMemory,主要用于缓存Iterator类型的block数据和序列化数据的展开的时候使用。其余80%则用于具体cache数据。总内存默认有20%分配给了ExecutionMemory部分,ExecutionMemory默认保留20%作为安全阈,剩余80%具体用于计算。

    下面以一张图去具体展示一下实际的内存分配,我们假设executor调用Runtime.getRuntime.maxMemory时获取的总内存为10G:

    如图所示,executor的10G内存分成了三块,分别为6G(StorageMemoryFraction),2G(ExecutionMemoryFraction),和2G预留给JVM创建对象使用。刨去设立的安全阈和unRollMemory,真正分配给缓存的内存为4.32G,真正分配给计算的内存值为1.6G。从这里也可以看出spark1.6版本前采用的静态内存资源管控机制是有很大改善空间的。

三.spark动态内存分配机制(1.6版本后)
经过上面实际分析,可以明显发现spark静态内存的管理分配机制是不灵活的,有很大改善空间。所以spark在1.6版本之后开始引入动态内存管理机制。代码由UnifiedMemoryManager类具体实现,下面来看源码:

override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

扫描二维码关注公众号,回复: 2777404 查看本文章

override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
从代码可以看出无论是ON_HEAP还是OFF_HEAP的方式,maxStorageMemory都会根据总内存减去ExecutionMemoryPool实际占用内存获取。换句话说Storage和Execution的内存分配是动态的,可以在总内存的范围下动态调整。

private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory
memoryFraction).toLong
}
通过getMaxMemory方法获取了memoryFraction,即executor用于Execution和Storage的最大内存值。下面我们来详细剖析一下这个值是怎么算出来的:

systemMemory=Runtime.getRuntime.maxmemory 为JVM能够获取的最大内存值
reservedMemory=RESERVED_SYSTEM_MEMORY_BYTES ,如果命中了“spark.testing”模式为0(对应local模式),否则为固定值300M,这个值可以在源码找到:
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 1024 1024
3.minSystemMemory=reservedMemory*1.5=450M.源码中会判断该值,如果systemMemory小于450M,则直接报错。如果提交spark任务时,executor的内存指定小于450M,也会报错。

 4.usableMemory=systemMemory-reservedMemory(300M) 为刨去预留值的内存总量

 5.函数整体返回值为usableMemory * memoryFraction默认为usableMemory*0.6,这里注意笔者实际测试版本为spark2.2.0,源码这个fraction的值为0.6,但之前的版本这个fraction的默认值可能为0.75.剩余的40%为JVM的创建对象占用。

 这里引用一张网上的图:

下面我们举个栗子:假如executor的内存为10G,则默认留给Execution和Storage的内存总量为(10*1024-300)*0.6=5964M。这5个多G的内存为Excution和Storage动态占用,如果己方空间不足则可占用对方。

def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxHeapMemory = maxMemory,
onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
Execution和Storage在初始化的时候各占50%

补充一下Storage和Execution内存动态调整的机制:

1.Cache(Storage)内存不足的时候,若Execution内存剩余,则可以占用。但Execution占用的内存不会释放给Cache。

2.Execution内存不足的时候,若Cache内存剩余,则可以占用,若Cache内存用满,会释放Cache中缓存等级低的部分分区。

了解了spark的内存分配机制后,我们将在下一篇文章中集中探讨如何合理的配置spark任务参数。

猜你喜欢

转载自blog.51cto.com/13924542/2160305