HDFS Read Path Analysis (Part 2): after obtaining a file's meta information (block list, etc.), how are those blocks actually read?

I. Preface

1. Version:
Hadoop source code version: 2.7.1

2. Flow chart of reading a file in HDFS

II. Recap

After the user application goes through the call chain FileSystem.open -> DistributedFileSystem.open -> DFSClient.open -> new DFSInputStream(), it has obtained all the block information of the file to be read (file length, locations of the blocks the file consists of, etc.), which is kept in member variables of the DFSInputStream object. Since a block may be local or remote, how is each kind actually read?

III. Warm-up (Data Structures)

  • block: The HDFS block, typically large (~128 MB). For more detail, see RemoteBlockReader2.java in the Hadoop source.
  • chunk: A block is divided into chunks, each of which comes with a checksum. We want transfers to be chunk-aligned, so that checksums can be verified.
  • packet: A grouping of chunks used for transport. It contains a header, followed by checksum data, followed by the real data. For more detail, see PacketReceiver.java and PacketHeader.java in the Hadoop source.

The format of a data packet (Packet) used for HDFS data transfer is as follows:

  +------------------------------------------------------+
  | 4 byte packet length (excluding packet header)       |
  +------------------------------------------------------+
  | 8 byte offset in the block | 8 byte sequence number  |
  +------------------------------------------------------+
  | 1 byte isLastPacketInBlock                           |
  +------------------------------------------------------+
  | 4 byte length of actual data                         |
  +------------------------------------------------------+
  | x byte checksum data, x is defined below             |
  +------------------------------------------------------+
  | actual data ......                                   |
  +------------------------------------------------------+

  x = (length of data + BYTE_PER_CHECKSUM - 1) / BYTE_PER_CHECKSUM * CHECKSUM_SIZE
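
To get a concrete feel for this formula: HDFS checksums are 4-byte CRCs computed over 512-byte chunks by default, so a 128 KB data payload carries 256 * 4 = 1024 bytes of checksum data, which together with the 4-byte length field is consistent with the packetLen=132100 for dataLen=131072 seen in the trace log later in this article. A minimal sketch of the arithmetic, using illustrative constants rather than the actual Hadoop identifiers:

public class PacketChecksumLength {
  // Illustrative defaults: dfs.bytes-per-checksum = 512, CRC32/CRC32C checksum size = 4 bytes.
  static final int BYTE_PER_CHECKSUM = 512;
  static final int CHECKSUM_SIZE = 4;

  /** x = (length of data + BYTE_PER_CHECKSUM - 1) / BYTE_PER_CHECKSUM * CHECKSUM_SIZE */
  static int checksumDataLength(int dataLen) {
    int chunks = (dataLen + BYTE_PER_CHECKSUM - 1) / BYTE_PER_CHECKSUM; // round up to whole chunks
    return chunks * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    // 131072 bytes of data -> 256 chunks -> 1024 bytes of checksums;
    // 4 (length field) + 1024 (checksums) + 131072 (data) = 132100 = packetLen in the trace log.
    System.out.println(checksumDataLength(131072)); // prints 1024
  }
}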

  • LocatedBlocks: when HDFS opens a file for reading, openInfo() is called, which ultimately calls DFSInputStream's fetchLocatedBlocksAndGetLastBlockLength method to obtain the block information as a LocatedBlocks object. The block information is very detailed: block name, size, start offset in the file, datanode IP addresses, and so on.

    When a file is written in Hadoop, the blocks are actually written to datanodes, and the namenode learns which blocks make up the file through the datanodes' periodic block reports. So when reading a file, some blocks may not yet have been reported to the namenode, and the reader can only see up to the last reported block. isLastBlockComplete indicates whether the last block is complete; if it is not, the client contacts the datanodes in the block's pipeline (given by the metadata) to obtain the number of bytes already written to that block, which is stored in lastBlockBeingWrittenLength.

LocatedBlocks {
  fileLength = 210606807
  underConstruction = false
  blocks = [
    LocatedBlock {
      BP-1853423215-192.X.X.X-1474747765776:blk_1073741828_1004;
      getBlockSize() = 134217728;
      corrupt = false;
      offset = 0;
      locs = [192.X.X.X:50010]
    },
    LocatedBlock {
      BP-1853423215-192.X.X.X-1474747765776:blk_1073741829_1005;
      getBlockSize() = 76389079;
      corrupt = false;
      offset = 134217728;
      locs = [192.X.X.X:50010]
    }
  ]
  lastLocatedBlock = LocatedBlock {
    BP-1853423215-192.X.X.X-1474747765776:blk_1073741829_1005;
    getBlockSize() = 76389079;
    corrupt = false;
    offset = 134217728;
    locs = [192.X.X.X:50010]
  }
  isLastBlockComplete = true
}
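
A dump like the one above can be reproduced from a client program. The sketch below is only an illustration (not code from this article's call path): it assumes a DistributedFileSystem handle and uses DFSClient#getLocatedBlocks, which returns essentially the same block list that openInfo()/fetchLocatedBlocksAndGetLastBlockLength fetches; the command-line path is hypothetical.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

public class DumpLocatedBlocks {
  public static void main(String[] args) throws Exception {
    Path path = new Path(args[0]);                 // e.g. /user/demo/largefile.dat (hypothetical)
    Configuration conf = new Configuration();
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(path.toUri(), conf);
    long fileLen = dfs.getFileStatus(path).getLen();

    // Ask the NameNode for the block list, the same information that
    // DFSInputStream caches in its locatedBlocks member.
    LocatedBlocks lbs =
        dfs.getClient().getLocatedBlocks(path.toUri().getPath(), 0, fileLen);

    System.out.println("fileLength=" + lbs.getFileLength()
        + " underConstruction=" + lbs.isUnderConstruction()
        + " isLastBlockComplete=" + lbs.isLastBlockComplete());
    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
      System.out.println(lb.getBlock()
          + " offset=" + lb.getStartOffset()
          + " size=" + lb.getBlockSize()
          + " locs=" + Arrays.toString(lb.getLocations()));
    }
  }
}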
 
  • LocatedBlock: represents a single block; its member variables include an ExtendedBlock b and a StorageType[] storageTypes, among others.

/**
 * Associates a block with the Datanodes that contain its replicas
 * and other block metadata (E.g. the file offset associated with this
 * block, whether it is corrupt, a location is cached in memory,
 * security token, etc).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class LocatedBlock {

  private final ExtendedBlock b;
  private long offset;  // offset of the first byte of the block in the file
  private final DatanodeInfoWithStorage[] locs;
  /** Cached storage ID for each replica */
  private String[] storageIDs;
  /** Cached storage type for each replica, if reported. */
  private StorageType[] storageTypes;
  // corrupt flag is true if all of the replicas of a block are corrupt.
  // else false. If block has few corrupt replicas, they are filtered and 
  // their locations are not part of this object
  private boolean corrupt;
  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
  /**
   * List of cached datanode locations
   */
  private DatanodeInfo[] cachedLocs;

  // Used when there are no locations
  private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
      new DatanodeInfoWithStorage[0];
...
}
  • ExtendedBlock: Identifies a Block uniquely across the block pools
public class ExtendedBlock {
  private String poolId;
  private Block block;
...
}
  • StorageType: specifies the storage medium of a single block replica
/**
 * Defines the types of supported storage media. The default storage
 * medium is assumed to be DISK.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum StorageType {  
  // sorted by the speed of the storage types, from fast to slow
  RAM_DISK(true),
  SSD(false),
  DISK(false),
  ARCHIVE(false);

  private final boolean isTransient;

  public static final StorageType DEFAULT = DISK;
 ...
}
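
To see how LocatedBlock, ExtendedBlock, DatanodeInfo and StorageType fit together, here is a small sketch (not from the article's call path) that prints each replica of one LocatedBlock along with its storage medium; note that the package of StorageType differs across Hadoop versions (org.apache.hadoop.fs.StorageType in 2.7.x).

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class LocatedBlockDescriber {
  /** Print how one LocatedBlock ties an ExtendedBlock to its replica locations and media. */
  public static void describe(LocatedBlock lb) {
    ExtendedBlock b = lb.getBlock();               // block pool id + block id + generation stamp
    System.out.println("block " + b
        + " fileOffset=" + lb.getStartOffset()
        + " corrupt=" + lb.isCorrupt());

    DatanodeInfo[] locs = lb.getLocations();       // one entry per (non-corrupt) replica
    StorageType[] types = lb.getStorageTypes();    // parallel array; may be absent if unreported
    for (int i = 0; i < locs.length; i++) {
      StorageType t =
          (types != null && i < types.length) ? types[i] : StorageType.DEFAULT;
      System.out.println("  replica on " + locs[i].getXferAddr() + ", storage=" + t);
    }
  }
}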

IV. Analysis

1) Invoking the user-level read interfaces

When the DFSInputStream object is constructed it fetches the file's meta information (stored in member variables such as locatedBlocks), and the DFSInputStream object is wrapped inside an FSDataInputStream object, which for the HDFS file system is concretely an HdfsDataInputStream object. The user can read directly through FSDataInputStream.read, or wrap the FSDataInputStream further in an InputStreamReader and a BufferedReader, which provide richer read interfaces such as readLine(). Below is the example started in "HDFS Read Path Analysis (Part 1): how the user's file-reading code reaches HDFS".

...
  reader = new BufferedReader(new InputStreamReader(in)); // in is the FSDataInputStream object
  String line = null;
  while ((line = reader.readLine()) != null) { // read the file line by line
      System.out.println("Record: " + line);
  }

Whether the user reads through the BufferedReader.readLine interface or through FSDataInputStream.read, the flow eventually enters the corresponding read method of DFSInputStream. DFSInputStream currently has two read strategies, ByteBufferStrategy and ByteArrayStrategy, and the different DFSInputStream.read(...) overloads map automatically to the corresponding strategy, as mentioned earlier.
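
To make the two strategies concrete, these are roughly the two call shapes on the user side (a sketch; in is the FSDataInputStream from the earlier example, and the buffer sizes are arbitrary):

// byte[]-based read: DFSInputStream wraps the array in a ByteArrayStrategy.
byte[] arr = new byte[4096];
int n1 = in.read(arr, 0, arr.length);

// ByteBuffer-based read: DFSInputStream uses ByteBufferStrategy.
// FSDataInputStream exposes this through the ByteBufferReadable interface.
java.nio.ByteBuffer bb = java.nio.ByteBuffer.allocate(4096);
int n2 = in.read(bb);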

2) Local vs. remote: depending on the block location information in the file's meta data, a block may be local or remote to the client. Details below:

HDFS currently implements reads at three levels: network read, short-circuit read, and zero-copy read, with read efficiency increasing in that order.
Network read: the most basic kind of HDFS read; the DFSClient and the Datanode transfer data over a Socket connection.
Short-circuit read: when the DFSClient and the Datanode holding the target block are on the same physical node, the DFSClient can open the block replica file and read it directly, without going through the Datanode process. This is covered later.
Zero-copy read: when the DFSClient and the Datanode caching the target block are on the same physical node, the DFSClient can read the block via zero-copy, which greatly improves efficiency. Even if the block is evicted from the Datanode's cache during the read, the operation can fall back to a local short-circuit read.

These three levels are implemented by concrete subclasses of the BlockReader interface:

Network read: RemoteBlockReader2 and RemoteBlockReader (the latter is deprecated)

Short-circuit read: BlockReaderLocal and BlockReaderLocalLegacy

Zero-copy read: DFSInputStream implements the HasEnhancedByteBufferAccess interface, i.e. the ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) method.
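
For completeness, a rough sketch of what the zero-copy path looks like from the caller's side (in is an FSDataInputStream on an HDFS file; ElasticByteBufferPool lives in org.apache.hadoop.io and ReadOption in org.apache.hadoop.fs; the 1 MB request size is arbitrary and error handling is omitted):

ByteBufferPool pool = new ElasticByteBufferPool();
ByteBuffer buf = in.read(pool, 1024 * 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
if (buf != null) {              // null means end of file
  try {
    // consume buf.remaining() bytes of file data ...
  } finally {
    in.releaseBuffer(buf);      // always hand the buffer back to the stream/pool
  }
}
// If the block cannot be served via mmap, read() falls back internally
// (ByteBufferUtil.fallbackRead), so the calling code stays the same.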

3) The BlockReader interface and its implementation classes (covering network read and short-circuit read)

You can see that the subclasses of the BlockReader interface all implement the read(...) variants used by the two strategies, ByteBufferStrategy and ByteArrayStrategy.

4) RemoteBlockReader2 (network read) analysis:

First, the call stack of the ByteArrayStrategy path into RemoteBlockReader2, using a text file read as the example:
TextInputFormat (extends FileInputFormat) =>
    LineRecordReader.LineRecordReader =>
        DFSClient.open =>                                                       // returns the FSDataInputStream object
            HdfsDataInputStream.read =>                                         // HdfsDataInputStream is a subclass of FSDataInputStream
                FSDataInputStream.read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
                    => ((HasEnhancedByteBufferAccess)in).read(bufferPool, maxLength, opts)   // try zero-copy read first
                    -> ByteBufferUtil.fallbackRead                              // zero-copy read did not succeed
                        => DFSInputStream.read(byte[] buf, ...)                 // create the read strategy matching byte[] or ByteBuffer
                            => DFSInputStream.readWithStrategy(new ByteArrayStrategy(buf))
                                => DFSInputStream.blockSeekTo(pos)
                                -> chooseDataNode(targetBlock, null)            // pick the best datanode
                                -> BlockReaderFactory.build                     // create the concrete blockReader (network/local read)
                                => readBuffer(strategy, off, realLen, corruptedBlockMap)    // back in readWithStrategy, which calls readBuffer
                                    => reader.doRead(blockReader, off, len)     // reader is the ByteArrayStrategy object: ByteArrayStrategy.doRead
                                        => blockReader.read(buf, off, len)      // blockReader is a RemoteBlockReader2 object

Reaching blockReader.read(buf, off, len) means we are now inside RemoteBlockReader2.read(...). Let's look at the byte[]-flavored read code:

 public synchronized int read(byte[] buf, int off, int len) 
                               throws IOException {

    UUID randomId = null;
    if (LOG.isTraceEnabled()) {
      randomId = UUID.randomUUID();
      LOG.trace(String.format("Starting read #%s file %s from datanode %s",
        randomId.toString(), this.filename,
        this.datanodeID.getHostName()));
    }

    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
      TraceScope scope = Trace.startSpan(
          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
      try {
        readNextPacket();
      } finally {
        scope.close();
      }
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace(String.format("Finishing read #" + randomId));
    }

    if (curDataSlice.remaining() == 0) {
      // we're at EOF now
      return -1;
    }
    
    int nRead = Math.min(curDataSlice.remaining(), len);
    curDataSlice.get(buf, off, nRead);
    
    return nRead;
  }

The method above calls readNextPacket() to parse the received data, verify the checksums, and put the actual data into the curDataSlice variable. Finally, curDataSlice.get(buf, off, nRead) copies the data from curDataSlice into buf[] and returns.
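
For orientation, here is an abridged view of readNextPacket() (simplified from the 2.7.1 source, with sanity checks, the seek-skip logic, and error handling removed; packetReceiver, checksum, verifyChecksum, bytesNeededToFinish and filename are fields of RemoteBlockReader2):

private void readNextPacket() throws IOException {
  // 1. Pull one whole packet (header + checksum slice + data slice) off the socket channel.
  packetReceiver.receiveNextPacket(in);

  PacketHeader curHeader = packetReceiver.getHeader();
  curDataSlice = packetReceiver.getDataSlice();     // the "actual data" part of the packet

  if (curHeader.getDataLen() > 0) {
    // 2. Verify the checksum slice against the data slice, chunk by chunk.
    if (verifyChecksum) {
      checksum.verifyChunkedSums(curDataSlice,
          packetReceiver.getChecksumSlice(), filename, curHeader.getOffsetInBlock());
    }
    // 3. Track how much of the requested range is still outstanding.
    bytesNeededToFinish -= curHeader.getDataLen();
  }

  // 4. After the last data packet, read the trailing empty packet and
  //    report the read status back to the datanode.
  if (bytesNeededToFinish <= 0) {
    readTrailingEmptyPacket();
    sendReadResult(verifyChecksum ? Status.CHECKSUM_OK : Status.SUCCESS);
  }
}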

If the TRACE log level is enabled for the RemoteBlockReader2 module, logs like the following appear in the nodemanager.log file:

2019-01-15 13:48:40,488 TRACE org.apache.hadoop.hdfs.RemoteBlockReader2: Starting read #d9d5cd28-f3e2-4ba8-bf48-1797cdfa7b72 file /user/yeshang/.sparkStaging/application_1547530487554_0001/__spark_libs__9183298763495301561.zip from datanode cluster2.serversolution.sh.hxt
2019-01-15 13:48:40,489 TRACE org.apache.hadoop.hdfs.RemoteBlockReader2: DFSClient readNextPacket got header PacketHeader with packetLen=132100 header data: offsetInBlock: 262144
seqno: 2
lastPacketInBlock: false
dataLen: 131072

2019-01-15 13:48:40,489 TRACE org.apache.hadoop.hdfs.RemoteBlockReader2: Finishing read #d9d5cd28-f3e2-4ba8-bf48-1797cdfa7b72

5) Finally, a class diagram of all the read-related classes, useful when reading the code and following the flow:

V. Summary

DFSInputStream is the core class for reads in the HDFS file system; it is wrapped inside FSDataInputStream.

At open time, DFSClient interfaces are called to fetch the meta information of the file from the namenode. Since a block has several replicas, the best replica has to be chosen (as close as possible, on the best available storage medium).

At read time, a matching ReadStrategy object is created first and readWithStrategy is called; based on the file position being read, it locates the block and the datanode to read from. Inside readWithStrategy, blockSeekTo creates the corresponding blockReader, and readBuffer then uses that blockReader to actually read the data.
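
As a rough sketch of that flow (not the verbatim 2.7.x source; retry logic, synchronization and error handling are stripped out), readWithStrategy essentially does the following on each call:

// Simplified shape of DFSInputStream#readWithStrategy (sketch only).
if (pos < getFileLength()) {
  if (pos > blockEnd || currentNode == null) {
    currentNode = blockSeekTo(pos);     // pick the LocatedBlock + datanode, build the BlockReader
  }
  int realLen = (int) Math.min(len, blockEnd - pos + 1);
  int result = readBuffer(strategy, off, realLen, corruptedBlockMap);  // delegates to blockReader.read(...)
  if (result >= 0) {
    pos += result;                      // advance the stream position
  }
  return result;
}
return -1;                              // past end of file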


VI. References
HDFS的Read过程分析: https://www.cnblogs.com/linghuchong0605/p/4213389.html
DFSClient技术内幕(读取数据): https://blog.csdn.net/u013494310/article/details/19236895
读HDFS书笔记---5.2 文件读操作与输入流(5.2.1): https://blog.csdn.net/weixin_39935887/article/details/84328640
读HDFS书笔记---5.2 文件读操作与输入流(5.2.2)---上: https://blog.csdn.net/weixin_39935887/article/details/84338294
读HDFS书笔记---5.2 文件读操作与输入流(5.2.2)---下: https://blog.csdn.net/weixin_39935887/article/details/84393611
Hadoop中的FileStatus、BlockLocation、LocatedBlocks、InputSplit: https://blog.csdn.net/woshixuye/article/details/53588841


Reposted from blog.csdn.net/don_chiang709/article/details/86496541