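Introduction to SparkConf
SparkConf manages all of Spark's configuration items. Whether you use the older SparkContext as the entry point to Spark or the newer unified entry point SparkSession, you set the various parameters on a SparkConf and pass it to SparkContext or SparkSession.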
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
*
* Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
* values from any `spark.*` Java system properties set in your application as well. In this case,
* parameters you set directly on the `SparkConf` object take priority over system properties.
*
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what the system properties are.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
*
* @param loadDefaults whether to also load values from Java system properties
*
* @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*/
Definition of the SparkConf class
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
  import SparkConf._
  /** Create a SparkConf that loads defaults from system properties and the classpath
   *  (default no-arg constructor: passes loadDefaults = true) */
  def this() = this(true)
  // Map holding all settings (a thread-safe ConcurrentHashMap)
  private val settings = new ConcurrentHashMap[String, String]()
  // Configuration reader, initialized lazily
  @transient private lazy val reader: ConfigReader = {
    // SparkConfigProvider wraps the settings map
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }
  // When loadDefaults is true, load Spark settings from the Java system properties
  if (loadDefaults) {
    loadFromSystemProperties(false)
  }
  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
  /** Set a configuration variable. */
  def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }
  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    // Validate the arguments
    if (key == null) {
      throw new NullPointerException("null key")
    }
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    // Unless setting silently, log a warning if the key is deprecated
    if (!silent) {
      logDeprecationWarning(key)
    }
    // Store the pair in settings
    settings.put(key, value)
    this
  }
  private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.stringConverter(value))
    this
  }
  private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.rawStringConverter(value))
    this
  }
  ... // remaining code omitted
}
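The following minimal sketch makes the storage and set logic above concrete. It uses only the JDK and the Scala standard library. The class name MiniConf and its hard-coded deprecatedKeys set are hypothetical, and printing to stderr stands in for Spark's logDeprecationWarning. It mirrors three points from the excerpt: a ConcurrentHashMap holds the pairs, null keys and values are rejected, and every setter returns this so calls can be chained.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-in for SparkConf (not Spark's API).
// Mirrors the excerpt: ConcurrentHashMap storage, null checks, chaining.
class MiniConf {
  // Thread-safe storage, like SparkConf's `settings`
  private val settings = new ConcurrentHashMap[String, String]()

  // Assumed example of a deprecated key; Spark keeps its list in the companion object
  private val deprecatedKeys = Set("spark.cache.class")

  def set(key: String, value: String): MiniConf = set(key, value, false)

  def set(key: String, value: String, silent: Boolean): MiniConf = {
    if (key == null) throw new NullPointerException("null key")
    if (value == null) throw new NullPointerException("null value for " + key)
    // Unless silent, warn about deprecated keys (stderr stands in for logging)
    if (!silent && deprecatedKeys.contains(key)) {
      System.err.println(s"Warning: $key is deprecated")
    }
    settings.put(key, value)
    this // returning this is what makes chaining possible
  }

  def getOption(key: String): Option[String] = Option(settings.get(key))
}

// Usage: chained setters, in the style of new SparkConf().setMaster(...).setAppName(...)
val conf = new MiniConf().set("spark.master", "local[2]").set("spark.app.name", "demo")
println(conf.getOption("spark.master"))
```

Because set returns the same instance, each call in the chain writes into one shared map. No intermediate copies are created.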
(1) The loadDefaults parameter determines whether properties are loaded from the Java system properties.
/** Create a SparkConf that loads defaults from system properties and the classpath
 *  (default no-arg constructor: passes loadDefaults = true) */
def this() = this(true)
If loadDefaults is true, the constructor calls loadFromSystemProperties(…). That method loads every system property whose key has the spark. prefix.
// When loadDefaults is true, load Spark settings from the Java system properties
if (loadDefaults) {
  loadFromSystemProperties(false)
}
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}
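You can try out the effect of loadDefaults without Spark on the classpath. The sketch below uses a hypothetical SysPropConf class. It reads java.lang.System properties directly instead of calling Spark's Utils.getSystemProperties. Keys without the spark. prefix are skipped. A value set after construction replaces the one loaded from system properties, which matches the priority rule stated in the Scaladoc.

```scala
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Hypothetical sketch of the loadDefaults pattern (not Spark's code).
// Reads java.lang.System properties directly instead of Utils.getSystemProperties.
class SysPropConf(loadDefaults: Boolean) {
  // Declared before the constructor logic below, so it is initialized first
  private val settings = mutable.Map.empty[String, String]

  if (loadDefaults) {
    loadFromSystemProperties()
  }

  private def loadFromSystemProperties(): Unit = {
    // Copy only the properties whose keys start with "spark."
    for (key <- System.getProperties().stringPropertyNames().asScala if key.startsWith("spark.")) {
      settings(key) = System.getProperty(key)
    }
  }

  def set(key: String, value: String): SysPropConf = { settings(key) = value; this }
  def getOption(key: String): Option[String] = settings.get(key)
}

System.setProperty("spark.demo.key", "from-sysprops")
System.setProperty("other.demo.key", "ignored")

val withDefaults = new SysPropConf(true)     // picks up spark.demo.key
val withoutDefaults = new SysPropConf(false) // like new SparkConf(false): nothing loaded
// A value set explicitly after construction overwrites the system-property value
val overridden = new SysPropConf(true).set("spark.demo.key", "explicit")
```

The field order matters here, just as in SparkConf itself. The map must be declared before the if (loadDefaults) block runs in the constructor.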
(2) SparkConf's companion object also defines deprecated configuration items, as well as items whose names changed between versions:
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
  val configs = Seq(
    DeprecatedConfig("spark.cache.class", "0.8",
      "The spark.cache.class property is no longer being used! Specify storage levels using " +
      "the RDD.persist() method instead."),
    ... // many more entries omitted
  )
  Map(configs.map {
    cfg => (cfg.key -> cfg) } : _*)
}
// Configuration items whose names changed between versions (new name -> old names)
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
  "spark.executor.userClassPathFirst" -> Seq(
    AlternateConfig("spark.files.userClassPathFirst", "1.3")),
  ... // many more entries omitted
// Reverse index: old name -> (new name, AlternateConfig)
private val allAlternatives: Map[String, (String, AlternateConfig)] = {
  configsWithAlternatives.keys.flatMap { key =>
    configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
  }.toMap
}
Here, deprecatedConfigs uses DeprecatedConfig, whose three string arguments are:
- the name of the deprecated item
- the version in which it was deprecated
- a message explaining the deprecation

configsWithAlternatives records renamed items as key-value pairs. The key is the item's new name. The value is a sequence of AlternateConfig objects, whose two string arguments are an old name of the item and the version in which that old name was deprecated. allAlternatives inverts this table so that an old name can be looked up directly.
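The following sketch rebuilds the two tables and the reverse index with plain Scala collections, using only the two entries quoted above. The case classes are simplified stand-ins. Spark's own AlternateConfig also accepts an optional translation function, which is omitted here.

```scala
// Simplified stand-ins for Spark's private case classes (translation function omitted)
case class DeprecatedConfig(key: String, version: String, deprecationMessage: String)
case class AlternateConfig(key: String, version: String)

val deprecatedConfigs: Map[String, DeprecatedConfig] = {
  val configs = Seq(
    DeprecatedConfig("spark.cache.class", "0.8",
      "The spark.cache.class property is no longer being used! Specify storage levels using " +
        "the RDD.persist() method instead."))
  configs.map(cfg => cfg.key -> cfg).toMap
}

// New name -> old names, each with the version in which that old name was deprecated
val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
  "spark.executor.userClassPathFirst" -> Seq(
    AlternateConfig("spark.files.userClassPathFirst", "1.3")))

// Reverse index with the same shape as allAlternatives: old name -> (new name, AlternateConfig)
val allAlternatives: Map[String, (String, AlternateConfig)] =
  configsWithAlternatives.keys.flatMap { key =>
    configsWithAlternatives(key).map(cfg => cfg.key -> (key -> cfg))
  }.toMap

// Usage: find the current name for an old key
val (newName, alt) = allAlternatives("spark.files.userClassPathFirst")
println(s"$newName (renamed in ${alt.version})")
```

With the reverse index, a warning for an old name can point to the new name in a single map lookup. No scan of configsWithAlternatives is needed.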
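Setting SparkConf configuration items
SparkConf configuration items can be set in three ways:
- From system properties
- Through the SparkConf API
- By cloning another SparkConf
(1) From system properties
These are the properties returned by System.getProperties whose keys start with the spark. prefix. They are loaded only when loadDefaults is true.
(2) Through the SparkConf API
SparkConf provides many methods whose names start with “set” for setting configuration, for example: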
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}
... // remaining code omitted
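Every set* method follows one pattern: it fixes a key, delegates to the generic set, and returns the same object. The hypothetical SetterConf below sketches that pattern. It adds bulk and conditional variants modeled on SparkConf's setAll and setIfMissing. This is an illustration, not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical sketch of the "set*" convenience-method pattern (not Spark's code)
class SetterConf {
  private val settings = mutable.LinkedHashMap.empty[String, String]

  // Generic setter that every named setter delegates to
  def set(key: String, value: String): SetterConf = { settings(key) = value; this }

  // Named setters only choose the key
  def setMaster(master: String): SetterConf = set("spark.master", master)
  def setAppName(name: String): SetterConf = set("spark.app.name", name)

  // Bulk variant: apply several pairs in order
  def setAll(pairs: Iterable[(String, String)]): SetterConf = {
    pairs.foreach { case (k, v) => set(k, v) }
    this
  }

  // Conditional variant: only set the key if it has no value yet
  def setIfMissing(key: String, value: String): SetterConf = {
    if (!settings.contains(key)) set(key, value)
    this
  }

  def get(key: String): String = settings.getOrElse(key, throw new NoSuchElementException(key))
}

val conf = new SetterConf()
  .setMaster("local[4]")
  .setAppName("My app")
  .setAll(Seq("spark.executor.memory" -> "2g"))
  .setIfMissing("spark.app.name", "ignored") // no effect: the name is already set
```

Keeping every named setter a one-line delegation means validation and logging live in one place, the generic set.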
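(3) By cloning another SparkConf
This is implemented by SparkConf's clone() method. SparkConf implements the JDK Cloneable interface to support copying.
/** Copy this object */
// clone() creates a new SparkConf holding a copy of the current settings,
// so developers can derive a new configuration from an existing one
override def clone: SparkConf = {
  // loadDefaults = false: system properties are not reloaded
  val cloned = new SparkConf(false)
  settings.entrySet().asScala.foreach {
    e =>
      // silent = true: deprecation warnings are not logged again
      cloned.set(e.getKey(), e.getValue(), true)
  }
  cloned
}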
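A quick check of what clone() guarantees is to confirm that the copy is independent of the original. The sketch below uses a hypothetical CloneableConf class that has no system-property loading. Like SparkConf.clone, it builds a fresh instance and copies every entry, so later changes to the copy do not affect the original.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical sketch of the clone pattern (not Spark's code)
class CloneableConf extends Cloneable {
  private val settings = new ConcurrentHashMap[String, String]()

  def set(key: String, value: String): CloneableConf = { settings.put(key, value); this }
  def getOption(key: String): Option[String] = Option(settings.get(key))

  // Fresh instance plus an entry-by-entry copy, so the two maps are separate objects
  override def clone(): CloneableConf = {
    val cloned = new CloneableConf
    settings.entrySet().asScala.foreach { e => cloned.set(e.getKey(), e.getValue()) }
    cloned
  }
}

val original = new CloneableConf().set("spark.app.name", "base")
// Changing the copy leaves the original untouched
val copy = original.clone().set("spark.app.name", "derived")
```

Copying the entries, rather than sharing the map, is what allows Spark to hand each component its own snapshot. The original SparkConf can then no longer affect them.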
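Getting SparkConf configuration items
SparkConf provides methods whose names start with “get” to read configuration values of different types. They call getOption(…), which first looks up the given key. If nothing is found, it tries the deprecated alternative names registered for that key: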
/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
  getOption(key).getOrElse(throw new NoSuchElementException(key))
}
... // code omitted
/**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws java.util.NoSuchElementException If the time parameter is not set
* @throws NumberFormatException If the value cannot be interpreted as seconds
*/
def getTimeAsSeconds(key: String): Long = catchIllegalValue(key) {
  Utils.timeStringAsSeconds(get(key))
}
... // code omitted
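The sketch below combines the two lookups described above in a hypothetical LookupDemo object:
- getOption falls back from a new key to its registered old names.
- getTimeAsSeconds assumes seconds when no suffix is given.

The alternatives table holds just one example entry. The time parser is a simplified assumption that accepts only a few suffixes (ms, s, m/min, h, d). It does not reproduce Spark's Utils.timeStringAsSeconds.

```scala
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

// Hypothetical sketch (not Spark's code): fallback lookup plus time-string parsing
object LookupDemo {
  // New name -> old names; one example entry only
  private val alternatives = Map(
    "spark.executor.userClassPathFirst" -> Seq("spark.files.userClassPathFirst"))
  private val settings = new ConcurrentHashMap[String, String]()

  def set(key: String, value: String): Unit = settings.put(key, value)

  // Try the key itself first, then each registered old name
  def getOption(key: String): Option[String] =
    Option(settings.get(key)).orElse {
      alternatives.getOrElse(key, Nil).collectFirst {
        case old if settings.containsKey(old) => settings.get(old)
      }
    }

  def get(key: String): String =
    getOption(key).getOrElse(throw new NoSuchElementException(key))

  // Number followed by an optional suffix; a missing suffix means seconds
  private val TimePattern = """(-?\d+)\s*([a-z]*)""".r

  def getTimeAsSeconds(key: String): Long = get(key).trim.toLowerCase match {
    case TimePattern(num, suffix) =>
      val unit = suffix match {
        case "" | "s"    => TimeUnit.SECONDS
        case "ms"        => TimeUnit.MILLISECONDS
        case "m" | "min" => TimeUnit.MINUTES
        case "h"         => TimeUnit.HOURS
        case "d"         => TimeUnit.DAYS
        case other       => throw new NumberFormatException(s"Invalid suffix: $other")
      }
      TimeUnit.SECONDS.convert(num.toLong, unit)
    case other => throw new NumberFormatException(s"Invalid time string: $other")
  }
}

LookupDemo.set("spark.files.userClassPathFirst", "true") // only the old name is set
LookupDemo.set("spark.network.timeout", "2m")
LookupDemo.set("spark.demo.timeout", "45")               // no suffix: seconds
```

Looking up the new key first means an explicitly set new name always wins over any old one.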
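Validating SparkConf configuration items
SparkConf provides the validateSettings() method to check configuration items for validity. It throws an exception for illegal settings and logs warnings for deprecated or overridden ones. Part of its source is shown below: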
/**
*
* Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones.
*/
// Validate the settings: throw for illegal values, log warnings otherwise
private[spark] def validateSettings() {
  if (contains("spark.local.dir")) {
    val msg = "Note that spark.local.dir will be overridden by the value set by " +
      "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +
      " in YARN)."
    logWarning(msg)
  }
  ... // code omitted
  // Validate spark.executor.extraJavaOptions
  getOption(executorOptsKey).foreach {
    javaOpts =>
      if (javaOpts.contains("-Dspark")) {
        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
          "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
        throw new Exception(msg)
      }
      if (javaOpts.contains("-Xmx")) {
        val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
          s"(was '$javaOpts'). Use spark.executor.memory instead."
        throw new Exception(msg)
      }
  }
  // Validate memory fractions
  val deprecatedMemoryKeys = Seq(
    "spark.storage.memoryFraction",
    "spark.shuffle.memoryFraction",
    "spark.shuffle.safetyFraction",
    "spark.storage.unrollFraction",
    "spark.storage.safetyFraction")
  ... // code omitted
  // Check deploy-mode settings
  if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
    val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
      "instead use \"yarn\" with specified deploy mode."
    ... // code omitted
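The checks in the excerpt can be sketched over a plain Map using a hypothetical ValidateDemo object:
- It throws for illegal executor Java options.
- It returns warnings, rather than logging them, for spark.local.dir and for yarn-* masters.

It throws IllegalArgumentException where the Spark excerpt throws a plain Exception. It omits the memory-fraction checks that are elided above.

```scala
// Hypothetical sketch of the validation rules quoted above (not Spark's code)
object ValidateDemo {
  val executorOptsKey = "spark.executor.extraJavaOptions"

  // Throws for illegal settings; returns warning messages for the rest
  def validate(conf: Map[String, String]): Seq[String] = {
    val warnings = Seq.newBuilder[String]

    if (conf.contains("spark.local.dir")) {
      warnings += "spark.local.dir will be overridden by the cluster manager"
    }

    conf.get(executorOptsKey).foreach { javaOpts =>
      // Spark options must not be smuggled in as -D JVM flags
      if (javaOpts.contains("-Dspark")) {
        throw new IllegalArgumentException(
          s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts')")
      }
      // Heap size must come from spark.executor.memory instead
      if (javaOpts.contains("-Xmx")) {
        throw new IllegalArgumentException(
          s"$executorOptsKey is not allowed to specify max heap memory settings " +
            s"(was '$javaOpts'). Use spark.executor.memory instead.")
      }
    }

    // Old-style yarn-client / yarn-cluster masters only produce a warning
    conf.get("spark.master").filter(_.startsWith("yarn-")).foreach { master =>
      warnings += s"spark.master $master is deprecated in Spark 2.0+, please instead use " +
        "\"yarn\" with specified deploy mode."
    }

    warnings.result()
  }
}

val warnings = ValidateDemo.validate(Map("spark.master" -> "yarn-client"))
warnings.foreach(println)
```

This split mirrors the Scaladoc of validateSettings: illegal values stop the application early, while deprecated values are tolerated but reported.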