Namenode -bootstrapStandby
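Workflow
- Read the nameserviceId and namenodeId from the configuration.
- Look up the other NameNodes and set up RPC connections to them.
- Check the configuration property dfs.namenode.support.allow.format to decide whether formatting is allowed. In production it is generally recommended to set it to false, so that existing data cannot be formatted by mistake.
- Determine the directories to format (the fsImage and edits storage directories) and read the sharedEditsDirs setting.
- Format those directories: create the current directory and write the VERSION and seen_txid files.
- In the QJM, check that the edit log files from the last checkpoint up to the latest curTxId exist.
- Download the fsImage produced by the most recent checkpoint from the remote NameNode.
- The bootstrap is complete.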
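To make the format step in the list above concrete (create current/, write VERSION and seen_txid), here is a minimal java.nio sketch. It is not Hadoop's NNStorage code: the class FormatSketch and its parameters are hypothetical, and the VERSION keys are the ones usually seen in a NameNode VERSION file, written here in a simplified form with made-up values.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch of the format step; the real NNStorage code does more than this.
class FormatSketch {
  static Path format(Path storageDir, int namespaceId, String clusterId,
      String blockPoolId, int layoutVersion) throws IOException {
    Path current = storageDir.resolve("current");
    Files.createDirectories(current);
    // Keys mirror a NameNode VERSION file; values come from the caller.
    List<String> version = List.of(
        "namespaceID=" + namespaceId,
        "clusterID=" + clusterId,
        "cTime=0",
        "storageType=NAME_NODE",
        "blockpoolID=" + blockPoolId,
        "layoutVersion=" + layoutVersion);
    Files.write(current.resolve("VERSION"), version, StandardCharsets.UTF_8);
    // A freshly formatted directory has not seen any transaction yet.
    Files.write(current.resolve("seen_txid"), List.of("0"), StandardCharsets.UTF_8);
    return current;
  }

  public static void main(String[] args) throws IOException {
    Path current = format(Files.createTempDirectory("nn-name"),
        123456789, "CID-example", "BP-example", -66);
    Files.readAllLines(current.resolve("VERSION")).forEach(System.out::println);
  }
}
```

In the real flow the namespace ID, cluster ID and block pool ID are not invented locally; they come from the remote NameNode's versionRequest() answer, which is why the standby ends up in the same namespace.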
Metadata sync command
```shell
hdfs namenode [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck] ]
# Typical usage
hdfs namenode -bootstrapStandby
```
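The three optional flags can be modeled roughly as follows. This is a hypothetical simplification of what BootstrapStandby.parseArgs does, not its actual code; the class BootstrapArgs and its fields are assumptions. As with namenode -format, -force formats existing storage directories without prompting, -nonInteractive aborts instead of prompting when they already exist, and -skipSharedEditsCheck skips the check that the shared edits directory holds enough edits.

```java
// Hypothetical, simplified model of the bootstrapStandby flags; the real parsing
// lives in BootstrapStandby.parseArgs and may differ in detail.
class BootstrapArgs {
  boolean force = false;                // -force: reformat existing dirs without prompting
  boolean interactive = true;           // -nonInteractive: abort instead of prompting
  boolean skipSharedEditsCheck = false; // -skipSharedEditsCheck: skip the shared edits check

  static BootstrapArgs parse(String[] args) {
    BootstrapArgs parsed = new BootstrapArgs();
    for (String arg : args) {
      switch (arg) {
        case "-force":
          parsed.force = true;
          break;
        case "-nonInteractive":
          parsed.interactive = false;
          break;
        case "-skipSharedEditsCheck":
          parsed.skipSharedEditsCheck = true;
          break;
        default:
          // Unknown flags are rejected rather than silently ignored.
          throw new IllegalArgumentException("Illegal argument: " + arg);
      }
    }
    return parsed;
  }

  public static void main(String[] args) {
    BootstrapArgs opts = parse(new String[] {"-nonInteractive", "-skipSharedEditsCheck"});
    System.out.println("force=" + opts.force + " interactive=" + opts.interactive
        + " skipSharedEditsCheck=" + opts.skipSharedEditsCheck);
  }
}
```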
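Source code walkthrough
Parsing the configuration
The entry point is the org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby.run method.
This step does the following:
- Read the cluster configuration.
- Find the remote NameNodes (the other NameNodes of the same nameservice); doRun later uses the first one that responds.
- Check whether formatting is allowed.
- Log in (via Kerberos, if security is enabled) and call the method that performs the actual sync.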
```java
public int run(String[] args) throws Exception {
  // Parse the command-line arguments
  parseArgs(args);
  // Disable using the RPC tailing mechanism for bootstrapping the standby
  // since it is less efficient in this case; see HDFS-14806
  conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
  // Parse the configuration, collect the cluster info and find the remote NNs
  parseConfAndFindOtherNN();
  // Fail fast if dfs.namenode.support.allow.format is false
  NameNode.checkAllowFormat(conf);

  InetSocketAddress myAddr = DFSUtilClient.getNNAddress(conf);
  SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
      DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, myAddr.getHostName());

  return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
    @Override
    public Integer run() {
      try {
        // Run the actual metadata sync
        return doRun();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  });
}
```
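NameNode.checkAllowFormat is what turns dfs.namenode.support.allow.format into a hard stop before anything is touched. A minimal sketch of that check follows; a plain Map stands in for Hadoop's Configuration (an assumption made to keep the example self-contained), and the class AllowFormatCheck is hypothetical.

```java
import java.io.IOException;
import java.util.Map;

// Sketch of the idea behind NameNode.checkAllowFormat(conf), not its real code.
class AllowFormatCheck {
  static final String KEY = "dfs.namenode.support.allow.format";

  static void checkAllowFormat(Map<String, String> conf) throws IOException {
    // An unset key falls back to true, i.e. formatting is allowed by default.
    boolean allowed = Boolean.parseBoolean(conf.getOrDefault(KEY, "true"));
    if (!allowed) {
      throw new IOException("The option " + KEY + " is set to false, so this "
          + "filesystem cannot be formatted");
    }
  }

  public static void main(String[] args) {
    try {
      checkAllowFormat(Map.of(KEY, "false"));
    } catch (IOException e) {
      System.out.println("refused: " + e.getMessage());
    }
  }
}
```

Because the default is permissive, the protection only exists if an operator explicitly sets the property to false; the flip side is that it must be switched back to true temporarily when a new standby really does need to be bootstrapped.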
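Syncing the metadata
doRun ties the whole flow together. It mainly:
- Creates a proxy for a remote NN, using the first one that responds.
- Checks that the remote layout version matches this node's.
- Formats the storage directories, creating the VERSION and seen_txid files.
- Downloads the fsImage.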
```java
private int doRun() throws IOException {
  // find the active NN
  NamenodeProtocol proxy = null;
  NamespaceInfo nsInfo = null;
  boolean isUpgradeFinalized = false;
  RemoteNameNodeInfo proxyInfo = null;
  // This whole block creates the proxy to a remote NN: loop over the
  // candidates and stop at the first one that answers.
  for (int i = 0; i < remoteNNs.size(); i++) {
    proxyInfo = remoteNNs.get(i);
    InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
    proxy = createNNProtocolProxy(otherIpcAddress);
    try {
      // Get the namespace from any active NN. If you just formatted the primary NN and are
      // bootstrapping the other NNs from that layout, it will only contact the single NN.
      // However, if the cluster is already running and you are adding a NN later (e.g.
      // replacing a failed NN), then this will bootstrap from any node in the cluster.
      nsInfo = proxy.versionRequest();
      isUpgradeFinalized = proxy.isUpgradeFinalized();
      break;
    } catch (IOException ioe) {
      LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
          + ": " + ioe.getMessage());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Full exception trace", ioe);
      }
    }
  }

  if (nsInfo == null) {
    LOG.error(
        "Unable to fetch namespace information from any remote NN. Possible NameNodes: "
            + remoteNNs);
    return ERR_CODE_FAILED_CONNECT;
  }

  // Check the layout version (-66 in the version examined here)
  if (!checkLayoutVersion(nsInfo)) {
    LOG.error("Layout version on remote node (" + nsInfo.getLayoutVersion()
        + ") does not match " + "this node's layout version ("
        + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + ")");
    return ERR_CODE_INVALID_VERSION;
  }

  // Print the cluster information
  System.out.println(
      "=====================================================\n" +
      "About to bootstrap Standby ID " + nnId + " from:\n" +
      "           Nameservice ID: " + nsId + "\n" +
      "        Other Namenode ID: " + proxyInfo.getNameNodeID() + "\n" +
      "  Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\n" +
      "   Other NN's IPC address: " + proxyInfo.getIpcAddress() + "\n" +
      "             Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
      "            Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
      "               Cluster ID: " + nsInfo.getClusterID() + "\n" +
      "           Layout version: " + nsInfo.getLayoutVersion() + "\n" +
      "       isUpgradeFinalized: " + isUpgradeFinalized + "\n" +
      "=====================================================");

  // Create the storage object for the directories to be formatted
  NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);

  if (!isUpgradeFinalized) {
    // ... upgrade-related code omitted
  } else if (!format(storage, nsInfo)) {
    // prompt the user to format storage; format() is the step that
    // creates the VERSION and seen_txid files
    return ERR_CODE_ALREADY_FORMATTED;
  }

  // download the fsimage from active namenode
  // (fetched from the remote NN over HTTP)
  int download = downloadImage(storage, proxy, proxyInfo);
  if (download != 0) {
    return download;
  }
  // ... remaining code omitted
}
```
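The candidate loop and the layout check in doRun boil down to "the first NameNode that answers wins, then the layout versions must match". The sketch below models only that logic; RemoteNN, NsInfo and the numeric exit codes are hypothetical stand-ins, not Hadoop's types or its exact ERR_CODE_* values.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical model of doRun's first phase; not Hadoop's real types.
class PickRemoteNN {
  // Illustrative exit codes only; Hadoop's ERR_CODE_* values may differ.
  static final int OK = 0;
  static final int FAILED_CONNECT = 2;
  static final int INVALID_VERSION = 3;

  // Stand-in for NamespaceInfo: only the layout version matters here.
  static final class NsInfo {
    final int layoutVersion;
    NsInfo(int layoutVersion) { this.layoutVersion = layoutVersion; }
  }

  // Stand-in for the NamenodeProtocol proxy's versionRequest() call.
  interface RemoteNN {
    NsInfo versionRequest() throws IOException;
  }

  static int pick(List<RemoteNN> remotes, int localLayoutVersion) {
    NsInfo nsInfo = null;
    for (RemoteNN nn : remotes) {
      try {
        nsInfo = nn.versionRequest();
        break; // the first NameNode that answers wins
      } catch (IOException e) {
        System.err.println("Unable to fetch namespace info: " + e.getMessage());
      }
    }
    if (nsInfo == null) {
      return FAILED_CONNECT; // no remote NameNode reachable
    }
    return nsInfo.layoutVersion == localLayoutVersion ? OK : INVALID_VERSION;
  }

  public static void main(String[] args) {
    List<RemoteNN> remotes = List.of(
        () -> { throw new IOException("nn1 unreachable"); },
        () -> new NsInfo(-66));
    System.out.println(pick(remotes, -66));
  }
}
```

Note that the candidate does not have to be the active NameNode: any NameNode that answers versionRequest() is good enough to learn the namespace, which is why a replacement NN can bootstrap from whichever peer is up.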
Downloading the fsImage file
```java
private int downloadImage(NNStorage storage, NamenodeProtocol proxy,
    RemoteNameNodeInfo proxyInfo) throws IOException {
  // Load the newly formatted image, using all of the directories
  // (including shared edits)
  // txid of the most recent checkpoint on the remote NN
  final long imageTxId = proxy.getMostRecentCheckpointTxId();
  // current transaction id on the remote NN
  final long curTxId = proxy.getTransactionID();
  FSImage image = new FSImage(conf);
  try {
    // Copy the storage info (namespace ID, cluster ID, ...) into the image
    image.getStorage().setStorageInfo(storage);
    // Create the JournalSet and put the edit log into OPEN_FOR_READING
    image.initEditLog(StartupOption.REGULAR);
    assert image.getEditLog().isOpenForRead() :
        "Expected edit log to be open for read";

    // Ensure that we have enough edits already in the shared directory to
    // start up from the last checkpoint on the active.
    // i.e. check that the shared QJM holds the edits from imageTxId up to curTxId
    if (!skipSharedEditsCheck &&
        !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
      return ERR_CODE_LOGS_UNAVAILABLE;
    }

    // Download that checkpoint into our storage directories.
    // It is fetched over HTTP and written as fsimage.ckpt_<txid>.
    MD5Hash hash = TransferFsImage.downloadImageToStorage(
        proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
    // Save the MD5 digest and rename fsimage.ckpt_<txid> to the final fsimage_<txid>
    image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
        hash);

    // Write seen_txid to the formatted image directories.
    storage.writeTransactionIdFileToStorage(imageTxId, NameNodeDirType.IMAGE);
  } catch (IOException ioe) {
    throw ioe;
  } finally {
    image.close();
  }
  return 0;
}
```
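saveDigestAndRenameCheckpointImage follows a write-then-rename pattern: the downloaded file first lands as fsimage.ckpt_<txid>, the MD5 side file is written, and only then is the image renamed to fsimage_<txid>, with the txid zero-padded to 19 digits. A minimal java.nio sketch of that pattern for a single directory follows; CkptFinalize is hypothetical, and the exact .md5 line format shown is an approximation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of "save digest, then rename checkpoint image".
class CkptFinalize {
  // Image names use a 19-digit, zero-padded txid.
  static String imageName(long txid) {
    return String.format("fsimage_%019d", txid);
  }

  static String ckptName(long txid) {
    return String.format("fsimage.ckpt_%019d", txid);
  }

  static String md5Hex(byte[] data) throws NoSuchAlgorithmException {
    StringBuilder sb = new StringBuilder();
    for (byte b : MessageDigest.getInstance("MD5").digest(data)) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  static Path finalizeImage(Path currentDir, long txid)
      throws IOException, NoSuchAlgorithmException {
    Path ckpt = currentDir.resolve(ckptName(txid));
    Path image = currentDir.resolve(imageName(txid));
    // Simplification: recompute the digest here; Hadoop reuses the hash
    // computed while downloading.
    String hex = md5Hex(Files.readAllBytes(ckpt));
    // 1) Persist the digest next to the final image name ...
    Files.write(currentDir.resolve(imageName(txid) + ".md5"),
        (hex + " *" + imageName(txid) + "\n").getBytes(StandardCharsets.UTF_8));
    // 2) ... then rename, so fsimage_<txid> only appears once it is complete.
    Files.move(ckpt, image, StandardCopyOption.ATOMIC_MOVE);
    return image;
  }

  public static void main(String[] args) throws Exception {
    Path current = Files.createTempDirectory("current");
    Files.write(current.resolve(ckptName(0)), new byte[] {1, 2, 3});
    System.out.println(finalizeImage(current, 0).getFileName()); // prints fsimage_0000000000000000000
  }
}
```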
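Checking that the shared edit logs exist
Start with checkLogsAvailableForRead. This step opens, from the QJM, the edit log streams covering the transactions between imageTxId and curTxId. If they cannot be read without gaps, bootstrapStandby returns ERR_CODE_LOGS_UNAVAILABLE (unless -skipSharedEditsCheck was given).
The core of it is the org.apache.hadoop.hdfs.server.namenode.FSEditLog.selectInputStreams method: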
```java
public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
    long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
    boolean onlyDurableTxns) throws IOException {

  List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
  synchronized(journalSetLock) {
    Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
        "selectInputStreams() on closed FSEditLog");
    // Collect the edit log streams from the journals (here the shared QJM)
    selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
  }

  try {
    // Verify there are no gaps between fromTxId and toAtLeastTxId
    checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
  } catch (IOException e) {
    if (recovery != null) {
      // If recovery mode is enabled, continue loading even if we know we
      // can't load up to toAtLeastTxId.
      LOG.error("Exception while selecting input streams", e);
    } else {
      closeAllStreams(streams);
      throw e;
    }
  }
  return streams;
}
```
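checkForGaps is essentially an interval-coverage check. The sketch below reduces each edit log stream to its first and last txid (the hypothetical Segment class) and reports the first txid in the requested range that no segment covers. logsAvailable assumes, as in checkLogsAvailableForRead, that nothing needs checking when imageTxId equals curTxId and that otherwise the range starts at imageTxId + 1. In-progress segments, recovery mode and stream handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the gap check; not FSEditLog's real code.
class EditGapCheck {
  static final class Segment {
    final long first;
    final long last;
    Segment(long first, long last) {
      this.first = first;
      this.last = last;
    }
  }

  /** Returns the first missing txid, or -1 if [fromTxId, toAtLeastTxId] is fully covered. */
  static long firstGap(List<Segment> segments, long fromTxId, long toAtLeastTxId) {
    List<Segment> sorted = new ArrayList<>(segments);
    sorted.sort((a, b) -> Long.compare(a.first, b.first));
    long next = fromTxId; // next txid that still has to be covered
    for (Segment s : sorted) {
      if (next > toAtLeastTxId) {
        break;            // everything requested is already covered
      }
      if (s.last < next) {
        continue;         // segment lies entirely before what we need
      }
      if (s.first > next) {
        return next;      // hole right before this segment
      }
      next = s.last + 1;
    }
    return next > toAtLeastTxId ? -1 : next;
  }

  static boolean logsAvailable(List<Segment> segments, long imageTxId, long curTxId) {
    // Nothing written since the last checkpoint: nothing to check.
    if (imageTxId == curTxId) {
      return true;
    }
    return firstGap(segments, imageTxId + 1, curTxId) == -1;
  }

  public static void main(String[] args) {
    List<Segment> segs = List.of(new Segment(1, 10), new Segment(12, 20));
    System.out.println(firstGap(segs, 1, 20)); // prints 11
  }
}
```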
Downloading the fsImage: TransferFsImage.downloadImageToStorage
```java
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
    Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
    throws IOException {
  String fileid = ImageServlet.getParamStringForImage(null,
      imageTxId, dstStorage, isBootstrapStandby);
  // fsimage.ckpt_<txid>, created in every IMAGE directory
  String fileName = NNStorage.getCheckpointImageFileName(imageTxId);

  List<File> dstFiles = dstStorage.getFiles(
      NameNodeDirType.IMAGE, fileName);
  if (dstFiles.isEmpty()) {
    throw new IOException("No targets in destination storage!");
  }

  // Download the image and return its MD5 digest
  MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
  LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
      dstFiles.get(0).length() + " bytes.");
  return hash;
}
```
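getFileClient is where the bytes actually move to every file in dstFiles. The idea can be sketched as: read the image stream once, write each chunk to every destination file (one per image directory), and compute the MD5 on the fly. The sketch below shows that fan-out pattern with DigestInputStream; an in-memory stream replaces the HTTP response body (an assumption), and FanOutDownload is a hypothetical name, not Hadoop's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: copy one stream to several files and hash it while reading.
class FanOutDownload {
  static byte[] copyToAll(InputStream in, List<Path> dstFiles)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    List<OutputStream> outs = new ArrayList<>();
    try (DigestInputStream din = new DigestInputStream(in, md5)) {
      for (Path p : dstFiles) {
        outs.add(Files.newOutputStream(p));
      }
      byte[] buf = new byte[8192];
      int n;
      while ((n = din.read(buf)) != -1) {
        // Every chunk goes to every image directory.
        for (OutputStream out : outs) {
          out.write(buf, 0, n);
        }
      }
    } finally {
      for (OutputStream out : outs) {
        try {
          out.close();
        } catch (IOException e) {
          // Sketch only: a real implementation would mark that directory as failed.
          System.err.println("close failed: " + e.getMessage());
        }
      }
    }
    return md5.digest(); // digest of exactly the bytes that were read
  }

  public static void main(String[] args) throws Exception {
    Path d1 = Files.createTempFile("dir1-fsimage", ".ckpt");
    Path d2 = Files.createTempFile("dir2-fsimage", ".ckpt");
    byte[] body = "pretend this is an fsimage".getBytes(StandardCharsets.UTF_8);
    byte[] digest = copyToAll(new ByteArrayInputStream(body), List.of(d1, d2));
    System.out.println(digest.length + " digest bytes, " + Files.size(d1) + " bytes each");
  }
}
```

Reading once and writing to all directories means the image crosses the network a single time no matter how many dfs.namenode.name.dir entries are configured.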
Metadata sync complete
The data directory on the other NameNode now contains the following:
```text
└── current
    ├── fsimage_0000000000000000000
    ├── fsimage_0000000000000000000.md5
    ├── seen_txid
    └── VERSION

1 directory, 4 files
```