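DataNode registration and heartbeat
When Hadoop starts, the normal order is to start the NameNode first and then the DataNodes, because the NameNode has to accept the DataNodes' registrations. A DataNode begins registering and sending heartbeats as soon as it starts, so the entry point is naturally the DataNode's main method.
Tracing the code shows that the DataNode constructor initializes a BlockPoolManager object. Its blockPoolManager.refreshNamenodes(conf); call reads from the configuration the NameNodes this DataNode belongs to, and the DataNode then sends registration and heartbeat messages to them.
Concretely, this happens in BlockPoolManager's startAll() method, which starts every BPOfferService on the DataNode.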
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
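BPOfferService's start method then loops over its BPServiceActor threads and starts each one, so that every BPServiceActor can send registration and heartbeat messages to its own NameNode.
//This must be called only by blockPoolManager
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}
The actual work is naturally done in BPServiceActor's run method.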
/**
 * No matter what kind of exception we get, keep retrying to offerService().
 * That's the loop that connects to the NameNode and provides basic DataNode
 * functionality.
 *
 * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
 * happen either at shutdown or due to refreshNamenodes.
 */
@Override
public void run() {
  LOG.info(this + " starting to offer service");
  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // connect to the namenode and register this datanode
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        .......
      }
    }
    runningState = RunningState.RUNNING;
    while (shouldRun()) {
      try {
        // heartbeat reporting
        offerService();
      } catch (Exception ex) {
        .....
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    ...
  } finally {
    ....
  }
}
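The two phases of run() (retry the handshake until it succeeds, then keep offering service until told to stop) can be sketched without any Hadoop dependency. Everything below is a hypothetical stand-in: ActorLoopSketch, NameNodeLink, and their methods are illustration names, not Hadoop classes. Since the original catch blocks are elided, the sketch simply assumes that a failed attempt is retried.

```java
import java.io.IOException;

/** Hypothetical stand-in for BPServiceActor; none of this is Hadoop code. */
class ActorLoopSketch {
  /** Hypothetical view of one NameNode as seen by the actor. */
  interface NameNodeLink {
    void handshakeAndRegister() throws IOException;
    void offerService() throws Exception;
  }

  private final NameNodeLink link;
  private volatile boolean shouldRun = true;

  ActorLoopSketch(NameNodeLink link) {
    this.link = link;
  }

  /** Asks both loops to exit, like turning off shouldRun at shutdown. */
  void stop() {
    shouldRun = false;
  }

  void run() {
    // Phase 1: retry the handshake/registration until it succeeds.
    // Assumption: a failure is simply retried (no back-off in this sketch).
    while (shouldRun) {
      try {
        link.handshakeAndRegister();
        break;
      } catch (IOException ioe) {
        // NameNode not reachable yet: try again
      }
    }
    // Phase 2: keep serving; an exception never ends the loop by itself.
    while (shouldRun) {
      try {
        link.offerService();
      } catch (Exception ex) {
        // swallow and go around again
      }
    }
  }

  public static void main(String[] args) {
    final int[] calls = new int[2]; // [0] handshakes, [1] service rounds
    final ActorLoopSketch[] actor = new ActorLoopSketch[1];
    actor[0] = new ActorLoopSketch(new NameNodeLink() {
      @Override
      public void handshakeAndRegister() throws IOException {
        if (++calls[0] < 3) {
          throw new IOException("NameNode not up yet");
        }
      }

      @Override
      public void offerService() {
        if (++calls[1] == 3) {
          actor[0].stop();
        }
      }
    });
    actor[0].run();
    System.out.println("handshakes=" + calls[0] + ", rounds=" + calls[1]);
  }
}
```

The design point is that only the shouldRun flag ends either loop; errors inside the loop body are treated as transient.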
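DataNode registration
Registration on the DataNode side is relatively simple. Following connectToNNAndHandshake, the request ends up in DatanodeProtocolServerSideTranslatorPB.registerDatanode(RpcController, RegisterDatanodeRequestProto), the translator that unpacks the protobuf request on the receiving end of the RPC.
There a DatanodeRegistration object is built from the request and passed on as the argument; it carries the basic information the NameNode needs to validate the DataNode.
The call then goes through the registerDatanode method of DatanodeProtocol, the interface through which the DataNode and the NameNode talk to each other directly, so the registration reaches the NameNode as an RPC request.
Finally, the method converts the result the NameNode returned for the registration and returns it.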
@Override
public RegisterDatanodeResponseProto registerDatanode(
    RpcController controller, RegisterDatanodeRequestProto request)
    throws ServiceException {
  DatanodeRegistration registration = PBHelper.convert(request
      .getRegistration());
  DatanodeRegistration registrationResp;
  try {
    registrationResp = impl.registerDatanode(registration);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return RegisterDatanodeResponseProto.newBuilder()
      .setRegistration(PBHelper.convert(registrationResp)).build();
}
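The shape of that method (decode the wire message, call the real implementation, wrap IOException, encode the reply) is a general translator pattern. The following is a minimal sketch of the pattern only: WireRegistration, Registration, RpcFailure, and Protocol are hypothetical stand-ins for the protobuf message, DatanodeRegistration, ServiceException, and DatanodeProtocol, and carry far fewer fields than the real classes.

```java
import java.io.IOException;

/** Hypothetical sketch of a server-side translator; not Hadoop code. */
class TranslatorSketch {
  /** Stand-in for the wire-level (protobuf) message. */
  static final class WireRegistration {
    final String uuid;
    final String ipAddr;
    final int xferPort;

    WireRegistration(String uuid, String ipAddr, int xferPort) {
      this.uuid = uuid;
      this.ipAddr = ipAddr;
      this.xferPort = xferPort;
    }
  }

  /** Stand-in for the domain object handed to the implementation. */
  static final class Registration {
    final String uuid;
    final String ipAddr;
    final int xferPort;

    Registration(String uuid, String ipAddr, int xferPort) {
      this.uuid = uuid;
      this.ipAddr = ipAddr;
      this.xferPort = xferPort;
    }
  }

  /** Stand-in for the RPC-layer exception type. */
  static final class RpcFailure extends Exception {
    RpcFailure(Throwable cause) {
      super(cause);
    }
  }

  /** Stand-in for the protocol interface implemented by the server. */
  interface Protocol {
    Registration registerDatanode(Registration reg) throws IOException;
  }

  static final class ServerSideTranslator {
    private final Protocol impl;

    ServerSideTranslator(Protocol impl) {
      this.impl = impl;
    }

    WireRegistration registerDatanode(WireRegistration request)
        throws RpcFailure {
      // 1. decode the wire message into the domain object
      Registration reg =
          new Registration(request.uuid, request.ipAddr, request.xferPort);
      Registration resp;
      try {
        // 2. delegate to the real implementation
        resp = impl.registerDatanode(reg);
      } catch (IOException e) {
        // 3. surface I/O problems as an RPC-level failure
        throw new RpcFailure(e);
      }
      // 4. encode the reply
      return new WireRegistration(resp.uuid, resp.ipAddr, resp.xferPort);
    }
  }

  public static void main(String[] args) throws RpcFailure {
    ServerSideTranslator t = new ServerSideTranslator(reg -> reg);
    WireRegistration out =
        t.registerDatanode(new WireRegistration("dn-1", "10.0.0.1", 50010));
    System.out.println(out.uuid + " " + out.ipAddr + ":" + out.xferPort);
  }
}
```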
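DataNode heartbeat
The DataNode's heartbeat work is done mainly in the offerService method, which keeps running until shouldRun returns false.
Each heartbeat first sends a heartbeat request to the NameNode, then updates some state from the response, and then processes the commands the NameNode sent back (an array of DatanodeCommand).
The method that sends the heartbeat is sendHeartBeat; the request ends up in sendHeartbeat(RpcController, HeartbeatRequestProto) of the DatanodeProtocolServerSideTranslatorPB class.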
@Override
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
    HeartbeatRequestProto request) throws ServiceException {
  HeartbeatResponse response;
  try {
    ......
    // send the heartbeat
    response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
        report, request.getCacheCapacity(), request.getCacheUsed(),
        request.getXmitsInProgress(),
        request.getXceiverCount(), request.getFailedVolumes(),
        volumeFailureSummary);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  // return the commands
  HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
      .newBuilder();
  DatanodeCommand[] cmds = response.getCommands();
  .............
}
For the heartbeat protocol itself, let's look at the sendHeartbeat method of DatanodeProtocol.
/**
 * sendHeartbeat() tells the NameNode that the DataNode is still
 * alive and well. Includes some status info, too.
 * It also gives the NameNode a chance to return
 * an array of "DatanodeCommand" objects in HeartbeatResponse.
 * A DatanodeCommand tells the DataNode to invalidate local block(s),
 * or to copy them to other DataNodes, etc.
 * @param registration datanode registration information
 * @param reports utilization report per storage (a datanode can be
 *        configured with several storage directories, and these can be
 *        heterogeneous, e.g. memory, disk, SSD)
 * @param xmitsInProgress number of transfers from this datanode to others
 * @param xceiverCount number of active transceiver threads
 * @param failedVolumes number of failed volumes
 * @param volumeFailureSummary info about volume failures
 * @throws IOException on error
 */
@Idempotent
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
    StorageReport[] reports,
    long dnCacheCapacity,
    long dnCacheUsed,
    int xmitsInProgress,
    int xceiverCount,
    int failedVolumes,
    VolumeFailureSummary volumeFailureSummary)
    throws IOException;
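As a rough illustration of the DataNode side of this contract (report status, then act on whatever commands come back), here is a minimal sketch. The names HeartbeatClientSketch, Command, Action, and the two-argument sendHeartbeat are hypothetical simplifications; the real protocol takes the parameters listed above, and its commands carry much more information.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified DataNode-side heartbeat; not Hadoop code. */
class HeartbeatClientSketch {
  /** Two of the command kinds mentioned in the Javadoc above. */
  enum Action { TRANSFER, INVALIDATE }

  /** Stand-in for DatanodeCommand: an action plus the blocks it targets. */
  static final class Command {
    final Action action;
    final List<Long> blockIds;

    Command(Action action, Long... blockIds) {
      this.action = action;
      this.blockIds = Arrays.asList(blockIds);
    }
  }

  /** Stand-in for the NameNode end of the protocol. */
  interface NameNode {
    List<Command> sendHeartbeat(String datanodeUuid, int storedBlocks);
  }

  final List<Long> localBlocks = new ArrayList<>();
  final List<Long> transferred = new ArrayList<>();

  /** One heartbeat round: send status, then process the returned commands. */
  void heartbeatOnce(NameNode nn) {
    List<Command> cmds = nn.sendHeartbeat("dn-1", localBlocks.size());
    for (Command cmd : cmds) {
      switch (cmd.action) {
        case INVALIDATE:
          // delete local replicas the NameNode considers invalid
          localBlocks.removeAll(cmd.blockIds);
          break;
        case TRANSFER:
          // a real DataNode would copy the blocks to other DataNodes;
          // the sketch only records the request
          transferred.addAll(cmd.blockIds);
          break;
        default:
          break;
      }
    }
  }

  public static void main(String[] args) {
    HeartbeatClientSketch dn = new HeartbeatClientSketch();
    dn.localBlocks.addAll(Arrays.asList(1L, 2L, 3L));
    dn.heartbeatOnce((uuid, count) -> Arrays.asList(
        new Command(Action.INVALIDATE, 2L),
        new Command(Action.TRANSFER, 3L)));
    System.out.println("local=" + dn.localBlocks
        + " transferred=" + dn.transferred);
  }
}
```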
How the NameNode receives registrations and heartbeats
A brief introduction to DatanodeManager
First, a look at several important fields in DatanodeManager.
/**
 * datanodeMap stores the mapping from StorageID to DatanodeDescriptor. As the
 * original comment below explains, a datanode registering with the namenode
 * falls into one of three cases.
 *
 * Stores the datanode -> block map.
 * <p>
 * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
 * storage id. In order to keep the storage map consistent it tracks
 * all storages ever registered with the namenode.
 * A descriptor corresponding to a specific storage id can be
 * <ul>
 * <li>added to the map if it is a new storage id;</li>
 * <li>updated with a new datanode started as a replacement for the old one
 * with the same storage id; and </li>
 * <li>removed if and only if an existing datanode is restarted to serve a
 * different storage id.</li>
 * </ul> <br>
 * <p>
 * Mapping: StorageID -> DatanodeDescriptor
 */
private final NavigableMap<String, DatanodeDescriptor> datanodeMap
    = new TreeMap<String, DatanodeDescriptor>();
/**
 * Cluster network topology
 */
private final NetworkTopology networktopology;
/**
 * Mapping from host to DatanodeDescriptor. One host may run several
 * datanodes, so internally Host2NodesMap maps a String to a
 * DatanodeDescriptor[]. A plain map cannot conveniently add, remove, update,
 * or look up one particular datanode on such a host, so Host2NodesMap mainly
 * wraps those operations.
 * Host names to datanode descriptors mapping.
 */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
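To make the "one host, several datanodes" point concrete, here is a minimal sketch of such a structure. It is not the Hadoop Host2NodesMap: HostToNodesSketch and Node are hypothetical names, and plain synchronized methods stand in for whatever locking the real class uses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a host -> datanodes map; not Hadoop code. */
class HostToNodesSketch {
  /** Stand-in for DatanodeDescriptor with only the fields needed here. */
  static final class Node {
    final String uuid;
    final String ip;
    final int xferPort;

    Node(String uuid, String ip, int xferPort) {
      this.uuid = uuid;
      this.ip = ip;
      this.xferPort = xferPort;
    }
  }

  // One IP may host several datanodes, hence an array per key.
  private final Map<String, Node[]> map = new HashMap<>();

  /** Adds the node; returns false if that exact node is already present. */
  synchronized boolean add(Node node) {
    Node[] old = map.get(node.ip);
    if (old == null) {
      map.put(node.ip, new Node[] {node});
      return true;
    }
    for (Node n : old) {
      if (n == node) {
        return false;
      }
    }
    Node[] grown = Arrays.copyOf(old, old.length + 1);
    grown[old.length] = node;
    map.put(node.ip, grown);
    return true;
  }

  /** Removes the node; tolerates null so callers can pass a map result. */
  synchronized boolean remove(Node node) {
    if (node == null) {
      return false;
    }
    Node[] old = map.get(node.ip);
    if (old == null) {
      return false;
    }
    int idx = -1;
    for (int i = 0; i < old.length; i++) {
      if (old[i] == node) {
        idx = i;
        break;
      }
    }
    if (idx < 0) {
      return false;
    }
    if (old.length == 1) {
      map.remove(node.ip);
      return true;
    }
    Node[] shrunk = new Node[old.length - 1];
    System.arraycopy(old, 0, shrunk, 0, idx);
    System.arraycopy(old, idx + 1, shrunk, idx, old.length - idx - 1);
    map.put(node.ip, shrunk);
    return true;
  }

  /** Finds the datanode listening on the given transfer address. */
  synchronized Node getByXferAddr(String ip, int xferPort) {
    Node[] nodes = map.get(ip);
    if (nodes == null) {
      return null;
    }
    for (Node n : nodes) {
      if (n.xferPort == xferPort) {
        return n;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    HostToNodesSketch hosts = new HostToNodesSketch();
    hosts.add(new Node("dn-a", "10.0.0.1", 50010));
    hosts.add(new Node("dn-b", "10.0.0.1", 50011));
    System.out.println(hosts.getByXferAddr("10.0.0.1", 50011).uuid);
  }
}
```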
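NameNode receiving registrations
For both registration and heartbeat, the DataNode invokes the method of the same name on the NameNode over RPC; the implementation lives in NameNodeRpcServer.
Its registerDatanode method calls FSNamesystem's registerDatanode, and the processing finally ends up in DatanodeManager.registerDatanode(DatanodeRegistration).
First, the two lines below look up the registering datanode in the two maps held by DatanodeManager.
// nodeS is the datanode information found in datanodeMap
DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
// nodeN is the information found in host2DatanodeMap
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(nodeReg.getIpAddr(), nodeReg.getXferPort());
Next, the three cases described in the comment on DatanodeManager's datanodeMap are handled one by one.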
// Case: the data node already exists but is now using a new storage ID
if (nodeN != null && nodeN != nodeS) {
  NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
  // nodeN previously served a different data storage,
  // which is not served by anybody anymore.
  // remove the node, including the blocks associated with it
  removeDatanode(nodeN);
  // physically remove node from datanodeMap
  wipeDatanode(nodeN);
  nodeN = null;
}
// Case: a known node registers again; mainly refresh its information
if (nodeS != null) {
  ................
  nodeS.updateRegInfo(nodeReg);
  nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
  nodeS.setDisallowed(false); // Node is in the include list
  // resolve network location
  if(this.rejectUnresolvedTopologyDN) {
    nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
    nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
  } else {
    nodeS.setNetworkLocation(
        resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
    nodeS.setDependentHostNames(
        getNetworkDependenciesWithDefault(nodeS));
  }
  getNetworkTopology().add(nodeS);
  // also treat the registration message as a heartbeat
  heartbeatManager.register(nodeS);
  incrementVersionCount(nodeS.getSoftwareVersion());
  startDecommissioningIfExcluded(nodeS);
  success = true;
  ...........................
}
// Case: a new node that has never registered before
// resolve network location and add the node to the cluster topology
if(this.rejectUnresolvedTopologyDN) {
  nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
  nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
} else {
  nodeDescr.setNetworkLocation(
      resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
  nodeDescr.setDependentHostNames(
      getNetworkDependenciesWithDefault(nodeDescr));
}
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
// register new datanode
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startDecommissioningIfExcluded(nodeDescr);
success = true;
The code above handles the three registration cases separately. For a new node, registration ends in a call to addDatanode, which mainly adds the datanode to the two maps and to the network topology.
/** Add a datanode. */
void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  // add to datanodeMap and host2DatanodeMap
  synchronized(datanodeMap) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }
  // add to the network topology
  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  if (LOG.isDebugEnabled()) {
    LOG.debug(getClass().getSimpleName() + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}
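The decision logic of the three registration cases can be reduced to operations on two maps. The sketch below is a simplification under stated assumptions: RegistrationSketch, Node, and Outcome are hypothetical names, the transfer address is flattened to an "ip:port" string key, and topology, heartbeat bookkeeping, and decommissioning are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the three registration cases; not Hadoop code. */
class RegistrationSketch {
  enum Outcome { NEW_NODE, RE_REGISTERED, NEW_NODE_AFTER_WIPE }

  /** Stand-in for DatanodeDescriptor. */
  static final class Node {
    final String uuid;
    String ip;
    int xferPort;

    Node(String uuid, String ip, int xferPort) {
      this.uuid = uuid;
      this.ip = ip;
      this.xferPort = xferPort;
    }

    String addr() {
      return ip + ":" + xferPort;
    }
  }

  // Plays the role of datanodeMap: storage/datanode id -> descriptor.
  final Map<String, Node> byUuid = new TreeMap<>();
  // Plays the role of host2DatanodeMap, flattened to "ip:port" keys.
  final Map<String, Node> byAddr = new HashMap<>();

  Outcome register(String uuid, String ip, int xferPort) {
    String addr = ip + ":" + xferPort;
    Node nodeS = byUuid.get(uuid); // looked up by id
    Node nodeN = byAddr.get(addr); // looked up by transfer address
    boolean wiped = false;

    // The address was serving a different id: drop the stale node.
    if (nodeN != null && nodeN != nodeS) {
      byUuid.remove(nodeN.uuid);
      byAddr.remove(addr);
      wiped = true;
    }

    // Known id registering again: refresh its information in place.
    if (nodeS != null) {
      byAddr.remove(nodeS.addr());
      nodeS.ip = ip;
      nodeS.xferPort = xferPort;
      byAddr.put(addr, nodeS);
      return Outcome.RE_REGISTERED;
    }

    // Never seen before: add to both maps.
    Node fresh = new Node(uuid, ip, xferPort);
    byUuid.put(uuid, fresh);
    byAddr.put(addr, fresh);
    return wiped ? Outcome.NEW_NODE_AFTER_WIPE : Outcome.NEW_NODE;
  }

  public static void main(String[] args) {
    RegistrationSketch nn = new RegistrationSketch();
    System.out.println(nn.register("a", "10.0.0.1", 50010));
    System.out.println(nn.register("a", "10.0.0.1", 50010));
    System.out.println(nn.register("b", "10.0.0.1", 50010));
  }
}
```

Keeping the two maps in step is the point of the exercise: every path that changes one map also changes the other, which mirrors the consistency comment inside addDatanode.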
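NameNode receiving heartbeats
The NameNode handles heartbeats in sendHeartbeat, the method with the same name as on the DataNode side; the processing finally ends up in DatanodeManager.handleHeartbeat.
The heartbeat flow is as follows:
1. Look up the datanode and check whether it is allowed to connect (for example, whether it is in the exclude list); if it is not, throw an exception right away.
2. Check whether it has registered; if it has not, return a register command right away.
3. Update the datanode's information, mainly the fields of its DatanodeDescriptor such as used space and remaining space.
4. Check whether the NameNode is in safe mode; if so, return an empty command array.
5. Check lease recovery; if blocks need recovery, return only the block recovery command.
6. Build the replication command.
7. Build the deletion command.
8. Build the caching-related commands.
9. Build the bandwidth-related command.
10. Return all the commands.
The relevant code is as follows: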
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  synchronized (heartbeatManager) {
    synchronized (datanodeMap) {
      DatanodeDescriptor nodeinfo = null;
      try {
        // look up the datanode
        nodeinfo = getDatanode(nodeReg);
      } catch(UnregisteredNodeException e) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }
      // is the node allowed to connect?
      // Check if this datanode should actually be shutdown instead.
      if (nodeinfo != null && nodeinfo.isDisallowed()) {
        setDatanodeDead(nodeinfo);
        throw new DisallowedDatanodeException(nodeinfo);
      }
      // has the node registered?
      if (nodeinfo == null || !nodeinfo.isRegistered()) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }
      // update the datanode's information, e.g. used and remaining space
      heartbeatManager.updateHeartbeat(nodeinfo, reports,
          cacheCapacity, cacheUsed,
          xceiverCount, failedVolumes,
          volumeFailureSummary);
      // are we in safe mode?
      // If we are in safemode, do not send back any recovery / replication
      // requests. Don't even drain the existing queue of work.
      if(namesystem.isInSafeMode()) {
        return new DatanodeCommand[0];
      }
      //check lease recovery
      BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
          .getLeaseRecoveryCommand(Integer.MAX_VALUE);
      if (blocks != null) {
        BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
            blocks.length);
        .................................
        return new DatanodeCommand[] { brCommand };
      }
      final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
      // build the replication command
      //check pending replication
      List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
          maxTransfers);
      if (pendingList != null) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
            pendingList));
      }
      // find invalid blocks and build the deletion command
      //check block invalidation
      Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
      if (blks != null) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
            blockPoolId, blks));
      }
      // build the caching-related commands
      boolean sendingCachingCommands = false;
      long nowMs = monotonicNow();
      if (shouldSendCachingCommands &&
          ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
              timeBetweenResendingCachingDirectivesMs)) {
        DatanodeCommand pendingCacheCommand =
            getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                DatanodeProtocol.DNA_CACHE, blockPoolId);
        if (pendingCacheCommand != null) {
          cmds.add(pendingCacheCommand);
          sendingCachingCommands = true;
        }
        DatanodeCommand pendingUncacheCommand =
            getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
        if (pendingUncacheCommand != null) {
          cmds.add(pendingUncacheCommand);
          sendingCachingCommands = true;
        }
        if (sendingCachingCommands) {
          nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
        }
      }
      blockManager.addKeyUpdateCommand(cmds, nodeinfo);
      // build the bandwidth-related command
      // check for balancer bandwidth update
      if (nodeinfo.getBalancerBandwidth() > 0) {
        cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
        // set back to 0 to indicate that datanode has been sent the new value
        nodeinfo.setBalancerBandwidth(0);
      }
      // return all the commands
      if (!cmds.isEmpty()) {
        return cmds.toArray(new DatanodeCommand[cmds.size()]);
      }
    }
  }
  return new DatanodeCommand[0];
}
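The order of the checks matters more than any single one of them, so here is a stripped-down sketch of just that ordering. It is an illustration under assumptions, not the Hadoop implementation: HeartbeatHandlerSketch and NodeState are hypothetical, commands are plain strings, and lease recovery, caching, key updates, and bandwidth commands are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of heartbeat handling; not Hadoop code. */
class HeartbeatHandlerSketch {
  /** Stand-in for the per-datanode state kept by the NameNode. */
  static final class NodeState {
    boolean registered = true;
    boolean disallowed = false;
    long usedBytes;
    final Deque<Long> pendingReplication = new ArrayDeque<>();
    final Deque<Long> pendingInvalidation = new ArrayDeque<>();
  }

  final Map<String, NodeState> nodes = new HashMap<>();
  boolean safeMode = false;

  List<String> handleHeartbeat(String uuid, long usedBytes, int maxTransfers) {
    NodeState node = nodes.get(uuid);
    // 1. refuse nodes that are not allowed to connect
    if (node != null && node.disallowed) {
      throw new IllegalStateException("datanode " + uuid + " is disallowed");
    }
    // 2. unknown or unregistered nodes are told to register first
    if (node == null || !node.registered) {
      return Collections.singletonList("REGISTER");
    }
    // 3. record the reported statistics
    node.usedBytes = usedBytes;
    // 4. in safe mode, hand out no work and leave the queues untouched
    if (safeMode) {
      return Collections.emptyList();
    }
    // 5. otherwise collect the pending work as commands
    List<String> cmds = new ArrayList<>();
    List<Long> toCopy = drain(node.pendingReplication, maxTransfers);
    if (!toCopy.isEmpty()) {
      cmds.add("TRANSFER " + toCopy);
    }
    List<Long> toDelete = drain(node.pendingInvalidation, Integer.MAX_VALUE);
    if (!toDelete.isEmpty()) {
      cmds.add("INVALIDATE " + toDelete);
    }
    return cmds;
  }

  /** Removes and returns at most max entries from the head of the queue. */
  private static List<Long> drain(Deque<Long> queue, int max) {
    List<Long> out = new ArrayList<>();
    while (out.size() < max && !queue.isEmpty()) {
      out.add(queue.poll());
    }
    return out;
  }

  public static void main(String[] args) {
    HeartbeatHandlerSketch nn = new HeartbeatHandlerSketch();
    System.out.println(nn.handleHeartbeat("dn-1", 0L, 2));
    NodeState state = new NodeState();
    state.pendingReplication.add(1L);
    state.pendingInvalidation.add(9L);
    nn.nodes.put("dn-1", state);
    System.out.println(nn.handleHeartbeat("dn-1", 100L, 2));
  }
}
```

Note that the statistics are updated before the safe-mode check, so a node still counts as alive even when the NameNode sends it no work.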