总体流程:根据用户给定的 Filter,先对文件中所有 RowGroup (Block) 过滤一遍,留下满足要求的 RowGroup。对这些 RowGroup 中涉及到的所有 Chunk 都读出来,对其中的 Page 一个一个解压缩,拼成一个一个 Record,再进行过滤。
细节:
- 不管一个 page 是否满足条件,都会被反序列化。
- 将多个 page 的东西拼成一个 record,并进行过滤,过滤结果放到 IncrementallyUpdatedFilterPredicate 谓词里,并根据这个结果选择是否返回这行数据。
读取流程
-
读取文件元数据
首先在 ParquetReader.build 时,读取文件尾部的 Footer,里边有整个文件的元数据,即ParquetMetadata。
-
初始化: ParquetReader.initReader()
- 过滤 RowGroup。根据 RowGroup 中的统计信息和 Filter,对文件中的所有 RowGroup 进行过滤,将不满足的 RowGroup 丢掉。
-
构造 RecordReader: InternalParquetRecordReader.checkRead()
- 读取一个 RowGroup (包含各个 Chunk 的 PageReader),将查询涉及的所有 Chunk 的字节读到内存中。
- 对每个 Chunk,将其所有 Page 构造出来,用字节数组填充,作为压缩后的 Page,并用这些压缩的 Page 和解码器构造一个 ColumnChunkPageReader。
- 根据这些 ColumnChunkPageReader 构造 RecordReader。构造了一个 RecordReaderImplementation。并且在 ColumnReaderImpl 的 checkRead() 方法里开始读 Page,解压缩 page。
-
读取一行数据
- 根据 Definition Level 和 Repetition Level 的计算方法构造出一行一行 Record 来,这一行 Record 如果为 Null,就是不满足条件,继续构造 Record。在过滤的时候一直维护一个 IncrementallyUpdatedFilterPredicate,这个谓词里记录了当前 record 是否满足条件,在 IncrementallyUpdatedFilterPredicateBuilder 里根据新的数据更新这个谓词的状态。
查询代码
- 依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
Configuration conf = new Configuration();
ParquetInputFormat.setFilterPredicate(conf,
and(gtEq(longColumn("long_column"), startTime),
ltEq(longColumn("long_column"), endTime)));
FilterCompat.Filter filter = ParquetInputFormat.getFilter(conf);
Types.MessageTypeBuilder builder = Types.buildMessage();
builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "long_column"));
builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, "double_column"));
MessageType querySchema = builder.named("default_schema_name");
conf.set(ReadSupport.PARQUET_READ_SCHEMA, querySchema.toString());
// set reader
ParquetReader.Builder<Group> reader= ParquetReader
.builder(new GroupReadSupport(), new Path("file_path"))
.withConf(conf)
.withFilter(filter);
ParquetReader<Group> build;
int result = 0;
try {
build = reader.build();
Group line;
while((line=build.read())!=null) {
result++;
}
} catch (IOException e) {
e.printStackTrace();
}