14 changes: 9 additions & 5 deletions parquet/src/column/reader.rs
@@ -569,11 +569,15 @@ fn parse_v1_level(
     match encoding {
         Encoding::RLE => {
             let i32_size = std::mem::size_of::<i32>();
-            let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
-            Ok((
-                i32_size + data_size,
-                buf.slice(i32_size..i32_size + data_size),
-            ))
+            if i32_size <= buf.len() {
+                let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
+                let end =
+                    i32_size.checked_add(data_size).ok_or(general_err!("invalid level length"))?;
+                if end <= buf.len() {
+                    return Ok((end, buf.slice(i32_size..end)));
+                }
+            }
+            Err(general_err!("not enough data to read levels"))
Contributor:

This is definitely an improvement over the existing code, but it opens a question:

Given that we're reading bytes from a byte buffer, it seems like we must expect to hit this situation at least occasionally? And the correct response is to fetch more bytes, not fail? Is there some mechanism for handling that higher up in the call stack? Or is there some reason it should be impossible for this code to run off the end of the buffer?

Contributor:

Also -- it seems like read_num_bytes should do bounds checking internally and return Option<T>, so buffer overrun is obvious at the call site instead of a hidden panic footgun? The method has a half dozen other callers, and they all need to do manual bounds checking, in various ways and with varying degrees of safety. In particular, parquet/src/data_type.rs has two call sites that lack any visible bounds checks.

Contributor:

In this particular instance we're reading a buffer that should contain an entire page of data. If it doesn't, that likely points to a problem with the metadata.

Changes to read_num_bytes would likely need more careful consideration as I suspect it might be used in some performance critical sections.

         }
         #[allow(deprecated)]
         Encoding::BIT_PACKED => {
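The reviewer's suggestion above can be sketched as a bounds-checked counterpart to `read_num_bytes` that returns `Option` instead of panicking when the source buffer is shorter than the requested width. The name and shape here are illustrative, not the parquet crate's actual API:

```rust
// Hypothetical sketch: a checked variant of read_num_bytes, so a buffer
// overrun is an Option::None at the call site rather than a hidden panic.
fn try_read_num_bytes_u32(size: usize, src: &[u8]) -> Option<u32> {
    if size > 4 || size > src.len() {
        return None;
    }
    let mut le = [0u8; 4];
    le[..size].copy_from_slice(&src[..size]);
    Some(u32::from_le_bytes(le))
}

fn main() {
    // 4-byte little-endian length prefix, as in a v1 RLE level block.
    assert_eq!(try_read_num_bytes_u32(4, &[7, 0, 0, 0, 0xFF]), Some(7));
    // Truncated buffer: the caller sees None instead of a panic.
    assert_eq!(try_read_num_bytes_u32(4, &[7, 0]), None);
}
```

Whether this is worth doing crate-wide is exactly the open question in the thread, given the performance-critical call sites.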
6 changes: 6 additions & 0 deletions parquet/src/encodings/decoding.rs
@@ -382,6 +382,12 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
     fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
         // First byte in `data` is bit width
         let bit_width = data.as_ref()[0];
+        if bit_width > 32 {
+            return Err(general_err!(
+                "Invalid or corrupted Bit width {}. Max allowed is 32",
+                bit_width
+            ));
+        }
         let mut rle_decoder = RleDecoder::new(bit_width);
         rle_decoder.set_data(data.slice(1..));
         self.num_values = num_values;
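The guard above rests on the fact that dictionary indices are RLE/bit-packed values at most 32 bits wide, so a header byte above 32 can only mean corruption. A standalone sketch of the check (simplified types, not the crate's own):

```rust
// Sketch of the dictionary-page bit-width guard: the first byte of a
// dictionary-encoded data page declares the index bit width, which the
// Parquet format caps at 32.
fn validate_bit_width(page: &[u8]) -> Result<u8, String> {
    let bit_width = *page.first().ok_or("empty page buffer")?;
    if bit_width > 32 {
        return Err(format!(
            "Invalid or corrupted Bit width {bit_width}. Max allowed is 32"
        ));
    }
    Ok(bit_width)
}

fn main() {
    assert_eq!(validate_bit_width(&[8, 0b0000_0011]), Ok(8));
    // 254 is the corrupt value in the ARROW-GH-41321 test file below.
    assert!(validate_bit_width(&[254]).is_err());
}
```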
5 changes: 4 additions & 1 deletion parquet/src/encodings/rle.rs
@@ -513,7 +513,10 @@ impl RleDecoder {
                 self.rle_left = (indicator_value >> 1) as u32;
                 let value_width = bit_util::ceil(self.bit_width as usize, 8);
                 self.current_value = bit_reader.get_aligned::<u64>(value_width);
-                assert!(self.current_value.is_some());
+                assert!(
+                    self.current_value.is_some(),
+                    "parquet_data_error: not enough data for RLE decoding"
+                );
             }
             true
         } else {
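For context on what the assert guards: in Parquet's hybrid RLE/bit-packed encoding, the decoder reads a LEB128 indicator whose low bit selects the run type and whose remaining bits give the run length (RLE) or the number of 8-value groups (bit-packed). A file truncated inside the repeated value of an RLE run is what trips this assertion. An illustrative sketch of the header split:

```rust
// Sketch of the hybrid run header: (run type, count) from the indicator.
fn parse_run_header(indicator: u64) -> (&'static str, u64) {
    if indicator & 1 == 1 {
        ("bit-packed", indicator >> 1) // number of 8-value groups follows
    } else {
        ("rle", indicator >> 1) // number of copies of one repeated value
    }
}

fn main() {
    assert_eq!(parse_run_header(8), ("rle", 4)); // 4 copies of one value
    assert_eq!(parse_run_header(3), ("bit-packed", 1)); // one group of 8 values
}
```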
15 changes: 15 additions & 0 deletions parquet/src/file/reader.rs
@@ -124,11 +124,26 @@ impl ChunkReader for Bytes {
 
     fn get_read(&self, start: u64) -> Result<Self::T> {
         let start = start as usize;
+        if start > self.len() {
+            return Err(eof_err!(
+                "Expected to read at offset {}, while file has length {}",
+                start,
+                self.len()
+            ));
+        }
         Ok(self.slice(start..).reader())
     }
 
     fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         let start = start as usize;
+        if start > self.len() || start + length > self.len() {
+            return Err(eof_err!(
+                "Expected to read {} bytes at offset {}, while file has length {}",
+                length,
+                start,
+                self.len()
+            ));
+        }
         Ok(self.slice(start..start + length))
     }
 }
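One caveat worth noting: `start + length` in `get_bytes` is an unchecked `usize` addition, which could in principle wrap on 32-bit targets before the comparison runs. A minimal sketch of the same check written with `checked_add` (a hypothetical helper, not the crate's code):

```rust
// Overflow-safe bounds check: start + length must fit in usize and land
// within the file, otherwise report an EOF-style error.
fn check_range(start: usize, length: usize, file_len: usize) -> Result<(), String> {
    match start.checked_add(length) {
        Some(end) if end <= file_len => Ok(()),
        _ => Err(format!(
            "Expected to read {length} bytes at offset {start}, while file has length {file_len}"
        )),
    }
}

fn main() {
    assert!(check_range(10, 5, 20).is_ok());
    assert!(check_range(10, 15, 20).is_err()); // runs past end of file
    assert!(check_range(usize::MAX, 2, 20).is_err()); // would wrap, not panic
}
```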
5 changes: 4 additions & 1 deletion parquet/src/file/serialized_reader.rs
@@ -392,6 +392,9 @@ pub(crate) fn decode_page(
     let buffer = match decompressor {
         Some(decompressor) if can_decompress => {
             let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
+            if offset > buffer.len() || offset > uncompressed_page_size {
+                return Err(general_err!("Invalid page header"));
+            }
             let decompressed_size = uncompressed_page_size - offset;
             let mut decompressed = Vec::with_capacity(uncompressed_page_size);
             decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
@@ -458,7 +461,7 @@ pub(crate) fn decode_page(
         }
         _ => {
             // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-            unimplemented!("Page type {:?} is not supported", page_header.r#type)
+            return Err(general_err!("Page type {:?} is not supported", page_header.r#type));
        }
    };
 
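The new check guards the `uncompressed_page_size - offset` subtraction and the `buffer[..offset]` slice that follow it: `offset` (the length of the prefix stored uncompressed, such as V2 level data) must fit inside both the on-disk buffer and the declared uncompressed size. A standalone sketch of that validation:

```rust
// Sketch of the page-header validation: reject headers whose uncompressed
// prefix length exceeds either the available bytes or the declared size,
// so the subtraction below cannot underflow.
fn checked_decompressed_size(
    offset: usize,
    buffer_len: usize,
    uncompressed_page_size: usize,
) -> Result<usize, String> {
    if offset > buffer_len || offset > uncompressed_page_size {
        return Err("Invalid page header".to_string());
    }
    Ok(uncompressed_page_size - offset)
}

fn main() {
    assert_eq!(checked_decompressed_size(4, 100, 256), Ok(252));
    assert!(checked_decompressed_size(300, 100, 256).is_err()); // corrupt header
}
```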
2 changes: 2 additions & 0 deletions parquet/src/schema/types.rs
@@ -1359,6 +1359,8 @@ fn schema_from_array_helper<'a>(
         if !is_root_node {
             builder = builder.with_repetition(rep);
         }
+    } else if !is_root_node {
+        return Err(general_err!("Repetition level must be defined for non-root types"));
     }
     Ok((next_index, Arc::new(builder.build().unwrap())))
Contributor:

How do we know the unwrap is safe?

Contributor:

build never returns an Err 😉. But good point, could replace unwrap with ?.

 }
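The suggestion in the thread above — propagate with `?` instead of `unwrap`, so a future fallible `build()` surfaces as an `Err` rather than a panic — can be sketched like this. `TypeBuilder`/`ParquetType` are simplified stand-ins for the crate's own types:

```rust
use std::sync::Arc;

struct ParquetType(String);

struct TypeBuilder(String);

impl TypeBuilder {
    // Infallible today (mirroring "build never returns an Err"), but typed
    // as Result so callers are forced to handle a future failure mode.
    fn build(self) -> Result<ParquetType, String> {
        Ok(ParquetType(self.0))
    }
}

// `?` propagates instead of panicking: no unwrap to justify in review.
fn finish(builder: TypeBuilder) -> Result<Arc<ParquetType>, String> {
    Ok(Arc::new(builder.build()?))
}

fn main() {
    let t = finish(TypeBuilder("message".to_string())).unwrap();
    assert_eq!(t.0, "message");
}
```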
6 changes: 4 additions & 2 deletions parquet/tests/arrow_reader/bad_data.rs
@@ -84,10 +84,12 @@ fn test_parquet_1481() {
 }
 
 #[test]
-#[should_panic(expected = "assertion failed: self.current_value.is_some()")]
 fn test_arrow_gh_41321() {
     let err = read_file("ARROW-GH-41321.parquet").unwrap_err();
-    assert_eq!(err.to_string(), "TBD (currently panics)");
+    assert_eq!(
+        err.to_string(),
+        "External: Parquet argument error: Parquet error: Invalid or corrupted Bit width 254. Max allowed is 32"
+    );
 }
 
 #[test]