ParquetDataFile.scala
val iterator = reader.iteratorWithRowIds(requiredIds, rowIds)
  .asInstanceOf[OapCompletionIterator[InternalRow]]
val result = ArrayBuffer[Int]()
while (iterator.hasNext) {
  val row: InternalRow = iterator.next()
  assert(row.numFields == 2)
  result += row.getInt(0)
}
DataFile.scala
private[oap] class OapCompletionIterator[T](inner: Iterator[T], completionFunction: => Unit)
  extends Iterator[T] with Closeable {

  private[this] var completed = false

  override def hasNext: Boolean = {
    val r = inner.hasNext
    if (!r && !completed) {
      // Run the completion callback exactly once, on the first hasNext call
      // that observes the underlying iterator is exhausted.
      completed = true
      completionFunction
    }
    r
  }

  override def next(): T = inner.next()

  override def close(): Unit = {}
}
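A minimal sketch (not from the OAP sources, and assuming it is compiled somewhere with access to the private[oap] class) of the contract this gives callers: the completion callback fires exactly once, on the hasNext call that first sees exhaustion, and further probing does not fire it again.

object CompletionIteratorSketch {
  def main(args: Array[String]): Unit = {
    var fired = 0
    // Hypothetical usage; the callback is a by-name block evaluated on completion.
    val it = new OapCompletionIterator[Int](Iterator(1, 2, 3), { fired += 1 })
    var sum = 0
    while (it.hasNext) sum += it.next()   // draining triggers the callback once
    it.hasNext                            // repeated probing does not fire it again
    assert(sum == 6)
    assert(fired == 1)
  }
}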
ParquetDataFile.scala
hasNext()
private class FileRecordReaderIterator[V](private[this] var rowReader: RecordReader[V])
  extends Iterator[V] with Closeable {

  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    // Advance the underlying reader at most once per element, no matter how
    // many times hasNext is called before the next() that consumes the value.
    if (!finished && !havePair) {
      finished = !rowReader.nextKeyValue
      if (finished) {
        // Release the reader as soon as the end of the stream is reached,
        // instead of waiting for an explicit close() from the caller.
        close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) {
      throw new java.util.NoSuchElementException("End of stream")
    }
    havePair = false
    rowReader.getCurrentValue
  }

  override def close(): Unit = {
    if (rowReader != null) {
      try {
        rowReader.close()
      } finally {
        rowReader = null
      }
    }
  }
}
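A standalone sketch of the same guard, using an invented CountingReader in place of Hadoop's RecordReader, to show why the havePair/finished pair matters: however often hasNext is called, the reader advances at most once per element, and it is closed as soon as the end of the stream is detected.

object RecordReaderGuardSketch {
  // Hypothetical stand-in for org.apache.hadoop.mapreduce.RecordReader.
  final class CountingReader(values: Seq[Int]) {
    var advanceCalls = 0
    var closed = false
    private var idx = -1
    def nextKeyValue(): Boolean = { advanceCalls += 1; idx += 1; idx < values.length }
    def getCurrentValue: Int = values(idx)
    def close(): Unit = { closed = true }
  }

  def main(args: Array[String]): Unit = {
    val reader = new CountingReader(Seq(10, 20, 30))
    var finished = false
    var havePair = false

    def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = !reader.nextKeyValue()
        if (finished) reader.close()
        havePair = !finished
      }
      !finished
    }
    def next(): Int = { havePair = false; reader.getCurrentValue }

    hasNext; hasNext; hasNext           // repeated probing before any next()...
    assert(reader.advanceCalls == 1)    // ...advances the underlying reader only once
    var read = List.empty[Int]
    while (hasNext) read :+= next()
    assert(read == List(10, 20, 30))
    assert(reader.closed)               // closed eagerly when the end was detected
  }
}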
VectorizedCacheReader.scala
override def nextKeyValue(): Boolean = {
  // Called for its side effect: make sure the result ColumnarBatch exists.
  resultBatch
  if (returnColumnarBatch) {
    // In batch mode every call hands back a whole ColumnarBatch.
    return nextBatch
  }
  if (batchIdx >= numBatched) {
    // The current batch is fully consumed; load the next one.
    if (!nextBatch) {
      return false
    }
  }
  batchIdx += 1
  true
}
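A small model (invented names, not the real reader) of the row-mode bookkeeping above: batchIdx and numBatched track the position inside the current cached batch, and a new batch is loaded only once the previous one has been fully consumed.

object BatchCursorSketch {
  def main(args: Array[String]): Unit = {
    // Two pretend cached batches standing in for ColumnarBatch contents.
    val batches = Iterator(Seq(1, 2, 3), Seq(4, 5))
    var current: Seq[Int] = Seq.empty
    var batchIdx = 0
    var numBatched = 0

    def nextBatch(): Boolean = {
      if (!batches.hasNext) return false
      current = batches.next(); batchIdx = 0; numBatched = current.size
      true
    }

    // Mirrors the row-mode branch of nextKeyValue: position at the next row,
    // loading a new batch only when the current one is exhausted.
    def nextKeyValue(): Boolean = {
      if (batchIdx >= numBatched && !nextBatch()) return false
      batchIdx += 1
      true
    }
    def getCurrentValue: Int = current(batchIdx - 1)

    val rows = scala.collection.mutable.ArrayBuffer[Int]()
    while (nextKeyValue()) rows += getCurrentValue
    assert(rows.toList == List(1, 2, 3, 4, 5))
  }
}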
IndexedVectorizedCacheReader.scala
override def nextBatch: Boolean = {
  // If idsMap is empty there is no need to read the remaining data in this
  // row group; set rowsReturned = totalCountLoadedSoFar to skip it.
  if (idsMap.isEmpty) {
    rowsReturned = totalCountLoadedSoFar
  }
  if (rowsReturned >= totalRowCount) {
    return false
  }
  checkEndOfRowGroup()
  // idsMap maps a page number to the row ids requested from that page;
  // pages with no requested ids are skipped without decoding.
  var ids = idsMap.remove(currentPageNumber)
  currentPageNumber += 1
  while (ids == null || ids.isEmpty) {
    skipBatchInternal()
    ids = idsMap.remove(currentPageNumber)
    currentPageNumber += 1
  }
  nextBatchInternal()
  if (!returnColumnarBatch) {
    // In row mode, remember which rows of this batch the caller asked for.
    batchIds = ids
    numBatched = ids.size
  }
  true
}
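A simplified model of the skip loop above (a Scala Map and made-up page numbers instead of the reader's actual state): pages whose number has no entry in idsMap are skipped without decoding, and only the first page that still has requested row ids is actually read.

import scala.collection.mutable

object IndexedPageSkipSketch {
  def main(args: Array[String]): Unit = {
    // Requested row ids grouped by page number; pages 0 and 1 hold no hits.
    val idsMap = mutable.Map(2 -> Seq(5, 9), 4 -> Seq(1))
    var currentPageNumber = 0
    val skippedPages = mutable.ArrayBuffer[Int]()
    val decodedPages = mutable.ArrayBuffer[Int]()

    def skipBatchInternal(): Unit = skippedPages += (currentPageNumber - 1)
    def nextBatchInternal(): Unit = decodedPages += (currentPageNumber - 1)

    // Mirrors the while-loop in nextBatch: pop the ids of the current page and
    // skip pages until one that still has requested row ids is reached.
    var ids = idsMap.remove(currentPageNumber)
    currentPageNumber += 1
    while (ids.isEmpty) {
      skipBatchInternal()
      ids = idsMap.remove(currentPageNumber)
      currentPageNumber += 1
    }
    nextBatchInternal()

    assert(skippedPages.toList == List(0, 1))
    assert(decodedPages.toList == List(2))
    assert(ids.contains(Seq(5, 9)))
  }
}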