Commit 88fb923

XiangpengHao and alamb authored
Add peek_next_page_offset to SerializedPageReader (#6945)
* add peek_next_page_offset

* Update parquet/src/file/serialized_reader.rs

Co-authored-by: Andrew Lamb <[email protected]>
1 parent: d0260fc

File tree: 1 file changed (+142, -0 lines)

parquet/src/file/serialized_reader.rs

Lines changed: 142 additions & 0 deletions
@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
             physical_type: meta.column_type(),
         })
     }
+
+    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page
+    /// metadata. Unlike page metadata, an offset uniquely identifies a page.
+    ///
+    /// This is used when reading Parquet with a row filter, where we want to avoid decompressing
+    /// the same page twice. It lets us check whether the next page has already been cached or read.
+    #[cfg(test)]
+    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
+        match &mut self.state {
+            SerializedPageReaderState::Values {
+                offset,
+                remaining_bytes,
+                next_page_header,
+            } => {
+                loop {
+                    if *remaining_bytes == 0 {
+                        return Ok(None);
+                    }
+                    return if let Some(header) = next_page_header.as_ref() {
+                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For an unknown page type (e.g., INDEX_PAGE), skip and read the next header.
+                            *next_page_header = None;
+                            continue;
+                        }
+                    } else {
+                        let mut read = self.reader.get_read(*offset as u64)?;
+                        let (header_len, header) = read_page_header_len(&mut read)?;
+                        *offset += header_len;
+                        *remaining_bytes -= header_len;
+                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For an unknown page type (e.g., INDEX_PAGE), skip and read the next header.
+                            continue;
+                        };
+                        *next_page_header = Some(Box::new(header));
+                        page_meta
+                    };
+                }
+            }
+            SerializedPageReaderState::Pages {
+                page_locations,
+                dictionary_page,
+                ..
+            } => {
+                if let Some(page) = dictionary_page {
+                    Ok(Some(page.offset as usize))
+                } else if let Some(page) = page_locations.front() {
+                    Ok(Some(page.offset as usize))
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
 }
 
 impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
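
The doc comment above is the motivation: when a row filter is evaluated, the same column chunk may be scanned once for the predicate and again to materialize the surviving rows, and a cache of decompressed pages keyed by file offset lets the second pass skip decompression. Below is a minimal sketch of that pattern, with a hypothetical PageSource trait and next_compressed_page/decompress helpers standing in for the real reader internals (it also drops the Result wrapping that the actual method carries):

    use std::collections::HashMap;
    use std::sync::Arc;

    // Hypothetical stand-in for SerializedPageReader: peeking must not advance
    // the reader, while next_compressed_page consumes one page.
    trait PageSource {
        fn peek_next_page_offset(&mut self) -> Option<usize>;
        fn next_compressed_page(&mut self) -> Option<Vec<u8>>;
    }

    // Placeholder for the (expensive) decompression step.
    fn decompress(bytes: &[u8]) -> Vec<u8> {
        bytes.to_vec()
    }

    // Return the next decompressed page, reusing the cached copy when this
    // offset was already decompressed on an earlier pass over the column chunk.
    fn next_page_cached(
        reader: &mut impl PageSource,
        cache: &mut HashMap<usize, Arc<Vec<u8>>>,
    ) -> Option<Arc<Vec<u8>>> {
        let offset = reader.peek_next_page_offset()?;
        let compressed = reader.next_compressed_page()?; // always advance past the page
        let page = cache
            .entry(offset)
            .or_insert_with(|| Arc::new(decompress(&compressed)));
        Some(Arc::clone(page))
    }

Keying by offset rather than by page metadata works because two pages can carry identical metadata, but no two pages share a file offset.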
@@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+
     use bytes::Buf;
 
     use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1107,6 +1166,89 @@ mod tests {
         assert_eq!(page_count, 2);
     }
 
+    fn get_serialized_page_reader<R: ChunkReader>(
+        file_reader: &SerializedFileReader<R>,
+        row_group: usize,
+        column: usize,
+    ) -> Result<SerializedPageReader<R>> {
+        let row_group = {
+            let row_group_metadata = file_reader.metadata.row_group(row_group);
+            let props = Arc::clone(&file_reader.props);
+            let f = Arc::clone(&file_reader.chunk_reader);
+            SerializedRowGroupReader::new(
+                f,
+                row_group_metadata,
+                file_reader
+                    .metadata
+                    .offset_index()
+                    .map(|x| x[row_group].as_slice()),
+                props,
+            )?
+        };
+
+        let col = row_group.metadata.column(column);
+
+        let page_locations = row_group
+            .offset_index
+            .map(|x| x[column].page_locations.clone());
+
+        let props = Arc::clone(&row_group.props);
+        SerializedPageReader::new_with_properties(
+            Arc::clone(&row_group.chunk_reader),
+            col,
+            row_group.metadata.num_rows() as usize,
+            page_locations,
+            props,
+        )
+    }
+
+    #[test]
+    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let reader = SerializedFileReader::new(test_file)?;
+
+        let mut offset_set = HashSet::new();
+        let num_row_groups = reader.metadata.num_row_groups();
+        for row_group in 0..num_row_groups {
+            let num_columns = reader.metadata.row_group(row_group).num_columns();
+            for column in 0..num_columns {
+                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
+
+                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
+                    match &page_reader.state {
+                        SerializedPageReaderState::Pages {
+                            page_locations,
+                            dictionary_page,
+                            ..
+                        } => {
+                            if let Some(page) = dictionary_page {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else if let Some(page) = page_locations.front() {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else {
+                                unreachable!()
+                            }
+                        }
+                        SerializedPageReaderState::Values {
+                            offset,
+                            next_page_header,
+                            ..
+                        } => {
+                            assert!(next_page_header.is_some());
+                            assert_eq!(*offset, page_offset);
+                        }
+                    }
+                    let page = page_reader.get_next_page()?;
+                    assert!(page.is_some());
+                    let newly_inserted = offset_set.insert(page_offset);
+                    assert!(newly_inserted);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_page_iterator() {
         let file = get_test_file("alltypes_plain.parquet");
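
The test walks every column chunk of alltypes_plain.parquet, checks the peeked offset against the reader's internal state for both the Pages and Values representations, and uses the HashSet insert to assert that no two pages report the same offset, which is the uniqueness property the doc comment claims. It also relies on peeking being non-destructive; a small companion check of that contract, written as a hypothetical helper that would live in the same tests module (not part of this commit):

    // Peeking twice in a row must return the same offset; only get_next_page
    // consumes the page. In the Values state the first peek reads and caches
    // the page header, so a second peek sees the cached header and offset.
    fn check_peek_does_not_advance<R: ChunkReader>(
        page_reader: &mut SerializedPageReader<R>,
    ) -> Result<()> {
        let first = page_reader.peek_next_page_offset()?;
        let second = page_reader.peek_next_page_offset()?;
        assert_eq!(first, second);
        if first.is_some() {
            // A successful peek guarantees a page is actually available.
            assert!(page_reader.get_next_page()?.is_some());
        }
        Ok(())
    }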
