Skip to content

Commit bab30ae

Browse files
authored
Add ability to skip or transform page encoding statistics in Parquet metadata (#8797)
1 parent b93fa52 commit bab30ae

File tree

9 files changed

+467
-25
lines changed

9 files changed

+467
-25
lines changed

parquet/benches/metadata.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
2222
use parquet::file::metadata::{
2323
ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
24-
ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
24+
ParquetMetaDataReader, ParquetMetaDataWriter, ParquetStatisticsPolicy, RowGroupMetaData,
2525
};
2626
use parquet::file::statistics::Statistics;
2727
use parquet::file::writer::TrackedWrite;
@@ -173,6 +173,23 @@ fn criterion_benchmark(c: &mut Criterion) {
173173
})
174174
});
175175

176+
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
177+
c.bench_function("decode metadata with stats mask", |b| {
178+
b.iter(|| {
179+
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
180+
.unwrap();
181+
})
182+
});
183+
184+
let options =
185+
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
186+
c.bench_function("decode metadata with skip PES", |b| {
187+
b.iter(|| {
188+
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
189+
.unwrap();
190+
})
191+
});
192+
176193
let buf: Bytes = black_box(encoded_meta()).into();
177194
c.bench_function("decode parquet metadata (wide)", |b| {
178195
b.iter(|| {
@@ -187,6 +204,21 @@ fn criterion_benchmark(c: &mut Criterion) {
187204
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
188205
})
189206
});
207+
208+
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
209+
c.bench_function("decode metadata (wide) with stats mask", |b| {
210+
b.iter(|| {
211+
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
212+
})
213+
});
214+
215+
let options =
216+
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
217+
c.bench_function("decode metadata (wide) with skip PES", |b| {
218+
b.iter(|| {
219+
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
220+
})
221+
});
190222
}
191223

192224
criterion_group!(benches, criterion_benchmark);

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::encryption::decrypt::FileDecryptionProperties;
4242
use crate::errors::{ParquetError, Result};
4343
use crate::file::metadata::{
4444
PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
45-
RowGroupMetaData,
45+
ParquetStatisticsPolicy, RowGroupMetaData,
4646
};
4747
use crate::file::reader::{ChunkReader, SerializedPageReader};
4848
use crate::schema::types::SchemaDescriptor;
@@ -557,6 +557,30 @@ impl ArrowReaderOptions {
557557
self
558558
}
559559

560+
/// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
561+
/// (defaults to `false`).
562+
///
563+
/// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
564+
/// might be desirable.
565+
///
566+
/// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
567+
/// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
568+
/// [`encoding_stats`]:
569+
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
570+
pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
571+
self.metadata_options.set_encoding_stats_as_mask(val);
572+
self
573+
}
574+
575+
/// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
576+
///
577+
/// [`encoding_stats`]:
578+
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
579+
pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
580+
self.metadata_options.set_encoding_stats_policy(policy);
581+
self
582+
}
583+
560584
/// Provide the file decryption properties to use when reading encrypted parquet files.
561585
///
562586
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
@@ -1420,7 +1444,7 @@ pub(crate) mod tests {
14201444
FloatType, Int32Type, Int64Type, Int96, Int96Type,
14211445
};
14221446
use crate::errors::Result;
1423-
use crate::file::metadata::ParquetMetaData;
1447+
use crate::file::metadata::{ParquetMetaData, ParquetStatisticsPolicy};
14241448
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
14251449
use crate::file::writer::SerializedFileWriter;
14261450
use crate::schema::parser::parse_message_type;
@@ -1474,6 +1498,69 @@ pub(crate) mod tests {
14741498
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
14751499
}
14761500

