小白也能看懂的源码分析系列(1)—HADOOP的NameNode启动过程
一、前言
HADOOP作为大数据的基石,甚至是大数据的代名词,各种耳熟能详的框架基于HADOOP生态展开,发展日益迅速,HADOOP生态的完善,离不开HADOOP这个项目的伟大,作为一名大数据方向的工程师或者研究人员,这是必须要熟悉的框架,想要进一步深入的理解它的伟大之处,外面必须要熟悉它的原理,原理从何而来?—源码。我们这一节会以一个简单的方式来分析hadoop的源码,小白不要一看到源码分析专题就头疼避而远之,这里我保证你看得懂!
二、源码正确打开方式
step1:
我们单纯的分析NameNode源码,没必要把hadoop源码包下载下来,如果下载全量源码,对没有fq条件的同学不友好,下载依赖会很麻烦,我们可以新建一个空的maven项目,把下面的依赖配到pom.xml文件里面,再把对应的源码下载下来
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
step2:
我们这次的目的是看NameNode源码,使用idea的类查找快捷键Ctrl+N,再输入NameNode既可查找到
step3:
我们点进去的只是idea反编译后的文件,需要点击右上角的下载源码,这样就能顺利的查看了,我们在查看源码的过程中,只要看到右上角有Download sources的提示,果断点击下载就行了
step4:
任何你在linux中通过jps命令看到的进程,NameNode、DataNode、NodeManger…一切看得到的,都是通过main方法启动,所以,这是进入源码的第一步,找到NameNode里面的main方法,终于可以进入正题了
必须要知道的快捷键:
Ctrl + Alt + ← 表示返回上一步的位置;Ctrl + Alt + → 进入当前的下一步前提是你进入过下一步。不懂的话这两个快捷键实操一下就知道什么意思,分析源码必须知道的这两个快捷键!
三、分析NameNode启动过程
进入main方法后,我们可以看到逻辑很简单,小白也能看的懂,显示解析传入的参数,打印启动的信息,最关键的一步是通过createNameNode(argv, null)
方法新建NameNode
对象
public static void main(String argv[]) throws Exception {
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
// 创建NameNode对象
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
我们再进入createNameNode(argv, null)
方法,重点在switch里面,switch前面是一堆解析参数的逻辑,switch里面会根据传入的参数执行不同的逻辑,关于启动参数,switch里面第一个选项,我们最熟悉的也就是-format
,也就是我们搭建集群的时候会执行的hadoop namenode –format命令。默认不加参数的,也就是启动NameNode,在最后的default
逻辑里面,会new NameNode()
对象
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
conf = new HdfsConfiguration();
// 解析一般的参数,也就是我们熟悉的key-value的形式,会set到我们常用的Configuration.
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// 解析其它特殊的启动参数,在下面的switch中
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
// 判断 startOpt 启动参数
switch (startOpt) {
// 我们最熟悉的format参数,格式化NameNode,等同于命令行hadoop namenode –format
case FORMAT: {
boolean aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
}
case GENCLUSTERID: {
System.err.println("Generating new cluster id:");
System.out.println(NNStorage.newClusterID());
terminate(0);
return null;
}
case FINALIZE: {
System.err.println("Use of the argument '" + StartupOption.FINALIZE +
"' is no longer supported. To finalize an upgrade, start the NN " +
" and then run `hdfs dfsadmin -finalizeUpgrade'");
terminate(1);
return null; // avoid javac warning
}
case ROLLBACK: {
boolean aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
}
case INITIALIZESHAREDEDITS: {
boolean aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BACKUP:
case CHECKPOINT: {
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
}
case RECOVER: {
NameNode.doRecovery(startOpt, conf);
return null;
}
case METADATAVERSION: {
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
}
case UPGRADEONLY: {
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
}
default: {
// 默认进入启动NameNode的逻辑
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
}
}
}
进入new NameNode(conf)
我一路点进去,进入构造方法,重点方法在initialize(conf)
:
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
this.conf = conf;
this.role = role;
setClientNamenodeAddress(conf);
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
// 从配置里面获取是否开启了HA机制
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
// 获取当前namenode的状态,总共有3中实现,ActiveState、StandbyState、BackupState
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId, namenodeId);
// 初始化方法,主要逻辑都在这里面,重点看这里
initialize(conf);
try {
haContext.writeLock();
state.prepareToEnterState(haContext);
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stop();
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stop();
throw e;
}
this.started.set(true);
}
重点看initialize(conf)
方法,进入此方法,重点逻辑方法:
loadNamesystem(conf)
:- 执行
FSNamesystem
的初始化逻辑,加载fsimage文件进内存 - 初始化
FSNamesystem
时,会实例化BlockManager
,BlockManager
保存了Datanode
块的信息
- 执行
startCommonServices(conf)
- 启动rpc server,包括clientRpcServer和serviceRpcServer,NameNode要跟其它服务进行rpc通信远程调用其它服务的方法,所以是需要clientRpcServer的,这点不懂rpc的通信可以仔细了解下rpc的原理
- 启用
blockManager
,主要功能是两个,第一是对block进行管理上报,第二是启动datanodeManager
,负责注册到NameNode,管理了NameNode
与DataNode
的心跳连接线程与上下线
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
UserGroupInformation.setConfiguration(conf);
// 如果使用kerberos做认证的话,这里要使用配置的用户登录
loginAsNameNodeUser(conf);
// 这里是初始化一些Metrics,做监控分析使用
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
this.spanReceiverHost =
SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
// 这里是很关键的一步,会从本地加载fsimage文件进内存,fsimage是hdfs核心的一部分,存放了hdfs全量的文件信息,我们平常看到的所有hdfs目录信息,都在这个文件里面记录并持久化
loadNamesystem(conf);
// 启动NameNode的rpcserver,用于与其它rpc客户端进行交互做准备,大家都知道hadoop之前是通过rpc进行通信,这里是关键的一步,里面初始化并添加了一堆的Protocol协议
rpcServer = createRpcServer(conf);
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(rpcServer.getRpcAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
// 这里是很关键的第二步,会启动rpc server,启动blockManager
startCommonServices(conf);
}
到这里,我们就不再继续深入,里面的逻辑相对复杂,可能需要花长篇大论来描述,读者也可能不会有足够的耐心读下去,知道NameNode启动的流程中大致做了哪些重要的事就行了,重点描述都在上面,有兴趣的话可以继续追踪源码,阅读过程中没必要每一行都懂,找到重点方法步入即可。想进一步深入了解,建议看完loadNamesystem(conf)
和startCommonServices(conf)
,会对NameNode
启动过程会有更深入的了解。
四、总结
本文写的有点浅显,主要目的是开个头,带读者入个门,让读者知道,哦,原来Hadoop的源码也就那么回事,都看的懂。本教程是面向对java基础、大数据基础相对不那么深入的童鞋,小白也能看懂的源码分析系列我打算一直写下去,保证人人都能看懂的情况下讲解源码分析过程。想要了解技术的本质,光会用是不行的,必须深入源码分析框架的基础原理,再高的房子都是由砖块堆积的,房子的形状千变万化,万变不离其宗的是盖楼的转头!