upgrade/rollback/importCheckpoint
在FSImage.recoverTransitionRead方法中,针对upgrade/rollback/importCheckpoint参数,在启动前做了特殊的操作,代码如下:
switch(startOpt) {
case UPGRADE:
doUpgrade();
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint();
return true;
case ROLLBACK:
doRollback();
break;
case REGULAR:
// just load the image
}
- hadoop namenode -upgrade
进行hadoop namenode的升级操作。调用FSImage.doUpgrade方法,主要操作是将current保存为previous,升级后的fsimage保存为current。代码如下:
if(getDistributedUpgradeState()) {
  // only distributed upgrade need to continue
  // don't do version upgrade
  this.loadFSImage();
  initializeDistributedUpgrade();
  return;
}
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  //如果已经有previous目录,则无法重新upgrade
  if (sd.getPreviousDir().exists())
    throw new InconsistentFSStateException(sd.getRoot(),
        "previous fs state should not exist during upgrade. "
        + "Finalize or rollback first.");
}
// 把硬盘上最近的fsimage加载进内存
this.loadFSImage();
// Do upgrade for each directory
long oldCTime = this.getCTime();
this.cTime = FSNamesystem.now(); // generate new cTime for the state
int oldLV = this.getLayoutVersion();
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  LOG.info("Upgrading image directory " + sd.getRoot()
      + ".\n old LV = " + oldLV
      + "; old CTime = " + oldCTime
      + ".\n new LV = " + this.getLayoutVersion()
      + "; new CTime = " + this.getCTime());
  File curDir = sd.getCurrentDir();
  File prevDir = sd.getPreviousDir();
  File tmpDir = sd.getPreviousTmp();
  assert curDir.exists() : "Current directory must exist.";
  assert !prevDir.exists() : "prvious directory must not exist.";
  assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
  // 把current目录改为previous.tmp目录
  rename(curDir, tmpDir);
  // 把内存中的fsimage保存到current中
  saveCurrent(sd);
  // 把previous.tmp目录修改为previous目录
  rename(tmpDir, prevDir);
  //标记升级没有完成(升级成功需要进行finalize,升级失败需要进行rollback来完成升级)
  isUpgradeFinalized = false;
  LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
//初始化升级后操作,主要是计算version写VERSION文件
initializeDistributedUpgrade();
//开启editlog
editLog.open();
- hadoop namenode -rollback
升级hadoop失败后回滚,主要是把upgrade时变更的数据目录恢复原状:将current修改为removed.tmp,previous修改回current,最后删除removed.tmp。代码如下:
// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
FSImage prevState = new FSImage();
prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
  //将previous目录里的版本信息加载进内存,如果没有previous目录,则加载current目录
  StorageDirectory sd = it.next();
  File prevDir = sd.getPreviousDir();
  if (!prevDir.exists()) { // use current directory then
    LOG.info("Storage directory " + sd.getRoot()
        + " does not contain previous fs state.");
    sd.read(); // read and verify consistency with other directories
    continue;
  }
  StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
  sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir
  canRollback = true;
}
if (!canRollback)
  throw new IOException("Cannot rollback. "
      + "None of the storage directories contain previous fs state.");
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  File prevDir = sd.getPreviousDir();
  if (!prevDir.exists())
    continue;
  LOG.info("Rolling back storage directory " + sd.getRoot()
      + ".\n new LV = " + prevState.getLayoutVersion()
      + "; new CTime = " + prevState.getCTime());
  File tmpDir = sd.getRemovedTmp();
  assert !tmpDir.exists() : "removed.tmp directory must not exist.";
  // rename current to tmp
  File curDir = sd.getCurrentDir();
  assert curDir.exists() : "Current directory must exist.";
  //将current目录更改为removed.tmp
  rename(curDir, tmpDir);
  // rename previous to current
  //将previous更改为current
  rename(prevDir, curDir);
  // delete tmp dir
  //删除removed.tmp
  deleteDir(tmpDir);
  LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
}
isUpgradeFinalized = true;
// check whether name-node can start in regular mode
verifyDistributedUpgradeProgress(StartupOption.REGULAR);
- hadoop namenode -importCheckpoint
使用备份的checkpoint启动namenode,之后将加载进来的checkpoint保存为current。代码如下:
FSImage ckptImage = new FSImage();
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
// replace real image with the checkpoint image
FSImage realImage = fsNamesys.getFSImage();
assert realImage == this;
fsNamesys.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
  //读取checkpointDirs目录与checkpointEditsDirs目录,合并后放入内存
  //checkpointDirs值为fs.checkpoint.dir配置项,如果没有则为null,secondarynamenode上默认值为/tmp/hadoop/dfs/namesecondary
  //checkpointEditsDirs值为fs.checkpoint.edits.dir配置项,如果没有则为null,secondarynamenode上默认值为/tmp/hadoop/dfs/namesecondary
  ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs, StartupOption.REGULAR);
} finally {
  ckptImage.close();
}
// return back the real image
realImage.setStorageInfo(ckptImage);
fsNamesys.dir.fsImage = realImage;
// and save it 保存fsimage
saveNamespace(false);
升级完成后,使用hadoop namenode -finalize来进行确认;finalize之后就不能再rollback了。
回到NameNode.createNameNode方法中,进入FINALIZE分支
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
之后进入FSImage.doFinalize方法完成升级,主要工作就是删除previous目录,并标记升级结束:
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
LOG.info("Directory " + prevDir + " does not exist.");
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
return;
}
LOG.info("Finalizing upgrade for storage directory "
+ sd.getRoot() + "."
+ (getLayoutVersion()==0 ? "" :
"\n cur LV = " + this.getLayoutVersion()
+ "; cur CTime = " + this.getCTime()));
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to tmp and remove
//将previous修改为finalized.tmp
rename(prevDir, tmpDir);
//删除finalized.tmp
deleteDir(tmpDir);
isUpgradeFinalized = true;
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");