0x0 Background
Recently, in order to move the five main Hadoop & Hive configuration files, namely:
core-site.xml
hdfs-site.xml
yarn-site.xml
mapred-site.xml
hive-site.xml
out of the project (i.e. off the classpath) to an external location (any path on disk), I studied the source code of the Spark startup process. This post records what I found.
0x1 How Hadoop and Hive Load Their Default Configuration
Hadoop has a class
public class Configuration implements Iterable<Map.Entry<String, String>>, Writable
which is responsible for handling Hadoop's configuration. It contains a static initializer block:
static {
  // print deprecation warning if hadoop-site.xml is found in classpath
  ClassLoader cL = Thread.currentThread().getContextClassLoader();
  if (cL == null) {
    cL = Configuration.class.getClassLoader();
  }
  if (cL.getResource("hadoop-site.xml") != null) {
    LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
        "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
        + "mapred-site.xml and hdfs-site.xml to override properties of " +
        "core-default.xml, mapred-default.xml and hdfs-default.xml " +
        "respectively");
  }
  addDefaultResource("core-default.xml");
  addDefaultResource("core-site.xml");
}
As you can see, when the Configuration class is loaded it looks on the classpath for
hadoop-site.xml (only to print the deprecation warning above)
core-default.xml
core-site.xml
and registers the last two as default resources, which are then read from the classpath.
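In other words, simply constructing a Configuration is enough to pick up core-site.xml from the classpath. A minimal sketch (fs.defaultFS is just an example key that core-site.xml commonly defines):
import org.apache.hadoop.conf.Configuration;

public class DefaultResourceDemo {
  public static void main(String[] args) {
    // core-default.xml and core-site.xml are loaded automatically;
    // no explicit addResource() call is needed here
    Configuration conf = new Configuration();
    // returns the value from core-site.xml if the file is on the classpath,
    // otherwise the core-default.xml value (or null)
    System.out.println(conf.get("fs.defaultFS"));
  }
}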
Configuration also has four subclasses:
HdfsConfiguration
HiveConf
JobConf
YarnConfiguration
Inside each of these four classes you can find similar static initialization code.
In HdfsConfiguration:
static {
addDeprecatedKeys();
// adds the default resources
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
In YarnConfiguration:
static {
addDeprecatedKeys();
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
...
}
In JobConf (whose static block delegates to ConfigUtil.loadResources()):
public static void loadResources() {
addDeprecatedKeys();
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
}
HiveConf, however, does not read its configuration file in a static block. Instead, during CarbonData's startup the following code reads hive-site.xml:
val hadoopConf = new Configuration()
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
hadoopConf.addResource(configFile)
}
So during startup, each Hadoop component first looks for its configuration files on the classpath.
We can also add configuration ourselves through Configuration's set(String name, String value)
or addResource(Path file)
methods. Internally, addResource works roughly as follows:
// add the resource to the resources list (the list that stores configuration file resources)
resources.add(resource); // add to resources
// discard the properties that have already been loaded
properties = null; // trigger reload
finalParameters.clear(); // clear site-limits
// all configuration resources are then reloaded via
loadResources(Properties properties,
              ArrayList<Resource> resources,
              boolean quiet)
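So, with the xml files removed from the classpath, a plain Hadoop client could attach them from an arbitrary directory. A minimal sketch (the /etc/myapp/conf path is hypothetical):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
// each addResource() call triggers the reload sequence shown above
conf.addResource(new Path("/etc/myapp/conf/core-site.xml"));
conf.addResource(new Path("/etc/myapp/conf/hdfs-site.xml"));
// single properties can also be overridden programmatically
conf.set("dfs.replication", "2");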
0x2 Setting the Hadoop Configuration During Spark Startup
When a Spark application starts, the first thing it does is create a SparkContext. A SparkContext can essentially be understood as the collection of settings a Spark job runs with.
val sc = SparkContext.getOrCreate(sparkConf)
During the creation of the SparkContext, a scheduler backend is started to connect to the remote cluster:
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
If the application runs on YARN, YarnClusterManager's createSchedulerBackend method is called:
override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
YarnClientSchedulerBackend then creates the YarnClient; take a look at the constructor of Client:
private[spark] class Client(
val args: ClientArguments,
val hadoopConf: Configuration,
val sparkConf: SparkConf)
extends Logging {
import Client._
import YarnSparkHadoopUtil._
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
So Spark takes the settings held in SparkConf and calls SparkHadoopUtil.get.newConfiguration(spConf) to build the corresponding Hadoop configuration.
In fact, SparkContext has two members for this (essentially a single field plus its accessor):
private var _hadoopConfiguration: Configuration = _
def hadoopConfiguration: Configuration = _hadoopConfiguration
...
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
This _hadoopConfiguration is likewise obtained through SparkHadoopUtil.get.newConfiguration(_conf).
Stepping into SparkHadoopUtil.get.newConfiguration(_conf), you can see:
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
In other words, every property in SparkConf whose key starts with spark.hadoop. is copied, with the prefix stripped, into the Hadoop configuration.
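A quick way to see this mechanism in action (fs.defaultFS and the namenode URL below are placeholder values):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf()
    .setAppName("hadoop-conf-demo")
    .setMaster("local[*]")
    .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// the spark.hadoop. prefix has been stripped, so the plain Hadoop key is set
System.out.println(jsc.hadoopConfiguration().get("fs.defaultFS"));
jsc.close();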
So all we have to do is parse the Hadoop xml configuration files ourselves, turn them into key-value pairs, and hand them to Spark. The code looks like this:
// requires dom4j for xml parsing
import org.apache.spark.SparkConf;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Read all Hadoop-related configuration files under hadoopConfPath and convert them into a SparkConf.
 *
 * @param hadoopConfPath directory containing the Hadoop configuration files
 * @return a SparkConf carrying the Hadoop and Hive settings
 */
public SparkConf getHadoopConf(String hadoopConfPath) {
  SparkConf hadoopConf = new SparkConf();
  try {
    Map<String, String> hiveConfMap = parseXMLToMap(hadoopConfPath + "/hive-site.xml");
    Map<String, String> hadoopConfMap = parseXMLToMap(hadoopConfPath + "/core-site.xml");
    hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/hdfs-site.xml"));
    hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/yarn-site.xml"));
    hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/mapred-site.xml"));
    // Hive properties are set on the SparkConf as-is
    for (Map.Entry<String, String> entry : hiveConfMap.entrySet()) {
      hadoopConf.set(entry.getKey(), entry.getValue());
    }
    // Hadoop properties get the spark.hadoop. prefix so Spark forwards them to the Hadoop Configuration
    for (Map.Entry<String, String> entry : hadoopConfMap.entrySet()) {
      hadoopConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
    return hadoopConf;
  } catch (DocumentException e) {
    logger.error("Failed to read xml file!");
    throw new RuntimeException(e);
  }
}

// parse an xml configuration file into a HashMap of name/value pairs
private Map<String, String> parseXMLToMap(String xmlFilePath) throws DocumentException {
  Map<String, String> confMap = new HashMap<>();
  SAXReader reader = new SAXReader();
  Document document = reader.read(new File(xmlFilePath));
  Element configuration = document.getRootElement();
  Iterator iterator = configuration.elementIterator();
  while (iterator.hasNext()) {
    Element property = (Element) iterator.next();
    String name = property.element("name").getText();
    String value = property.element("value").getText();
    confMap.put(name, value);
  }
  return confMap;
}
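A hedged sketch of how the returned SparkConf might be wired into an application (the directory and application name below are placeholders):
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// /opt/myapp/hadoop-conf is a hypothetical external directory holding the five xml files
SparkConf conf = getHadoopConf("/opt/myapp/hadoop-conf")
    .setAppName("external-conf-demo")
    .setMaster("yarn");

SparkSession spark = SparkSession.builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate();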
Note:
In my tests, this approach does not work if the cluster is secured with Kerberos!
The reason is probably the following:
class SparkHadoopUtil extends Logging {
private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)
This class creates a brand-new SparkConf that only reads spark.*-prefixed properties from System.getProperty, so it never sees the settings we injected into our own SparkConf; UserGroupInformation ends up configured with the wrong properties and the Kerberos login fails.
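Following that reasoning, one possible workaround (only a sketch, not verified here) is to publish the same keys as JVM system properties before anything touches SparkHadoopUtil, since its internal SparkConf is loaded from system properties:
import java.util.Map;

// hypothetical helper: hadoopConfMap holds the name/value pairs parsed from the xml files
static void exportAsSystemProperties(Map<String, String> hadoopConfMap) {
  for (Map.Entry<String, String> entry : hadoopConfMap.entrySet()) {
    // new SparkConf(false).loadFromSystemProperties(true) picks up every system
    // property whose key starts with "spark.", so the spark.hadoop. prefix makes
    // these values visible to SparkHadoopUtil (and thus to UserGroupInformation) too
    System.setProperty("spark.hadoop." + entry.getKey(), entry.getValue());
  }
}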