一.DataXceiverServer类图
二.DataXceiverServer
- Server used for receiving/sending a block of data. This is created to listen for requests from clients or other DataNodes. This small server does not use the Hadoop IPC mechanism. DataXceiverServer用于接收和发送block数据,它监听来自client或其它DataNode的请求。DataXceiverServer没有采用Hadoop的RPC(IPC)机制,而是直接在Socket上进行流式数据传输;RPC则是命令式(方法调用式)的接口。
- DataXceiverServer每接收到一个请求(一个Socket连接),就会创建一个DataXceiver,并在一个新的Daemon线程中运行它来处理该请求。
class DataXceiverServer implements Runnable, FSConstants { public static final Log LOG = DataNode.LOG; ServerSocket ss; DataNode datanode; // Record all sockets opend for data transfer Map<Socket, Socket> childSockets = Collections.synchronizedMap(new HashMap<Socket, Socket>()); static final int MAX_XCEIVER_COUNT = 256; //默认值是256,每个node最多可以起多少个DataXceiver,如果太多的话可能会导致内存不足 BlockBalanceThrottler balanceThrottler; DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) { this.ss = ss; this.datanode = datanode; } public void run() { while (datanode.shouldRun) { Socket s = ss.accept(); //接受client请求 s.setTcpNoDelay(true); //实例化DataXceiver,需要Socket和DataNode new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start(); } } }
三.DataXceiver类图
- DataXceiver处理来自Client或DataNode的五种请求,其操作码定义在DataTransferProtocol接口中(下面列出了六个操作码,其中OP_READ_METADATA已废弃、不再支持,故不计入)。
/** Request opcode: write a block. */
public static final byte OP_WRITE_BLOCK = 80;

/** Request opcode: read a block. */
public static final byte OP_READ_BLOCK = 81;

/**
 * Request opcode formerly used to read a block's metadata.
 *
 * @deprecated As of version 15, OP_READ_METADATA is no longer supported
 */
@Deprecated
public static final byte OP_READ_METADATA = 82;

/** Request opcode: replace a block. */
public static final byte OP_REPLACE_BLOCK = 83;

/** Request opcode: copy a block (for balancing purposes; sent to a proxy source). */
public static final byte OP_COPY_BLOCK = 84;

/** Request opcode: get the checksum of a block. */
public static final byte OP_BLOCK_CHECKSUM = 85;
class DataXceiver implements Runnable, FSConstants { Socket s; final String remoteAddress; // address of remote side final String localAddress; // local address of this daemon DataNode datanode; DataXceiverServer dataXceiverServer; public DataXceiver(Socket s, DataNode datanode, DataXceiverServer dataXceiverServer) { this.s = s; this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; dataXceiverServer.childSockets.put(s, s); remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); } public void run() { DataInputStream in = null; in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE)); short version = in.readShort(); if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) { throw new IOException( "Version Mismatch" ); } //version boolean local = s.getInetAddress().equals(s.getLocalAddress()); //本地请求 byte op = in.readByte(); //op switch (op) { //读数据块 case DataTransferProtocol.OP_READ_BLOCK: readBlock(in); datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime); if (local) datanode.myMetrics.incrReadsFromLocalClient(); else datanode.myMetrics.incrReadsFromRemoteClient(); break; //写数据块 case DataTransferProtocol.OP_WRITE_BLOCK: writeBlock(in); datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime); if (local) datanode.myMetrics.incrWritesFromLocalClient(); else datanode.myMetrics.incrWritesFromRemoteClient(); break; //替换数据块 case DataTransferProtocol.OP_REPLACE_BLOCK: replaceBlock(in); datanode.myMetrics .addReplaceBlockOp(DataNode.now() - startTime); break; //拷贝数据块 case DataTransferProtocol.OP_COPY_BLOCK: // for balancing purpose; send to a proxy source copyBlock(in); datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime); break; //读取数据块校验码 case DataTransferProtocol.OP_BLOCK_CHECKSUM: // get the checksum of // a block getBlockChecksum(in); datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime); break; default: throw new 
IOException("Unknown opcode " + op + " in data stream"); } dataXceiverServer.childSockets.remove(s); } }