OAP ParquetDataFile

ParquetDataFile.scala

Reading through iteratorWithRowIds returns an OapCompletionIterator; the caller drains it like any ordinary iterator. Here each returned row has two fields, and the first one is collected:

val iterator = reader.iteratorWithRowIds(requiredIds, rowIds)
  .asInstanceOf[OapCompletionIterator[InternalRow]]
val result = ArrayBuffer[Int]()
while (iterator.hasNext) {
  val row: InternalRow = iterator.next()
  assert(row.numFields == 2)
  result += row.getInt(0)   // collect the first field of every returned row
}

DataFile.scala

OapCompletionIterator wraps an inner iterator and runs a completion block the first time hasNext returns false, i.e. once the underlying data has been fully consumed:

private[oap] class OapCompletionIterator[T](inner: Iterator[T], completionFunction: => Unit)
    extends Iterator[T] with Closeable {

  private[this] var completed = false
  override def hasNext: Boolean = {
    val r = inner.hasNext
    if (!r && !completed) {
      completed = true
      completionFunction
    }
    r
  }
  override def next(): T = inner.next()
  override def close(): Unit = {}
}
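
To see that contract in isolation: the completion block fires exactly once, the first time hasNext reports the inner iterator is drained. A minimal sketch with made-up values (note the class is private[oap], so real callers live inside the OAP package):

// Wrap a plain iterator and count how many times the completion block runs.
var completions = 0
val it = new OapCompletionIterator[Int](Iterator(1, 2, 3), { completions += 1 })
val total = it.sum                                    // draining it triggers the block once
println(s"total=$total, completions=$completions")    // total=6, completions=1
it.hasNext                                            // further calls do not re-fire it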

ParquetDataFile.scala, hasNext()

Inside ParquetDataFile, FileRecordReaderIterator adapts a RecordReader to a Scala Iterator. hasNext lazily pulls the next key/value pair and closes the underlying reader as soon as the stream is exhausted:

private class FileRecordReaderIterator[V](private[this] var rowReader: RecordReader[V])
    extends Iterator[V] with Closeable {
  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      // pre-fetch one key/value pair; a failed fetch marks the end of the stream
      finished = !rowReader.nextKeyValue
      if (finished) {
        // release the underlying reader as soon as it is exhausted
        close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) {
      throw new java.util.NoSuchElementException("End of stream")
    }
    havePair = false
    rowReader.getCurrentValue
  }

  override def close(): Unit = {
    if (rowReader != null) {
      try {
        rowReader.close()
      } finally {
        rowReader = null
      }
    }
  }
}

VectorizedCacheReader.scala

nextKeyValue works in two modes: with returnColumnarBatch set it hands back whole batches, otherwise it steps row by row through the current batch and loads the next batch when the current one is exhausted:

override def nextKeyValue(): Boolean = {
  resultBatch   // make sure the result batch has been set up before reading

  if (returnColumnarBatch) {
    // batch mode: every call hands the caller a whole batch
    return nextBatch
  }

  // row mode: advance inside the current batch, refilling when it runs out
  if (batchIdx >= numBatched) {
    if (!nextBatch) {
      return false
    }
  }
  batchIdx += 1
  true
}
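
The shape of that dispatch is easier to follow stripped of Parquet details. The sketch below is a stand-alone illustration, not OAP code: an Array[Int] stands in for a ColumnarBatch, and nextBatch here is a local helper rather than the real reader's method.

class BatchOrRowReader(batches: Iterator[Array[Int]], returnColumnarBatch: Boolean) {
  private var current: Array[Int] = Array.empty
  private var batchIdx = 0      // position of the next row inside `current`
  private var numBatched = 0    // number of rows in `current`

  // stand-in for the real nextBatch: pull the next batch if one is available
  private def nextBatch(): Boolean = {
    if (!batches.hasNext) return false
    current = batches.next()
    batchIdx = 0
    numBatched = current.length
    true
  }

  def nextKeyValue(): Boolean = {
    if (returnColumnarBatch) return nextBatch()               // batch mode
    if (batchIdx >= numBatched && !nextBatch()) return false  // row mode: refill if needed
    batchIdx += 1
    true
  }

  def getCurrentValue: Any =
    if (returnColumnarBatch) current else current(batchIdx - 1)
}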

IndexedVectorizedCacheReader.scala

The indexed variant overrides nextBatch. idsMap maps a page number to the row ids requested from that page, so pages with no requested ids are skipped outright instead of being decoded:

override def nextBatch: Boolean = {
  // if idsMap is empty, there is no need to read the remaining data in this
  // row group; setting rowsReturned = totalCountLoadedSoFar skips it
  if (idsMap.isEmpty) {
    rowsReturned = totalCountLoadedSoFar
  }

  if (rowsReturned >= totalRowCount) {
    return false
  }

  checkEndOfRowGroup()

  var ids = idsMap.remove(currentPageNumber)
  currentPageNumber += 1

  // skip every page that contains none of the requested row ids
  while (ids == null || ids.isEmpty) {
    skipBatchInternal()
    ids = idsMap.remove(currentPageNumber)
    currentPageNumber += 1
  }

  // decode the page that does contain requested rows
  nextBatchInternal()
  if (!returnColumnarBatch) {
    batchIds = ids
    numBatched = ids.size
  }
  true
}
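
A short, self-contained trace of that page-skipping loop (made-up page count and id map; println stands in for skipBatchInternal()/nextBatchInternal(), and a Scala mutable.Map plays the role of idsMap):

import scala.collection.mutable

// Hypothetical row group with four pages; only pages 1 and 3 hold requested rows.
val idsMap = mutable.Map(1 -> Seq(3, 7), 3 -> Seq(0))
var currentPageNumber = 0

while (idsMap.nonEmpty) {
  var ids = idsMap.remove(currentPageNumber)
  currentPageNumber += 1
  while (ids.forall(_.isEmpty)) {                        // page has no requested rows
    println(s"skip page ${currentPageNumber - 1}")       // skipBatchInternal()
    ids = idsMap.remove(currentPageNumber)
    currentPageNumber += 1
  }
  println(s"decode page ${currentPageNumber - 1}: ${ids.get}")   // nextBatchInternal()
}
// prints: skip page 0, decode page 1: List(3, 7), skip page 2, decode page 3: List(0)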


Reposted from blog.csdn.net/zhixingheyi_tian/article/details/86742337