From 10851f38c5a0a700691b87f83fb32978d9dd052e Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 10:42:02 +0800 Subject: [PATCH 01/12] feat: add parquet nested leaf projection --- src/mito2/src/sst/parquet/format.rs | 171 ++++++++++++++++++++++++++++ src/mito2/src/sst/parquet/reader.rs | 22 +++- 2 files changed, 188 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index ba64eac78b39..e865417fcd50 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -46,6 +46,7 @@ use mito_codec::row_converter::{ }; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; +use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, SequenceNumber}; @@ -133,6 +134,65 @@ pub enum ReadFormat { Flat(FlatReadFormat), } +/// A nested field access path inside one parquet root column. +pub type ParquetPath = Vec; + +/// Projection mapped onto a parquet schema. +/// +/// Each projected parquet column may optionally carry nested-path pruning +/// information. If `nested_paths` is empty, the whole root column is read. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetProjection { + pub cols: Vec, +} + +/// Projection for a single root column in the parquet schema. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetColumnProjection { + /// Root field index in the parquet schema. + pub root_index: usize, + /// Nested paths to read under this root column. + /// + /// Each path includes the root column itself. For example, for a root + /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`. + /// + /// If empty, the whole root column is read. + pub nested_paths: Vec, +} + +/// Builds parquet leaf-column indices from a parquet projection. +pub fn build_parquet_leaves_indices( + parquet_schema_desc: &SchemaDescriptor, + projection: &ParquetProjection, +) -> Vec { + let mut map = HashMap::new(); + for col in &projection.cols { + map.insert(col.root_index, &col.nested_paths); + } + + let mut leaf_indices = Vec::new(); + for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() { + let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx); + let Some(nested_paths) = map.get(&root_idx) else { + continue; + }; + if nested_paths.is_empty() { + leaf_indices.push(leaf_idx); + continue; + } + + let leaf_path = leaf_col.path().parts(); + if nested_paths + .iter() + .any(|nested_path| leaf_path.starts_with(nested_path)) + { + leaf_indices.push(leaf_idx); + } + } + + leaf_indices +} + impl ReadFormat { /// Creates a helper to read the primary key format. pub fn new_primary_key( @@ -1000,6 +1060,8 @@ mod tests { use mito_codec::row_converter::{ DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, }; + use parquet::basic::Repetition; + use parquet::schema::types::Type; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; @@ -1392,6 +1454,115 @@ mod tests { assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); } + #[test] + fn test_build_parquet_leaves_indices_reads_whole_root() { + let leaf_a = Arc::new( + Type::primitive_type_builder("a", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_b = Arc::new( + Type::primitive_type_builder("b", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let root_j = Arc::new( + Type::group_type_builder("j") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_a, leaf_b]) + .build() + .unwrap(), + ); + let schema = Arc::new( + Type::group_type_builder("schema") + .with_fields(vec![root_j]) + .build() + .unwrap(), + ); + let parquet_schema_desc = SchemaDescriptor::new(schema); + + let projection = ParquetProjection { + cols: vec![ParquetColumnProjection { + root_index: 0, + nested_paths: vec![], + }], + }; + + assert_eq!( + vec![0, 1], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_build_parquet_leaves_indices_filters_nested_paths() { + let leaf_a = Arc::new( + Type::primitive_type_builder("a", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_c = Arc::new( + Type::primitive_type_builder("c", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_d = Arc::new( + Type::primitive_type_builder("d", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let group_b = Arc::new( + Type::group_type_builder("b") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_c, leaf_d]) + .build() + .unwrap(), + ); + let root_j = Arc::new( + Type::group_type_builder("j") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_a, group_b]) + .build() + .unwrap(), + ); + let root_k = Arc::new( + Type::primitive_type_builder("k", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let schema = Arc::new( + Type::group_type_builder("schema") + .with_fields(vec![root_j, root_k]) + .build() + .unwrap(), + ); + let parquet_schema_desc = SchemaDescriptor::new(schema); + + let projection = ParquetProjection { + cols: vec![ + ParquetColumnProjection { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }, + ParquetColumnProjection { + root_index: 1, + nested_paths: vec![], + }, + ], + }; + + assert_eq!( + vec![1, 2, 3], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + #[test] fn test_flat_read_format_convert_batch() { let metadata = build_test_region_metadata(); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 8832cd4a1656..10b50c197078 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -74,7 +74,10 @@ use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, row_group_contains_delete, }; -use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; +use crate::sst::parquet::format::{ + ParquetColumnProjection, ParquetProjection, ReadFormat, build_parquet_leaves_indices, + need_override_sequence, +}; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, @@ -420,10 +423,19 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let indices = read_format.projection_indices(); - // Now we assumes we don't have nested schemas. - // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. - let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); + let parquet_projection = ParquetProjection { + cols: read_format + .projection_indices() + .iter() + .map(|index| ParquetColumnProjection { + root_index: *index, + nested_paths: vec![], + }) + .collect(), + }; + let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); + let projection_mask = + ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; From 1785e952d2c2317c53a53f26fc4273ac9b289dd8 Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 10:56:32 +0800 Subject: [PATCH 02/12] rename ParquetProjection related struct --- src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/sst/parquet/format.rs | 171 ------------------ src/mito2/src/sst/parquet/read_columns.rs | 206 ++++++++++++++++++++++ src/mito2/src/sst/parquet/reader.rs | 18 +- 4 files changed, 212 insertions(+), 184 deletions(-) create mode 100644 src/mito2/src/sst/parquet/read_columns.rs diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 4a3466a29cef..7eec8806f750 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -31,6 +31,7 @@ pub mod format; pub(crate) mod helper; pub(crate) mod metadata; pub mod prefilter; +pub mod read_columns; pub mod reader; pub mod row_group; pub mod row_selection; diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index e865417fcd50..ba64eac78b39 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -46,7 +46,6 @@ use mito_codec::row_converter::{ }; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; -use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, SequenceNumber}; @@ -134,65 +133,6 @@ pub enum ReadFormat { Flat(FlatReadFormat), } -/// A nested field access path inside one parquet root column. -pub type ParquetPath = Vec; - -/// Projection mapped onto a parquet schema. -/// -/// Each projected parquet column may optionally carry nested-path pruning -/// information. If `nested_paths` is empty, the whole root column is read. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ParquetProjection { - pub cols: Vec, -} - -/// Projection for a single root column in the parquet schema. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ParquetColumnProjection { - /// Root field index in the parquet schema. - pub root_index: usize, - /// Nested paths to read under this root column. - /// - /// Each path includes the root column itself. For example, for a root - /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`. - /// - /// If empty, the whole root column is read. - pub nested_paths: Vec, -} - -/// Builds parquet leaf-column indices from a parquet projection. -pub fn build_parquet_leaves_indices( - parquet_schema_desc: &SchemaDescriptor, - projection: &ParquetProjection, -) -> Vec { - let mut map = HashMap::new(); - for col in &projection.cols { - map.insert(col.root_index, &col.nested_paths); - } - - let mut leaf_indices = Vec::new(); - for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() { - let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx); - let Some(nested_paths) = map.get(&root_idx) else { - continue; - }; - if nested_paths.is_empty() { - leaf_indices.push(leaf_idx); - continue; - } - - let leaf_path = leaf_col.path().parts(); - if nested_paths - .iter() - .any(|nested_path| leaf_path.starts_with(nested_path)) - { - leaf_indices.push(leaf_idx); - } - } - - leaf_indices -} - impl ReadFormat { /// Creates a helper to read the primary key format. pub fn new_primary_key( @@ -1060,8 +1000,6 @@ mod tests { use mito_codec::row_converter::{ DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, }; - use parquet::basic::Repetition; - use parquet::schema::types::Type; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; @@ -1454,115 +1392,6 @@ mod tests { assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); } - #[test] - fn test_build_parquet_leaves_indices_reads_whole_root() { - let leaf_a = Arc::new( - Type::primitive_type_builder("a", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let leaf_b = Arc::new( - Type::primitive_type_builder("b", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let root_j = Arc::new( - Type::group_type_builder("j") - .with_repetition(Repetition::REQUIRED) - .with_fields(vec![leaf_a, leaf_b]) - .build() - .unwrap(), - ); - let schema = Arc::new( - Type::group_type_builder("schema") - .with_fields(vec![root_j]) - .build() - .unwrap(), - ); - let parquet_schema_desc = SchemaDescriptor::new(schema); - - let projection = ParquetProjection { - cols: vec![ParquetColumnProjection { - root_index: 0, - nested_paths: vec![], - }], - }; - - assert_eq!( - vec![0, 1], - build_parquet_leaves_indices(&parquet_schema_desc, &projection) - ); - } - - #[test] - fn test_build_parquet_leaves_indices_filters_nested_paths() { - let leaf_a = Arc::new( - Type::primitive_type_builder("a", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let leaf_c = Arc::new( - Type::primitive_type_builder("c", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let leaf_d = Arc::new( - Type::primitive_type_builder("d", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let group_b = Arc::new( - Type::group_type_builder("b") - .with_repetition(Repetition::REQUIRED) - .with_fields(vec![leaf_c, leaf_d]) - .build() - .unwrap(), - ); - let root_j = Arc::new( - Type::group_type_builder("j") - .with_repetition(Repetition::REQUIRED) - .with_fields(vec![leaf_a, group_b]) - .build() - .unwrap(), - ); - let root_k = Arc::new( - Type::primitive_type_builder("k", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let schema = Arc::new( - Type::group_type_builder("schema") - .with_fields(vec![root_j, root_k]) - .build() - .unwrap(), - ); - let parquet_schema_desc = SchemaDescriptor::new(schema); - - let projection = ParquetProjection { - cols: vec![ - ParquetColumnProjection { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "b".to_string()]], - }, - ParquetColumnProjection { - root_index: 1, - nested_paths: vec![], - }, - ], - }; - - assert_eq!( - vec![1, 2, 3], - build_parquet_leaves_indices(&parquet_schema_desc, &projection) - ); - } - #[test] fn test_flat_read_format_convert_batch() { let metadata = build_test_region_metadata(); diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs new file mode 100644 index 000000000000..28ccfdc7cb12 --- /dev/null +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -0,0 +1,206 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use parquet::schema::types::SchemaDescriptor; + +/// A nested field access path inside one parquet root column. +pub type ParquetNestedPath = Vec; + +/// Logical parquet columns to read. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetReadColumns { + pub cols: Vec, +} + +/// Read requirement for a single parquet root column. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParquetReadColumn { + /// Root field index in the parquet schema. + pub root_index: usize, + /// Nested paths to read under this root column. + /// + /// Each path includes the root column itself. For example, for a root + /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`. + /// + /// If empty, the whole root column is read. + pub nested_paths: Vec, +} + +impl ParquetReadColumns { + pub fn from_root_indices(root_indices: impl IntoIterator) -> Self { + Self { + cols: root_indices + .into_iter() + .map(|root_index| ParquetReadColumn { + root_index, + nested_paths: vec![], + }) + .collect(), + } + } +} + +/// Builds parquet leaf-column indices from parquet read columns. +pub fn build_parquet_leaves_indices( + parquet_schema_desc: &SchemaDescriptor, + projection: &ParquetReadColumns, +) -> Vec { + let mut map = HashMap::new(); + for col in &projection.cols { + map.insert(col.root_index, &col.nested_paths); + } + + let mut leaf_indices = Vec::new(); + for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() { + let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx); + let Some(nested_paths) = map.get(&root_idx) else { + continue; + }; + if nested_paths.is_empty() { + leaf_indices.push(leaf_idx); + continue; + } + + let leaf_path = leaf_col.path().parts(); + if nested_paths + .iter() + .any(|nested_path| leaf_path.starts_with(nested_path)) + { + leaf_indices.push(leaf_idx); + } + } + + leaf_indices +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use parquet::basic::Repetition; + use parquet::schema::types::Type; + + use super::*; + + #[test] + fn test_build_parquet_leaves_indices_reads_whole_root() { + let leaf_a = Arc::new( + Type::primitive_type_builder("a", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_b = Arc::new( + Type::primitive_type_builder("b", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let root_j = Arc::new( + Type::group_type_builder("j") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_a, leaf_b]) + .build() + .unwrap(), + ); + let schema = Arc::new( + Type::group_type_builder("schema") + .with_fields(vec![root_j]) + .build() + .unwrap(), + ); + let parquet_schema_desc = SchemaDescriptor::new(schema); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![], + }], + }; + + assert_eq!( + vec![0, 1], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_build_parquet_leaves_indices_filters_nested_paths() { + let leaf_a = Arc::new( + Type::primitive_type_builder("a", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_c = Arc::new( + Type::primitive_type_builder("c", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let leaf_d = Arc::new( + Type::primitive_type_builder("d", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let group_b = Arc::new( + Type::group_type_builder("b") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_c, leaf_d]) + .build() + .unwrap(), + ); + let root_j = Arc::new( + Type::group_type_builder("j") + .with_repetition(Repetition::REQUIRED) + .with_fields(vec![leaf_a, group_b]) + .build() + .unwrap(), + ); + let root_k = Arc::new( + Type::primitive_type_builder("k", parquet::basic::Type::INT64) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ); + let schema = Arc::new( + Type::group_type_builder("schema") + .with_fields(vec![root_j, root_k]) + .build() + .unwrap(), + ); + let parquet_schema_desc = SchemaDescriptor::new(schema); + + let projection = ParquetReadColumns { + cols: vec![ + ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }, + ParquetReadColumn { + root_index: 1, + nested_paths: vec![], + }, + ], + }; + + assert_eq!( + vec![1, 2, 3], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 10b50c197078..b5bb20de53e0 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -74,9 +74,9 @@ use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, row_group_contains_delete, }; -use crate::sst::parquet::format::{ - ParquetColumnProjection, ParquetProjection, ReadFormat, build_parquet_leaves_indices, - need_override_sequence, +use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; +use crate::sst::parquet::read_columns::{ + ParquetReadColumns, build_parquet_leaves_indices, }; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ @@ -423,16 +423,8 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let parquet_projection = ParquetProjection { - cols: read_format - .projection_indices() - .iter() - .map(|index| ParquetColumnProjection { - root_index: *index, - nested_paths: vec![], - }) - .collect(), - }; + let parquet_projection = + ParquetReadColumns::from_root_indices(read_format.projection_indices().iter().copied()); let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); let projection_mask = ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()); From f6b5f1624aa2c914045d6fbefdd79b315f8031c1 Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 11:46:47 +0800 Subject: [PATCH 03/12] add some apis --- src/mito2/src/sst/parquet/read_columns.rs | 78 +++++++++++++++++++---- src/mito2/src/sst/parquet/reader.rs | 4 +- 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 28ccfdc7cb12..2b22a06476c5 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -19,38 +19,89 @@ use parquet::schema::types::SchemaDescriptor; /// A nested field access path inside one parquet root column. pub type ParquetNestedPath = Vec; -/// Logical parquet columns to read. +/// The parquet columns to read. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParquetReadColumns { - pub cols: Vec, + cols: Vec, +} + +impl Default for ParquetReadColumns { + fn default() -> Self { + Self::new() + } +} + +impl ParquetReadColumns { + pub fn new() -> ParquetReadColumns { + ParquetReadColumns { cols: vec![] } + } + + /// # Safety + /// + /// The caller must ensure `read_col.root_index()` is unique within `self`. + pub unsafe fn push_cols(&mut self, read_col: ParquetReadColumn) { + self.cols.push(read_col); + } + + pub fn from_root_indices(root_indices: impl IntoIterator) -> Self { + let cols = root_indices + .into_iter() + .map(ParquetReadColumn::new) + .collect(); + Self { cols } + } + + pub fn columns(&self) -> &[ParquetReadColumn] { + &self.cols + } } /// Read requirement for a single parquet root column. +/// +/// `root_index` identifies the root column in the parquet schema. +/// +/// If `nested_paths` is empty, the whole root column is read. Otherwise, only +/// leaves under the specified nested paths are read. +/// +/// To construct a [`ParquetReadColumn`]: +/// - `ParquetReadColumn::new(0)` reads the whole root column at index `0`. +/// - `ParquetReadColumn::new(0).with_nested_paths(vec![vec!["j".into(), "b".into()]])` +/// reads only leaves under `j.b`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParquetReadColumn { /// Root field index in the parquet schema. - pub root_index: usize, + root_index: usize, /// Nested paths to read under this root column. /// /// Each path includes the root column itself. For example, for a root /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`. /// /// If empty, the whole root column is read. - pub nested_paths: Vec, + nested_paths: Vec, } -impl ParquetReadColumns { - pub fn from_root_indices(root_indices: impl IntoIterator) -> Self { +impl ParquetReadColumn { + pub fn new(root_index: usize) -> Self { Self { - cols: root_indices - .into_iter() - .map(|root_index| ParquetReadColumn { - root_index, - nested_paths: vec![], - }) - .collect(), + root_index, + nested_paths: vec![], } } + + pub fn with_nested_paths(self, nested_paths: Vec) -> Self { + Self { + nested_paths, + ..self + } + } + + pub fn root_index(&self) -> usize { + self.root_index + } + + pub fn nested_paths(&self) -> &[ParquetNestedPath] { + &self.nested_paths + } } /// Builds parquet leaf-column indices from parquet read columns. @@ -82,7 +133,6 @@ pub fn build_parquet_leaves_indices( leaf_indices.push(leaf_idx); } } - leaf_indices } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b5bb20de53e0..8c63c247344f 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -75,13 +75,11 @@ use crate::sst::parquet::file_range::{ row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; -use crate::sst::parquet::read_columns::{ - ParquetReadColumns, build_parquet_leaves_indices, -}; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, }; +use crate::sst::parquet::read_columns::{ParquetReadColumns, build_parquet_leaves_indices}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; From fd974aff5e55133e1406eee4f4b367f62bfc6114 Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 11:52:09 +0800 Subject: [PATCH 04/12] extract common build schema function for test --- src/mito2/src/sst/parquet/read_columns.rs | 70 ++++++++--------------- 1 file changed, 25 insertions(+), 45 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 2b22a06476c5..0723329be0db 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -145,50 +145,7 @@ mod tests { use super::*; - #[test] - fn test_build_parquet_leaves_indices_reads_whole_root() { - let leaf_a = Arc::new( - Type::primitive_type_builder("a", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let leaf_b = Arc::new( - Type::primitive_type_builder("b", parquet::basic::Type::INT64) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ); - let root_j = Arc::new( - Type::group_type_builder("j") - .with_repetition(Repetition::REQUIRED) - .with_fields(vec![leaf_a, leaf_b]) - .build() - .unwrap(), - ); - let schema = Arc::new( - Type::group_type_builder("schema") - .with_fields(vec![root_j]) - .build() - .unwrap(), - ); - let parquet_schema_desc = SchemaDescriptor::new(schema); - - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![], - }], - }; - - assert_eq!( - vec![0, 1], - build_parquet_leaves_indices(&parquet_schema_desc, &projection) - ); - } - - #[test] - fn test_build_parquet_leaves_indices_filters_nested_paths() { + fn build_test_nested_parquet_schema() -> SchemaDescriptor { let leaf_a = Arc::new( Type::primitive_type_builder("a", parquet::basic::Type::INT64) .with_repetition(Repetition::REQUIRED) @@ -233,7 +190,30 @@ mod tests { .build() .unwrap(), ); - let parquet_schema_desc = SchemaDescriptor::new(schema); + + SchemaDescriptor::new(schema) + } + + #[test] + fn test_build_parquet_leaves_indices_reads_whole_root() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![], + }], + }; + + assert_eq!( + vec![0, 1, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_build_parquet_leaves_indices_filters_nested_paths() { + let parquet_schema_desc = build_test_nested_parquet_schema(); let projection = ParquetReadColumns { cols: vec![ From 956a6710d19ad628711a357177bd1f0a6b7faf6b Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 12:14:55 +0800 Subject: [PATCH 05/12] remove unsed method --- src/mito2/src/sst/parquet/read_columns.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 0723329be0db..18efc4ca110c 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -25,24 +25,7 @@ pub struct ParquetReadColumns { cols: Vec, } -impl Default for ParquetReadColumns { - fn default() -> Self { - Self::new() - } -} - impl ParquetReadColumns { - pub fn new() -> ParquetReadColumns { - ParquetReadColumns { cols: vec![] } - } - - /// # Safety - /// - /// The caller must ensure `read_col.root_index()` is unique within `self`. - pub unsafe fn push_cols(&mut self, read_col: ParquetReadColumn) { - self.cols.push(read_col); - } - pub fn from_root_indices(root_indices: impl IntoIterator) -> Self { let cols = root_indices .into_iter() From 1dd2e19a8289714225b0e6ac764edb9d39ef653d Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 1 Apr 2026 18:43:05 +0800 Subject: [PATCH 06/12] keep only deduped parquet root projection constructor --- src/mito2/src/sst/parquet/read_columns.rs | 6 +++++- src/mito2/src/sst/parquet/reader.rs | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 18efc4ca110c..b054c562cb52 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -26,7 +26,11 @@ pub struct ParquetReadColumns { } impl ParquetReadColumns { - pub fn from_root_indices(root_indices: impl IntoIterator) -> Self { + /// Builds root-column projections from root indices that are already + /// deduplicated. + /// + /// Note: this constructor does not check for duplicates. + pub fn from_deduped_root_indices(root_indices: impl IntoIterator) -> Self { let cols = root_indices .into_iter() .map(ParquetReadColumn::new) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 8c63c247344f..92f187e4d02b 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -421,8 +421,9 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let parquet_projection = - ParquetReadColumns::from_root_indices(read_format.projection_indices().iter().copied()); + let parquet_projection = ParquetReadColumns::from_deduped_root_indices( + read_format.projection_indices().iter().copied(), + ); let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); let projection_mask = ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()); From 224d8f18024d5507c8f505ad77edcdad95c2dcc8 Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 2 Apr 2026 10:51:47 +0800 Subject: [PATCH 07/12] add more unit tests --- src/mito2/src/sst/parquet/read_columns.rs | 142 ++++++++++++++++------ 1 file changed, 102 insertions(+), 40 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index b054c562cb52..79ad771d0b83 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -132,6 +132,108 @@ mod tests { use super::*; + #[test] + fn test_reads_whole_root() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![], + }], + }; + + assert_eq!( + vec![0, 1, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_filters_nested_paths() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ + ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }, + ParquetReadColumn { + root_index: 1, + nested_paths: vec![], + }, + ], + }; + + assert_eq!( + vec![1, 2, 3], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_reads_middle_level_path() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }], + }; + + assert_eq!( + vec![1, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_reads_leaf_level_path() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]], + }], + }; + + assert_eq!( + vec![1], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + #[test] + fn test_merges_mixed_paths() { + let parquet_schema_desc = build_test_nested_parquet_schema(); + + let projection = ParquetReadColumns { + cols: vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![ + vec!["j".to_string(), "a".to_string()], + vec!["j".to_string(), "b".to_string(), "d".to_string()], + ], + }], + }; + + assert_eq!( + vec![0, 2], + build_parquet_leaves_indices(&parquet_schema_desc, &projection) + ); + } + + // Test schema: + // schema + // |- j + // | |- a: INT64 + // | `- b + // | |- c: INT64 + // | `- d: INT64 + // `- k: INT64 fn build_test_nested_parquet_schema() -> SchemaDescriptor { let leaf_a = Arc::new( Type::primitive_type_builder("a", parquet::basic::Type::INT64) @@ -180,44 +282,4 @@ mod tests { SchemaDescriptor::new(schema) } - - #[test] - fn test_build_parquet_leaves_indices_reads_whole_root() { - let parquet_schema_desc = build_test_nested_parquet_schema(); - - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![], - }], - }; - - assert_eq!( - vec![0, 1, 2], - build_parquet_leaves_indices(&parquet_schema_desc, &projection) - ); - } - - #[test] - fn test_build_parquet_leaves_indices_filters_nested_paths() { - let parquet_schema_desc = build_test_nested_parquet_schema(); - - let projection = ParquetReadColumns { - cols: vec![ - ParquetReadColumn { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "b".to_string()]], - }, - ParquetReadColumn { - root_index: 1, - nested_paths: vec![], - }, - ], - }; - - assert_eq!( - vec![1, 2, 3], - build_parquet_leaves_indices(&parquet_schema_desc, &projection) - ); - } } From f9346a62ee6e4a504e19bcd1c942430d90f5591b Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 2 Apr 2026 10:54:03 +0800 Subject: [PATCH 08/12] fix: typo --- src/catalog/src/process_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs index 1eebfec627fe..8796d948e1e1 100644 --- a/src/catalog/src/process_manager.rs +++ b/src/catalog/src/process_manager.rs @@ -395,7 +395,7 @@ impl SlowQueryTimer { impl Drop for SlowQueryTimer { fn drop(&mut self) { - // Calculate the elaspsed duration since the timer is created. + // Calculate the elapsed duration since the timer is created. let elapsed = self.start.elapsed(); if elapsed > self.threshold { // Only capture a portion of slow queries based on sample_ratio. From 88902fa6dee155912d2cc406c99963e6ef98bb42 Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 2 Apr 2026 14:45:49 +0800 Subject: [PATCH 09/12] fix: cr --- src/mito2/src/sst/parquet/read_columns.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 79ad771d0b83..9764ef7ebf8d 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -96,7 +96,7 @@ pub fn build_parquet_leaves_indices( parquet_schema_desc: &SchemaDescriptor, projection: &ParquetReadColumns, ) -> Vec { - let mut map = HashMap::new(); + let mut map = HashMap::with_capacity(projection.cols.len()); for col in &projection.cols { map.insert(col.root_index, &col.nested_paths); } From 551ddecbbfbffe6046a688ed9bc0115cd7f936d9 Mon Sep 17 00:00:00 2001 From: fys Date: Tue, 7 Apr 2026 17:43:36 +0800 Subject: [PATCH 10/12] fast-path parquet root projection without nested fields --- src/mito2/src/sst/parquet/read_columns.rs | 19 ++++++++++++++++++- src/mito2/src/sst/parquet/reader.rs | 11 ++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 9764ef7ebf8d..65fdd63fd770 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -23,6 +23,7 @@ pub type ParquetNestedPath = Vec; #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParquetReadColumns { cols: Vec, + has_nested: bool, } impl ParquetReadColumns { @@ -35,12 +36,23 @@ impl ParquetReadColumns { .into_iter() .map(ParquetReadColumn::new) .collect(); - Self { cols } + Self { + cols, + has_nested: false, + } } pub fn columns(&self) -> &[ParquetReadColumn] { &self.cols } + + pub fn has_nested(&self) -> bool { + self.has_nested + } + + pub fn root_indices_iter(&self) -> impl Iterator + '_ { + self.cols.iter().map(|col| col.root_index) + } } /// Read requirement for a single parquet root column. @@ -141,6 +153,7 @@ mod tests { root_index: 0, nested_paths: vec![], }], + has_nested: false, }; assert_eq!( @@ -164,6 +177,7 @@ mod tests { nested_paths: vec![], }, ], + has_nested: true, }; assert_eq!( @@ -181,6 +195,7 @@ mod tests { root_index: 0, nested_paths: vec![vec!["j".to_string(), "b".to_string()]], }], + has_nested: true, }; assert_eq!( @@ -198,6 +213,7 @@ mod tests { root_index: 0, nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]], }], + has_nested: true, }; assert_eq!( @@ -218,6 +234,7 @@ mod tests { vec!["j".to_string(), "b".to_string(), "d".to_string()], ], }], + has_nested: true, }; assert_eq!( diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 92f187e4d02b..ebecef1a8297 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -424,9 +424,14 @@ impl ParquetReaderBuilder { let parquet_projection = ParquetReadColumns::from_deduped_root_indices( read_format.projection_indices().iter().copied(), ); - let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); - let projection_mask = - ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()); + + let projection_mask = if parquet_projection.has_nested() { + let leaf_indices = + build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); + ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()) + } else { + ProjectionMask::roots(parquet_schema_desc, parquet_projection.root_indices_iter()) + }; let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; From 0b3a11d07f34c886a5bc53b63c9affe4ac3ac5eb Mon Sep 17 00:00:00 2001 From: fys Date: Fri, 10 Apr 2026 17:37:52 +0800 Subject: [PATCH 11/12] extract a build_projection_mask method --- src/mito2/src/sst/parquet/read_columns.rs | 16 +++++++++++++++- src/mito2/src/sst/parquet/reader.rs | 12 +++--------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 65fdd63fd770..5405dbcc4b8d 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use parquet::arrow::ProjectionMask; use parquet::schema::types::SchemaDescriptor; /// A nested field access path inside one parquet root column. @@ -103,8 +104,21 @@ impl ParquetReadColumn { } } +/// Builds a projection mask from parquet read columns. +pub fn build_projection_mask( + parquet_read_cols: &ParquetReadColumns, + parquet_schema_desc: &SchemaDescriptor, +) -> ProjectionMask { + if parquet_read_cols.has_nested() { + let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_read_cols); + ProjectionMask::leaves(parquet_schema_desc, leaf_indices.into_iter()) + } else { + ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter()) + } +} + /// Builds parquet leaf-column indices from parquet read columns. -pub fn build_parquet_leaves_indices( +fn build_parquet_leaves_indices( parquet_schema_desc: &SchemaDescriptor, projection: &ParquetReadColumns, ) -> Vec { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ebecef1a8297..b359d01ea4a8 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -79,7 +79,7 @@ use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, }; -use crate::sst::parquet::read_columns::{ParquetReadColumns, build_parquet_leaves_indices}; +use crate::sst::parquet::read_columns::{ParquetReadColumns, build_projection_mask}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -421,17 +421,11 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let parquet_projection = ParquetReadColumns::from_deduped_root_indices( + let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices( read_format.projection_indices().iter().copied(), ); - let projection_mask = if parquet_projection.has_nested() { - let leaf_indices = - build_parquet_leaves_indices(parquet_schema_desc, &parquet_projection); - ProjectionMask::leaves(parquet_schema_desc, leaf_indices.iter().copied()) - } else { - ProjectionMask::roots(parquet_schema_desc, parquet_projection.root_indices_iter()) - }; + let projection_mask = build_projection_mask(&parquet_read_cols, &parquet_schema_desc); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; From c735f7cbed2894672211dd4834f0fed020d30efc Mon Sep 17 00:00:00 2001 From: fys Date: Fri, 10 Apr 2026 18:01:35 +0800 Subject: [PATCH 12/12] fix: cargo clippy --- src/mito2/src/sst/parquet/read_columns.rs | 4 ++-- src/mito2/src/sst/parquet/reader.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index 5405dbcc4b8d..f0f35a4099c4 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -110,8 +110,8 @@ pub fn build_projection_mask( parquet_schema_desc: &SchemaDescriptor, ) -> ProjectionMask { if parquet_read_cols.has_nested() { - let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, &parquet_read_cols); - ProjectionMask::leaves(parquet_schema_desc, leaf_indices.into_iter()) + let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols); + ProjectionMask::leaves(parquet_schema_desc, leaf_indices) } else { ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter()) } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b359d01ea4a8..ca0ef3feac2f 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -425,7 +425,7 @@ impl ParquetReaderBuilder { read_format.projection_indices().iter().copied(), ); - let projection_mask = build_projection_mask(&parquet_read_cols, &parquet_schema_desc); + let projection_mask = build_projection_mask(&parquet_read_cols, parquet_schema_desc); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await;