Skip to content

Commit 6a6c631

Browse files
authored
Handle compressed empty DataPage v2 (#7389)
* Work with empty datapage v2 (only null values)
* Remove obsolete TODO
* Check datapage header values for plausibility
* Replace `as usize` with `usize::try_from(…)?` where useful
* Assert reading page_v2_empty_compressed.parquet
* Make get_next_page in test fail on errors
* Split long error message
* Move parquet-testing to latest master
1 parent 5760049 commit 6a6c631

File tree

2 files changed

+230
-23
lines changed

2 files changed

+230
-23
lines changed

parquet/src/file/serialized_reader.rs

Lines changed: 229 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
322322
Ok(Box::new(SerializedPageReader::new_with_properties(
323323
Arc::clone(&self.chunk_reader),
324324
col,
325-
self.metadata.num_rows() as usize,
325+
usize::try_from(self.metadata.num_rows())?,
326326
page_locations,
327327
props,
328328
)?))
@@ -442,8 +442,24 @@ pub(crate) fn decode_page(
442442
let mut can_decompress = true;
443443

444444
if let Some(ref header_v2) = page_header.data_page_header_v2 {
445-
offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
446-
as usize;
445+
if header_v2.definition_levels_byte_length < 0
446+
|| header_v2.repetition_levels_byte_length < 0
447+
|| header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
448+
> page_header.uncompressed_page_size
449+
{
450+
return Err(general_err!(
451+
"DataPage v2 header contains implausible values \
452+
for definition_levels_byte_length ({}) \
453+
and repetition_levels_byte_length ({}) \
454+
given DataPage header provides uncompressed_page_size ({})",
455+
header_v2.definition_levels_byte_length,
456+
header_v2.repetition_levels_byte_length,
457+
page_header.uncompressed_page_size
458+
));
459+
}
460+
offset = usize::try_from(
461+
header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
462+
)?;
447463
// When is_compressed flag is missing the page is considered compressed
448464
can_decompress = header_v2.is_compressed.unwrap_or(true);
449465
}
@@ -452,21 +468,20 @@ pub(crate) fn decode_page(
452468
// maximum page header size and abort if that is exceeded.
453469
let buffer = match decompressor {
454470
Some(decompressor) if can_decompress => {
455-
let uncompressed_size = page_header.uncompressed_page_size as usize;
456-
let mut decompressed = Vec::with_capacity(uncompressed_size);
457-
let compressed = &buffer.as_ref()[offset..];
471+
let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
472+
let decompressed_size = uncompressed_page_size - offset;
473+
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
458474
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
459-
decompressor.decompress(
460-
compressed,
461-
&mut decompressed,
462-
Some(uncompressed_size - offset),
463-
)?;
475+
if decompressed_size > 0 {
476+
let compressed = &buffer.as_ref()[offset..];
477+
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
478+
}
464479

465-
if decompressed.len() != uncompressed_size {
480+
if decompressed.len() != uncompressed_page_size {
466481
return Err(general_err!(
467482
"Actual decompressed size doesn't match the expected one ({} vs {})",
468483
decompressed.len(),
469-
uncompressed_size
484+
uncompressed_page_size
470485
));
471486
}
472487

@@ -652,8 +667,8 @@ impl<R: ChunkReader> SerializedPageReader<R> {
652667
}
653668
}
654669
None => SerializedPageReaderState::Values {
655-
offset: start as usize,
656-
remaining_bytes: len as usize,
670+
offset: usize::try_from(start)?,
671+
remaining_bytes: usize::try_from(len)?,
657672
next_page_header: None,
658673
page_ordinal: 0,
659674
require_dictionary: meta.dictionary_page_offset().is_some(),
@@ -717,9 +732,9 @@ impl<R: ChunkReader> SerializedPageReader<R> {
717732
..
718733
} => {
719734
if let Some(page) = dictionary_page {
720-
Ok(Some(page.offset as usize))
735+
Ok(Some(usize::try_from(page.offset)?))
721736
} else if let Some(page) = page_locations.front() {
722-
Ok(Some(page.offset as usize))
737+
Ok(Some(usize::try_from(page.offset)?))
723738
} else {
724739
Ok(None)
725740
}
@@ -861,7 +876,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
861876
None => return Ok(None),
862877
};
863878

864-
let page_len = front.compressed_page_size as usize;
879+
let page_len = usize::try_from(front.compressed_page_size)?;
865880

866881
let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
867882

@@ -1031,7 +1046,7 @@ mod tests {
10311046
use crate::file::properties::{EnabledStatistics, WriterProperties};
10321047
use crate::format::BoundaryOrder;
10331048

1034-
use crate::basic::{self, ColumnOrder};
1049+
use crate::basic::{self, ColumnOrder, SortOrder};
10351050
use crate::column::reader::ColumnReader;
10361051
use crate::data_type::private::ParquetValueType;
10371052
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
@@ -1193,7 +1208,7 @@ mod tests {
11931208
assert!(page_reader_0_result.is_ok());
11941209
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
11951210
let mut page_count = 0;
1196-
while let Ok(Some(page)) = page_reader_0.get_next_page() {
1211+
while let Some(page) = page_reader_0.get_next_page().unwrap() {
11971212
let is_expected_page = match page {
11981213
Page::DictionaryPage {
11991214
buf,
@@ -1287,7 +1302,7 @@ mod tests {
12871302
assert!(page_reader_0_result.is_ok());
12881303
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
12891304
let mut page_count = 0;
1290-
while let Ok(Some(page)) = page_reader_0.get_next_page() {
1305+
while let Some(page) = page_reader_0.get_next_page().unwrap() {
12911306
let is_expected_page = match page {
12921307
Page::DictionaryPage {
12931308
buf,
@@ -1331,6 +1346,198 @@ mod tests {
13311346
assert_eq!(page_count, 2);
13321347
}
13331348

1349+
#[test]
1350+
fn test_file_reader_empty_compressed_datapage_v2() {
1351+
// this file has a compressed datapage that un-compresses to 0 bytes
1352+
let test_file = get_test_file("page_v2_empty_compressed.parquet");
1353+
let reader_result = SerializedFileReader::new(test_file);
1354+
assert!(reader_result.is_ok());
1355+
let reader = reader_result.unwrap();
1356+
1357+
// Test contents in Parquet metadata
1358+
let metadata = reader.metadata();
1359+
assert_eq!(metadata.num_row_groups(), 1);
1360+
1361+
// Test contents in file metadata
1362+
let file_metadata = metadata.file_metadata();
1363+
assert!(file_metadata.created_by().is_some());
1364+
assert_eq!(
1365+
file_metadata.created_by().unwrap(),
1366+
"parquet-cpp-arrow version 14.0.2"
1367+
);
1368+
assert!(file_metadata.key_value_metadata().is_some());
1369+
assert_eq!(
1370+
file_metadata.key_value_metadata().to_owned().unwrap().len(),
1371+
1
1372+
);
1373+
1374+
assert_eq!(file_metadata.num_rows(), 10);
1375+
assert_eq!(file_metadata.version(), 2);
1376+
let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1377+
assert_eq!(
1378+
file_metadata.column_orders(),
1379+
Some(vec![expected_order].as_ref())
1380+
);
1381+
1382+
let row_group_metadata = metadata.row_group(0);
1383+
1384+
// Check each column order
1385+
for i in 0..row_group_metadata.num_columns() {
1386+
assert_eq!(file_metadata.column_order(i), expected_order);
1387+
}
1388+
1389+
// Test row group reader
1390+
let row_group_reader_result = reader.get_row_group(0);
1391+
assert!(row_group_reader_result.is_ok());
1392+
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1393+
assert_eq!(
1394+
row_group_reader.num_columns(),
1395+
row_group_metadata.num_columns()
1396+
);
1397+
assert_eq!(
1398+
row_group_reader.metadata().total_byte_size(),
1399+
row_group_metadata.total_byte_size()
1400+
);
1401+
1402+
// Test page readers
1403+
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1404+
assert!(page_reader_0_result.is_ok());
1405+
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1406+
let mut page_count = 0;
1407+
while let Some(page) = page_reader_0.get_next_page().unwrap() {
1408+
let is_expected_page = match page {
1409+
Page::DictionaryPage {
1410+
buf,
1411+
num_values,
1412+
encoding,
1413+
is_sorted,
1414+
} => {
1415+
assert_eq!(buf.len(), 0);
1416+
assert_eq!(num_values, 0);
1417+
assert_eq!(encoding, Encoding::PLAIN);
1418+
assert!(!is_sorted);
1419+
true
1420+
}
1421+
Page::DataPageV2 {
1422+
buf,
1423+
num_values,
1424+
encoding,
1425+
num_nulls,
1426+
num_rows,
1427+
def_levels_byte_len,
1428+
rep_levels_byte_len,
1429+
is_compressed,
1430+
statistics,
1431+
} => {
1432+
assert_eq!(buf.len(), 3);
1433+
assert_eq!(num_values, 10);
1434+
assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1435+
assert_eq!(num_nulls, 10);
1436+
assert_eq!(num_rows, 10);
1437+
assert_eq!(def_levels_byte_len, 2);
1438+
assert_eq!(rep_levels_byte_len, 0);
1439+
assert!(is_compressed);
1440+
assert!(statistics.is_some());
1441+
true
1442+
}
1443+
_ => false,
1444+
};
1445+
assert!(is_expected_page);
1446+
page_count += 1;
1447+
}
1448+
assert_eq!(page_count, 2);
1449+
}
1450+
1451+
#[test]
1452+
fn test_file_reader_empty_datapage_v2() {
1453+
// this file has 0 bytes compressed datapage that un-compresses to 0 bytes
1454+
let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1455+
let reader_result = SerializedFileReader::new(test_file);
1456+
assert!(reader_result.is_ok());
1457+
let reader = reader_result.unwrap();
1458+
1459+
// Test contents in Parquet metadata
1460+
let metadata = reader.metadata();
1461+
assert_eq!(metadata.num_row_groups(), 1);
1462+
1463+
// Test contents in file metadata
1464+
let file_metadata = metadata.file_metadata();
1465+
assert!(file_metadata.created_by().is_some());
1466+
assert_eq!(
1467+
file_metadata.created_by().unwrap(),
1468+
"parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1469+
);
1470+
assert!(file_metadata.key_value_metadata().is_some());
1471+
assert_eq!(
1472+
file_metadata.key_value_metadata().to_owned().unwrap().len(),
1473+
2
1474+
);
1475+
1476+
assert_eq!(file_metadata.num_rows(), 1);
1477+
assert_eq!(file_metadata.version(), 1);
1478+
let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1479+
assert_eq!(
1480+
file_metadata.column_orders(),
1481+
Some(vec![expected_order].as_ref())
1482+
);
1483+
1484+
let row_group_metadata = metadata.row_group(0);
1485+
1486+
// Check each column order
1487+
for i in 0..row_group_metadata.num_columns() {
1488+
assert_eq!(file_metadata.column_order(i), expected_order);
1489+
}
1490+
1491+
// Test row group reader
1492+
let row_group_reader_result = reader.get_row_group(0);
1493+
assert!(row_group_reader_result.is_ok());
1494+
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1495+
assert_eq!(
1496+
row_group_reader.num_columns(),
1497+
row_group_metadata.num_columns()
1498+
);
1499+
assert_eq!(
1500+
row_group_reader.metadata().total_byte_size(),
1501+
row_group_metadata.total_byte_size()
1502+
);
1503+
1504+
// Test page readers
1505+
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1506+
assert!(page_reader_0_result.is_ok());
1507+
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1508+
let mut page_count = 0;
1509+
while let Some(page) = page_reader_0.get_next_page().unwrap() {
1510+
let is_expected_page = match page {
1511+
Page::DataPageV2 {
1512+
buf,
1513+
num_values,
1514+
encoding,
1515+
num_nulls,
1516+
num_rows,
1517+
def_levels_byte_len,
1518+
rep_levels_byte_len,
1519+
is_compressed,
1520+
statistics,
1521+
} => {
1522+
assert_eq!(buf.len(), 2);
1523+
assert_eq!(num_values, 1);
1524+
assert_eq!(encoding, Encoding::PLAIN);
1525+
assert_eq!(num_nulls, 1);
1526+
assert_eq!(num_rows, 1);
1527+
assert_eq!(def_levels_byte_len, 2);
1528+
assert_eq!(rep_levels_byte_len, 0);
1529+
assert!(is_compressed);
1530+
assert!(statistics.is_none());
1531+
true
1532+
}
1533+
_ => false,
1534+
};
1535+
assert!(is_expected_page);
1536+
page_count += 1;
1537+
}
1538+
assert_eq!(page_count, 1);
1539+
}
1540+
13341541
fn get_serialized_page_reader<R: ChunkReader>(
13351542
file_reader: &SerializedFileReader<R>,
13361543
row_group: usize,
@@ -1361,7 +1568,7 @@ mod tests {
13611568
SerializedPageReader::new_with_properties(
13621569
Arc::clone(&row_group.chunk_reader),
13631570
col,
1364-
row_group.metadata.num_rows() as usize,
1571+
usize::try_from(row_group.metadata.num_rows())?,
13651572
page_locations,
13661573
props,
13671574
)

0 commit comments

Comments (0)