namenode
NameNde -> createNameNode -> new NameNode(conf); -> initialize(){
startHttpServer 50070
loadNamesystem 加载元数据(fsimage+editlog)
createRpcServer -> new NameNodeRpcServer(){
8020/9000 rpc tcp端口
//hadop-RPC-server 核心代码 JPS namenode服务
this.serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
}
startCommonServices(){
//元数据管理和安全模式
namesystem.startCommonServices(conf, haContext);{
nnResourceChecker = new NameNodeResourceChecker(conf);
// 检查是否有足够的磁盘存储元数据
checkAvailableResources();
// 安全模式
setBlockTotal();
// 启动重要服务
blockManager.activate(conf){
//启动了等待复制的线程
pendingReplications.start();
//TODO 启动了管理心跳的服务
datanodeManager.activate(conf);
this.replicationThread.start();
}
}
rpcServer.start();
}
}
datanode
DataNode -> secureMain -> createDataNode(){
-> 一、instantiateDataNode(){
makeInstance -> new DataNode
-> startDataNode(){
// 初始化DataXceiverServer 用来接收客户端和其他DataNode传过来数据的服务。
1、initDataXceiver
// 启动HttpServer服务
2、startInfoServer(conf)
// 初始化RPC的服务
3、initIpcServer(conf){
ipcServer = new RPC.Builder(conf)
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
.setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,DFS_DATANODE_HANDLER_COUNT_DEFAULT))
.setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager)
.build();
}
4、 创建了BlockPoolManager,一个集群就有一个BlockPool,一个联邦就是一个blockpool
blockPoolManager = new BlockPoolManager(this);
// 心跳
blockPoolManager.refreshNamenodes(conf); -> doRefreshNamenodes(){
// 遍历所有的联邦,一个联邦里面会有两个NameNode(HA)
for (String nsToAdd : toAdd) {
//一个联邦对应一个BPOfferService
//一个联邦里面的一个NameNode就是一个BPServiceActor
BPOfferService bpos = createBPOS(addrs);
}
// DataNode向NameNode进行注册和心跳
startAll(); -> bpos.start(); -> BPServiceActor.start();
-> bpThread.start() -> BPServiceActor.run()
-> connectToNNAndHandshake() -> register()(){
//bpNamenode RPC的客户端,服务端的代理 调用服务端(NameNode.NameNodeRpcServer)的registerDatanode方法
bpNamenode.registerDatanode(bpRegistration);
}
-> NameNodeRpcServer.registerDatanode -> namesystem.registerDatanode(nodeReg);
-> getBlockManager().getDatanodeManager().registerDatanode(nodeReg){
// 注册DataNode
addDatanode(nodeDescr);
// 把注册上来的DataNode加入到HeartbeatManager里面,进行心跳管理
heartbeatManager.addDatanode(nodeDescr);
}
}
}
}
// 启动DataNode后台线程
->二、runDatanodeDaemon(){
blockPoolManager.startAll();
dataXceiverServer.start();
localDataXceiverServer.start();
localDataXceiverServer.start();
}
}
元数据管理
1、用户创建目录时,namenode 先在内存创建目录树, 然后向磁盘中记录日志(双缓存机制),再写到journalnode
FileSystem fileSystem=FileSystem.newInstance(configuration);
fileSystem.mkdirs-> DistributedFileSystem.mkdirsInternal -> DfsClient.mkdirs -> primitiveMkdir
-> namenode.mkdirs ->(hadoop RPC) -> NameNodeRpcServer.mkdirs -> FSDirMkdirOp.mkdirs
-> createChildrenDirectories()
-> createSingleDirectory(){
// (1) 在内存里构建目录树
unprotectedMkdir
// (2) 往磁盘上面记录元数据日志
fsd.getEditLog().logMkDir(cur, newNode); -> logEdit -> logSync
// 双缓存机制 详见 https://blog.csdn.net/qq_41106844/article/details/111378865
// EditsDoubleBuffer (TxnBuffer bufCurrent,TxnBuffer bufReady)
// 先写入 bufCurrent,一定条件时(bufCurrent.size() >= 512kb) 交换 bufCurrent 和 bufReady
// 再刷盘
-> logStream.flush();
}
// 1.写到namenode磁盘 2. 写到journalnode
logStream.flush(); -> flushAndSync(durable);
//第一次:FileJouranlManager -> EditLogFileOutputStream
//第二次:QuorumJounalManager -> QuorumOutputStream
// (FSNamesystem.mkdirs.getEditLog().logSync()->logStream.flush()-> flushAndSync -> QuorumOutputStream.flushAndSync)
-> sendEdits -> getProxy().journal -> (rpc) -> JournalNodeRpcServer.journal(同样双缓存写入内存和磁盘)
2、StandbyNameNode 的 EditLogTailer周期性(每隔60s)的去journalnode集群上面去,
读取元数据日志,然后再把这些元数据日志应用到自己的元数据里面(内存+磁盘)
保持和activenamedoe元数据一致
NameNode.protected NameNode() -> state.enterState(haContext) -> StandbyState.enterState
-> NameNode.startStandbyServices -> namesystem.startStandbyServices(conf); -> editLogTailer.start();
-> tailerThread.start();-> EditLogTailerThread.run -> doWork -> doTailEdits -> image.loadEdits
-> FSEditLogLoader.loadFSEdits -> loadEditRecords(){
FSEditLogOp op = in.readOp() -> nextOp -> nextOpImpl -> init
fStream = log.getInputStream(); -> URLlog.getInputStream(){
// HttpURLConnection 连接 JournalNode -> JournalnodeHttpserver 使用http的方式从 JournalNode 拉取数据
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
JournalnodeHttpserver.start(){
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
绑定了 GetJournalEditServlet 这个servlet -> GetJournalEditServlet.doGet
}
}
->
applyEditLogOp(case OP_MKDIR)
-> 写入standynamenode和 activenamenode 一致
}
3、StandbyNameNode的StandbyCheckpointer 将ActiveNameNode 的fsimage和editlog拉下来合并然后上传到ActiveNameNode
NameNode.protected NameNode() -> state.enterState(haContext) -> StandbyState.enterState
-> NameNode.startStandbyServices -> namesystem.startStandbyServices(conf); ->standbyCheckpointer.start();
-> CheckpointerThread.run -> doWork -> doCheckpoint(){
// 1、把元数据持久化到磁盘上面
img.saveNamespace(namesystem, imageType, canceler);
// 2、合并后上传
TransferFsImage.uploadImageFromStorage -> uploadImage -> NameNodeHttpServer(){
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
} -> ImageServlet.doPut(){
// 针对请求获取到一个输入流,不断的把数据读取过来
InputStream stream = request.getInputStream();
// 会把接收过来的元数据 替换 现在已有的fsimage文件。对文件进行重命名
nnImage.saveDigestAndRenameCheckpointImage
}
}