1501+
#[test]
1502+
fn test_page_encoding_stats_mask() {
1503+
let testdata = arrow::util::test_util::parquet_test_data();
1504+
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1505+
let file = File::open(path).unwrap();
1506+
1507+
let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
1508+
let builder =
1509+
ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1510+
1511+
let row_group_metadata = builder.metadata.row_group(0);
1512+
1513+
// test page encoding stats
1514+
let page_encoding_stats = row_group_metadata
1515+
.column(0)
1516+
.page_encoding_stats_mask()
1517+
.unwrap();
1518+
assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1519+
let page_encoding_stats = row_group_metadata
1520+
.column(2)
1521+
.page_encoding_stats_mask()
1522+
.unwrap();
1523+
assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1524+
}
1525+
1526+
#[test]
1527+
fn test_page_encoding_stats_skipped() {
1528+
let testdata = arrow::util::test_util::parquet_test_data();
1529+
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1530+
let file = File::open(path).unwrap();
1531+
1532+
// test skipping all
1533+
let arrow_options =
1534+
ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
1535+
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1536+
file.try_clone().unwrap(),
1537+
arrow_options,
1538+
)
1539+
.unwrap();
1540+
1541+
let row_group_metadata = builder.metadata.row_group(0);
1542+
for column in row_group_metadata.columns() {
1543+
assert!(column.page_encoding_stats().is_none());
1544+
assert!(column.page_encoding_stats_mask().is_none());
1545+
}
1546+
1547+
// test skipping all but one column and converting to mask
1548+
let arrow_options = ArrowReaderOptions::new()
1549+
.with_encoding_stats_as_mask(true)
1550+
.with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
1551+
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
1552+
file.try_clone().unwrap(),
1553+
arrow_options,
1554+
)
1555+
.unwrap();
1556+
1557+
let row_group_metadata = builder.metadata.row_group(0);
1558+
for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1559+
assert!(column.page_encoding_stats().is_none());
1560+
assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1561+
}
1562+
}
1563+
14771564
#[test]
14781565
fn test_arrow_reader_single_column() {
14791566
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

parquet/src/basic.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,11 @@ impl EncodingMask {
737737
self.0 & (1 << (val as i32)) != 0
738738
}
739739

740+
/// Test if this mask has only the bit for the given [`Encoding`] set.
741+
pub fn is_only(&self, val: Encoding) -> bool {
742+
self.0 == (1 << (val as i32))
743+
}
744+
740745
/// Test if all [`Encoding`]s in a given set are present in this mask.
741746
pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
742747
encodings.all(|&e| self.is_set(e))
@@ -2510,4 +2515,14 @@ mod tests {
25102515
"Parquet error: Attempt to create invalid mask: 0x2"
25112516
);
25122517
}
2518+
2519+
#[test]
2520+
fn test_encoding_mask_is_only() {
2521+
let mask = EncodingMask::new_from_encodings([Encoding::PLAIN].iter());
2522+
assert!(mask.is_only(Encoding::PLAIN));
2523+
2524+
let mask =
2525+
EncodingMask::new_from_encodings([Encoding::PLAIN, Encoding::PLAIN_DICTIONARY].iter());
2526+
assert!(!mask.is_only(Encoding::PLAIN));
2527+
}
25132528
}

parquet/src/file/metadata/memory.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding, PageType};
2222
use crate::data_type::private::ParquetValueType;
2323
use crate::file::metadata::{
24-
ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, RowGroupMetaData, SortingColumn,
24+
ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, ParquetPageEncodingStats,
25+
RowGroupMetaData, SortingColumn,
2526
};
2627
use crate::file::page_index::column_index::{
2728
ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
@@ -185,6 +186,15 @@ impl HeapSize for Encoding {
185186
}
186187
}
187188

