Skip to content

Commit adf4c2f

Browse files
committed
Work with empty datapage v2 (only null values)
1 parent 56e8208 commit adf4c2f

File tree

1 file changed

+112
-11
lines changed

1 file changed

+112
-11
lines changed

parquet/src/file/serialized_reader.rs

Lines changed: 112 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -452,21 +452,20 @@ pub(crate) fn decode_page(
452452
// maximum page header size and abort if that is exceeded.
453453
let buffer = match decompressor {
454454
Some(decompressor) if can_decompress => {
455-
let uncompressed_size = page_header.uncompressed_page_size as usize;
456-
let mut decompressed = Vec::with_capacity(uncompressed_size);
457-
let compressed = &buffer.as_ref()[offset..];
455+
let uncompressed_page_size = page_header.uncompressed_page_size as usize;
456+
let decompressed_size = uncompressed_page_size - offset;
457+
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
458458
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
459-
decompressor.decompress(
460-
compressed,
461-
&mut decompressed,
462-
Some(uncompressed_size - offset),
463-
)?;
459+
if decompressed_size != 0 {
460+
let compressed = &buffer.as_ref()[offset..];
461+
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
462+
}
464463

465-
if decompressed.len() != uncompressed_size {
464+
if decompressed.len() != uncompressed_page_size {
466465
return Err(general_err!(
467466
"Actual decompressed size doesn't match the expected one ({} vs {})",
468467
decompressed.len(),
469-
uncompressed_size
468+
uncompressed_page_size
470469
));
471470
}
472471

@@ -1021,7 +1020,7 @@ mod tests {
10211020
use crate::file::properties::{EnabledStatistics, WriterProperties};
10221021
use crate::format::BoundaryOrder;
10231022

1024-
use crate::basic::{self, ColumnOrder};
1023+
use crate::basic::{self, ColumnOrder, SortOrder};
10251024
use crate::column::reader::ColumnReader;
10261025
use crate::data_type::private::ParquetValueType;
10271026
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
@@ -1321,6 +1320,108 @@ mod tests {
13211320
assert_eq!(page_count, 2);
13221321
}
13231322

1323+
/// Reads `datapage_v2_empty_datapage.snappy.parquet` and checks the file
/// metadata, the row group reader, and the pages of column 0.
///
/// The v2 data page asserted below has a 2-byte buffer and 2 bytes of
/// definition levels (with zero bytes of repetition levels), i.e. its value
/// section is empty: the single value in the page is a null.
#[test]
fn test_file_reader_empty_datapage_v2() {
    let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
    let reader =
        SerializedFileReader::new(test_file).expect("test file should open as a parquet file");

    // Test contents in Parquet metadata
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    // Test contents in file metadata
    let file_metadata = metadata.file_metadata();
    assert_eq!(
        file_metadata.created_by(),
        Some("parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)")
    );
    assert_eq!(
        file_metadata.key_value_metadata().map(|kv| kv.len()),
        Some(2)
    );

    assert_eq!(file_metadata.num_rows(), 1);
    assert_eq!(file_metadata.version(), 1);
    let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    assert_eq!(
        file_metadata.column_orders(),
        Some(vec![expected_order].as_ref())
    );

    let row_group_metadata = metadata.row_group(0);

    // Check each column order
    for i in 0..row_group_metadata.num_columns() {
        assert_eq!(file_metadata.column_order(i), expected_order);
    }

    // Test row group reader
    let row_group_reader: Box<dyn RowGroupReader> = reader
        .get_row_group(0)
        .expect("row group 0 should be readable");
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Test page readers
    // TODO: test for every column
    let mut page_reader_0: Box<dyn PageReader> = row_group_reader
        .get_column_page_reader(0)
        .expect("column 0 should have a page reader");
    let mut page_count = 0;
    // Fail loudly on a read/decode error instead of silently ending the loop:
    // the error message is far more useful than a bare page-count mismatch.
    while let Some(page) = page_reader_0
        .get_next_page()
        .expect("reading the next page should not fail")
    {
        match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 7);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert!(!is_sorted);
            }
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                // The buffer is exactly as long as the definition levels,
                // so there are no value bytes after the levels.
                assert_eq!(buf.len(), 2);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert_eq!(num_nulls, 1);
                assert_eq!(num_rows, 1);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
            }
            _ => panic!("expected a dictionary page or a v2 data page"),
        }
        page_count += 1;
    }
    // Either page kind is accepted above, but exactly one page is expected.
    assert_eq!(page_count, 1);
}
1424+
13241425
fn get_serialized_page_reader<R: ChunkReader>(
13251426
file_reader: &SerializedFileReader<R>,
13261427
row_group: usize,

0 commit comments

Comments
 (0)