Parquet 查询流程

总体流程:根据用户给定的 Filter,先对文件中所有 RowGroup (Block) 过滤一遍,留下满足要求的 RowGroup。对这些 RowGroup 中涉及到的所有 Chunk 都读出来,对其中的 Page 一个一个解压缩,拼成一个一个 Record,再进行过滤。

细节:

  • 不管一个 page 是否满足条件,都会被反序列化。
  • 将多个 page 的东西拼成一个 record,并进行过滤,过滤结果放到 IncrementallyUpdatedFilterPredicate 谓词里,并根据这个结果选择是否返回这行数据。

读取流程

  • 读取文件元数据

    首先在 ParquetReader.build 时,读取文件尾部的 Footer,里边有整个文件的元数据,即ParquetMetadata。

  • 初始化: ParquetReader.initReader()

    • 过滤 RowGroup。根据 RowGroup 中的统计信息和 Filter,对文件中的所有 RowGroup 进行过滤,将不满足的 RowGroup 丢掉。
  • 构造 RecordReader: InternalParquetRecordReader.checkRead()

    • 读取一个 RowGroup (包含各个 Chunk 的 PageReader),将查询涉及的所有 Chunk 的字节读到内存中。
    • 对每个 Chunk,将其所有 Page 构造出来,用字节数组填充,作为压缩后的 Page,并用这些压缩的 Page 和解码器构造一个 ColumnChunkPageReader。
    • 根据这些 ColumnChunkPageReader 构造 RecordReader。构造了一个 RecordReaderImplementation。并且在 ColumnReaderImpl 的 checkRead() 方法里开始读 Page,解压缩 page。
  • 读取一行数据

    • 根据 Definition Level 和 Repetition Level 的计算方法构造出一行一行 Record 来,这一行 Record 如果为 Null,就是不满足条件,继续构造 Record。在过滤的时候一直维护一个 IncrementallyUpdatedFilterPredicate,这个谓词里记录了当前 record 是否满足条件,在 IncrementallyUpdatedFilterPredicateBuilder 里根据新的数据更新这个谓词的状态。

查询代码

  • 依赖
	<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.7</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

Configuration conf = new Configuration();
    ParquetInputFormat.setFilterPredicate(conf,
        and(gtEq(longColumn("long_column"), startTime),
            ltEq(longColumn("long_column"), endTime)));
    FilterCompat.Filter filter = ParquetInputFormat.getFilter(conf);

    Types.MessageTypeBuilder builder = Types.buildMessage();
    builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "long_column"));
    builder.addField(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, "double_column"));

    MessageType querySchema = builder.named("default_schema_name");
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, querySchema.toString());

    // set reader
    ParquetReader.Builder<Group> reader= ParquetReader
            .builder(new GroupReadSupport(), new Path("file_path"))
            .withConf(conf)
            .withFilter(filter);

    ParquetReader<Group> build;
    int result = 0;
    try {
      build = reader.build();
      Group line;
      while((line=build.read())!=null) {
        result++;
      }
    } catch (IOException e) {
      e.printStackTrace();
    }

猜你喜欢

转载自blog.csdn.net/qiaojialin/article/details/90245209