189+
impl HeapSize for ParquetPageEncodingStats {
190+
fn heap_size(&self) -> usize {
191+
match self {
192+
Self::Full(v) => v.heap_size(),
193+
Self::Mask(_) => 0,
194+
}
195+
}
196+
}
197+
188198
impl HeapSize for PageEncodingStats {
189199
fn heap_size(&self) -> usize {
190200
self.page_type.heap_size() + self.encoding.heap_size()

parquet/src/file/metadata/mod.rs

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ use crate::{
128128
};
129129

130130
pub use footer_tail::FooterTail;
131-
pub use options::ParquetMetaDataOptions;
131+
pub use options::{ParquetMetaDataOptions, ParquetStatisticsPolicy};
132132
pub use push_decoder::ParquetMetaDataPushDecoder;
133133
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
134134
use std::io::Write;
@@ -470,6 +470,16 @@ pub struct PageEncodingStats {
470470
}
471471
);
472472

473+
/// Internal representation of the page encoding stats in the [`ColumnChunkMetaData`].
474+
/// This is not publicly exposed, with different getters defined for each variant.
475+
#[derive(Debug, Clone, PartialEq)]
476+
enum ParquetPageEncodingStats {
477+
/// The full array of stats as defined in the Parquet spec.
478+
Full(Vec<PageEncodingStats>),
479+
/// A condensed version of only page encodings seen.
480+
Mask(EncodingMask),
481+
}
482+
473483
/// Reference counted pointer for [`FileMetaData`].
474484
pub type FileMetaDataPtr = Arc<FileMetaData>;
475485

@@ -812,7 +822,7 @@ pub struct ColumnChunkMetaData {
812822
dictionary_page_offset: Option<i64>,
813823
statistics: Option<Statistics>,
814824
geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
815-
encoding_stats: Option<Vec<PageEncodingStats>>,
825+
encoding_stats: Option<ParquetPageEncodingStats>,
816826
bloom_filter_offset: Option<i64>,
817827
bloom_filter_length: Option<i32>,
818828
offset_index_offset: Option<i64>,
@@ -1050,10 +1060,47 @@ impl ColumnChunkMetaData {
10501060
self.geo_statistics.as_deref()
10511061
}
10521062

1053-
/// Returns the offset for the page encoding stats,
1054-
/// or `None` if no page encoding stats are available.
1063+
/// Returns the page encoding statistics, or `None` if no page encoding statistics
1064+
/// are available (or they were converted to a mask).
10551065
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
1056-
self.encoding_stats.as_ref()
1066+
match self.encoding_stats.as_ref() {
1067+
Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
1068+
_ => None,
1069+
}
1070+
}
1071+
1072+
/// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics are
1073+
/// not available (or they were left in their original form).
1074+
///
1075+
/// The [`PageEncodingStats`] struct was added to the Parquet specification specifically to
1076+
/// enable fast determination of whether all pages in a column chunk are dictionary encoded
1077+
/// (see <https://github.com/apache/parquet-format/pull/16>).
1078+
/// Decoding the full page encoding statistics, however, can be very costly, and is not
1079+
/// necessary to support the aforementioned use case. As an alternative, this crate can
1080+
/// instead distill the list of `PageEncodingStats` down to a bitmask of just the encodings
1081+
/// used for data pages
1082+
/// (see [`ParquetMetaDataOptions::set_encoding_stats_as_mask`]).
1083+
/// To test for an all-dictionary-encoded chunk one could use this bitmask in the following way:
1084+
///
1085+
/// ```rust
1086+
/// use parquet::basic::Encoding;
1087+
/// use parquet::file::metadata::ColumnChunkMetaData;
1088+
/// // test if all data pages in the column chunk are dictionary encoded
1089+
/// fn is_all_dictionary_encoded(col_meta: &ColumnChunkMetaData) -> bool {
1090+
/// // check that dictionary encoding was used
1091+
/// col_meta.dictionary_page_offset().is_some()
1092+
/// && col_meta.page_encoding_stats_mask().is_some_and(|mask| {
1093+
/// // mask should only have one bit set, either for PLAIN_DICTIONARY or
1094+
/// // RLE_DICTIONARY
1095+
/// mask.is_only(Encoding::PLAIN_DICTIONARY) || mask.is_only(Encoding::RLE_DICTIONARY)
1096+
/// })
1097+
/// }
1098+
/// ```
1099+
pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> {
1100+
match self.encoding_stats.as_ref() {
1101+
Some(ParquetPageEncodingStats::Mask(stats)) => Some(stats),
1102+
_ => None,
1103+
}
10571104
}
10581105

10591106
/// Returns the offset for the bloom filter.
@@ -1273,8 +1320,18 @@ impl ColumnChunkMetaDataBuilder {
12731320
}
12741321

12751322
/// Sets page encoding stats for this column chunk.
1323+
///
1324+
/// This will overwrite any existing stats, either `Vec` based or bitmask.
12761325
pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
1277-
self.0.encoding_stats = Some(value);
1326+
self.0.encoding_stats = Some(ParquetPageEncodingStats::Full(value));
1327+
self
1328+
}
1329+
1330+
/// Sets page encoding stats mask for this column chunk.
1331+
///
1332+
/// This will overwrite any existing stats, either `Vec` based or bitmask.
1333+
pub fn set_page_encoding_stats_mask(mut self, value: EncodingMask) -> Self {
1334+
self.0.encoding_stats = Some(ParquetPageEncodingStats::Mask(value));
12781335
self
12791336
}
12801337

0 commit comments

Comments
 (0)