VectorizedParquetRecordReader.java
{
/**
* The number of rows that have been returned to the caller so far, summed over all batches
* produced by nextBatch().
*/
private long rowsReturned;
/**
* The number of rows that have been loaded so far, i.e. the summed row counts of every row group
* read up to and including the row group currently in flight. When rowsReturned catches up with
* this value, the current row group is exhausted and checkEndOfRowGroup() loads the next one.
*/
private long totalCountLoadedSoFar = 0;
}
rowsReturned 记录累计已返回的行数（每次 nextBatch() 都会加上本批次返回的行数）。
totalCountLoadedSoFar：累计已加载的行数（Row Count）总量；每加载一个 rowgroup，就累加该 rowgroup 的行数。
nextBatch() 方法
/**
 * Reads the next batch of rows into {@code columnarBatch}.
 *
 * <p>All column vectors are reset and the batch row count is cleared first. If every row of the
 * file has already been returned, this returns {@code false}. Otherwise it makes sure a row group
 * is loaded, then reads at most {@code CAPACITY} rows. The read never crosses the end of the
 * current row group.
 *
 * @return {@code true} if a batch was produced, {@code false} once all rows have been returned
 * @throws IOException if loading the next row group or reading a column fails
 */
public boolean nextBatch() throws IOException {
  for (WritableColumnVector columnVector : columnVectors) {
    columnVector.reset();
  }
  columnarBatch.setNumRows(0);

  // First check: is every row in the file already returned?
  if (rowsReturned >= totalRowCount) {
    return false;
  }
  checkEndOfRowGroup();

  // Rows still unread in the current row group. Comparing totalCountLoadedSoFar with
  // rowsReturned keeps a batch from spanning two row groups.
  long remainingInRowGroup = totalCountLoadedSoFar - rowsReturned;
  int batchSize = (int) Math.min((long) CAPACITY, remainingInRowGroup);

  for (int col = 0; col < columnReaders.length; col++) {
    // Missing columns have no reader (see checkEndOfRowGroup) and are skipped.
    if (columnReaders[col] != null) {
      columnReaders[col].readBatch(batchSize, columnVectors[col]);
    }
  }

  rowsReturned += batchSize;
  columnarBatch.setNumRows(batchSize);
  numBatched = batchSize;
  batchIdx = 0;
  return true;
}
checkEndOfRowGroup 方法
检查是否需要加载下一个 rowgroup。
每进入一个新的 rowgroup，columnReaders 都会被重新赋值为一组新创建的 reader 实例。
具体指向哪个 rowgroup 由 pages（即 reader.readNextRowGroup() 返回的 PageReadStore）决定。
/**
 * Loads the next row group if every row loaded so far has been returned.
 *
 * <p>When a new row group is loaded, {@code columnReaders} is replaced with a fresh array. It
 * holds one {@code VectorizedColumnReader} per requested column, and the slot stays null for
 * columns flagged in {@code missingColumns}. {@code totalCountLoadedSoFar} then grows by the new
 * row group's row count.
 *
 * @throws IOException if no further row group is available although rows are still expected
 */
private void checkEndOfRowGroup() throws IOException {
  // Second check: only advance once the current row group is fully consumed.
  boolean rowGroupExhausted = rowsReturned == totalCountLoadedSoFar;
  if (!rowGroupExhausted) {
    return;
  }

  PageReadStore rowGroup = reader.readNextRowGroup();
  if (rowGroup == null) {
    throw new IOException("expecting more rows but reached last block. Read "
        + rowsReturned + " out of " + totalRowCount);
  }

  List<ColumnDescriptor> requestedColumns = requestedSchema.getColumns();
  List<Type> requestedFields = requestedSchema.asGroupType().getFields();
  int columnCount = requestedColumns.size();

  columnReaders = new VectorizedColumnReader[columnCount];
  for (int col = 0; col < columnCount; col++) {
    if (!missingColumns[col]) {
      ColumnDescriptor descriptor = requestedColumns.get(col);
      columnReaders[col] = new VectorizedColumnReader(
          descriptor,
          requestedFields.get(col).getOriginalType(),
          rowGroup.getPageReader(descriptor),
          convertTz);
    }
  }

  totalCountLoadedSoFar += rowGroup.getRowCount();
}
父类 SpecificParquetRecordReaderBase.java
/**
* The total number of rows this RecordReader will eventually read: the sum of the row counts of
* all the row groups. (In VectorizedParquetRecordReader, nextBatch() returns false once
* rowsReturned reaches this value.)
*/
protected long totalRowCount;
VectorizedColumnReader.java
checkEndOfRowGroup() 方法中的 columnReaders，是通过 VectorizedColumnReader 的构造方法创建的实例：
// Excerpt from checkEndOfRowGroup(): one reader is built per requested, non-missing column,
// backed by that column's PageReader from the newly loaded row group.
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz);
/**
* Total number of values in this column within the current row group, as reported by
* pageReader.getTotalValueCount() at construction time. The constructor rejects a value of 0.
*/
private final long totalValueCount;
/**
 * Creates a reader for one column of one row group.
 *
 * <p>If the column has a dictionary page, it is read and decoded eagerly. The total value count
 * is then captured from the page reader.
 *
 * @param descriptor the Parquet column being read; also supplies the max definition level
 * @param originalType the logical (original) type of the column's field
 * @param pageReader source of this column's pages within the row group
 * @param convertTz time zone stored for later value conversion (its use is not shown here;
 *     presumably timestamp adjustment — confirm)
 * @throws IOException if the dictionary page cannot be decoded, or if the page reader reports
 *     zero values for the column
 */
public VectorizedColumnReader(
    ColumnDescriptor descriptor,
    OriginalType originalType,
    PageReader pageReader,
    TimeZone convertTz) throws IOException {
  this.descriptor = descriptor;
  this.originalType = originalType;
  this.pageReader = pageReader;
  this.convertTz = convertTz;
  this.maxDefLevel = descriptor.getMaxDefinitionLevel();

  // The dictionary page, when present, must be read before the data pages.
  DictionaryPage dictPage = pageReader.readDictionaryPage();
  if (dictPage == null) {
    this.dictionary = null;
    this.isCurrentPageDictionaryEncoded = false;
  } else {
    try {
      this.dictionary = dictPage.getEncoding().initDictionary(descriptor, dictPage);
      this.isCurrentPageDictionaryEncoded = true;
    } catch (IOException e) {
      throw new IOException("could not decode the dictionary for " + descriptor, e);
    }
  }

  this.totalValueCount = pageReader.getTotalValueCount();
  if (totalValueCount == 0) {
    throw new IOException("totalValueCount == 0");
  }
}