Skip to content

Commit d941ff1

Browse files
authored
Support parquet bloom filter length (#4885)
* Support parquet bloom filter length Signed-off-by: Letian Jiang <[email protected]> * update Signed-off-by: Letian Jiang <[email protected]> --------- Signed-off-by: Letian Jiang <[email protected]>
1 parent 3b0ede4 commit d941ff1

File tree

5 files changed

+195
-371
lines changed

5 files changed

+195
-371
lines changed

parquet/src/bloom_filter/mod.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,12 @@ pub struct Sbbf(Vec<Block>);
135135

136136
const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
137137

138-
/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return
138+
/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
139139
/// both the header and the offset after it (for bitset).
140-
fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
140+
fn chunk_read_bloom_filter_header_and_offset(
141141
offset: u64,
142-
reader: Arc<R>,
142+
buffer: Bytes,
143143
) -> Result<(BloomFilterHeader, u64), ParquetError> {
144-
let buffer = reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?;
145144
let (header, length) = read_bloom_filter_header_and_length(buffer)?;
146145
Ok((header, offset + length))
147146
}
@@ -271,8 +270,13 @@ impl Sbbf {
271270
return Ok(None);
272271
};
273272

273+
let buffer = match column_metadata.bloom_filter_length() {
274+
Some(length) => reader.get_bytes(offset, length as usize),
275+
None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
276+
}?;
277+
274278
let (header, bitset_offset) =
275-
chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
279+
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
276280

277281
match header.algorithm {
278282
BloomFilterAlgorithm::BLOCK(_) => {
@@ -289,11 +293,17 @@ impl Sbbf {
289293
// this match exists to future proof the singleton hash enum
290294
}
291295
}
292-
// length in bytes
293-
let length: usize = header.num_bytes.try_into().map_err(|_| {
294-
ParquetError::General("Bloom filter length is invalid".to_string())
295-
})?;
296-
let bitset = reader.get_bytes(bitset_offset, length)?;
296+
297+
let bitset = match column_metadata.bloom_filter_length() {
298+
Some(_) => buffer.slice((bitset_offset - offset) as usize..),
299+
None => {
300+
let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
301+
ParquetError::General("Bloom filter length is invalid".to_string())
302+
})?;
303+
reader.get_bytes(bitset_offset, bitset_length)?
304+
}
305+
};
306+
297307
Ok(Some(Self::new(&bitset)))
298308
}
299309

parquet/src/file/metadata.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ pub struct ColumnChunkMetaData {
474474
statistics: Option<Statistics>,
475475
encoding_stats: Option<Vec<PageEncodingStats>>,
476476
bloom_filter_offset: Option<i64>,
477+
bloom_filter_length: Option<i32>,
477478
offset_index_offset: Option<i64>,
478479
offset_index_length: Option<i32>,
479480
column_index_offset: Option<i64>,
@@ -591,6 +592,11 @@ impl ColumnChunkMetaData {
591592
self.bloom_filter_offset
592593
}
593594

595+
/// Returns the length of the bloom filter in bytes.
596+
pub fn bloom_filter_length(&self) -> Option<i32> {
597+
self.bloom_filter_length
598+
}
599+
594600
/// Returns the offset for the column index.
595601
pub fn column_index_offset(&self) -> Option<i64> {
596602
self.column_index_offset
@@ -657,6 +663,7 @@ impl ColumnChunkMetaData {
657663
})
658664
.transpose()?;
659665
let bloom_filter_offset = col_metadata.bloom_filter_offset;
666+
let bloom_filter_length = col_metadata.bloom_filter_length;
660667
let offset_index_offset = cc.offset_index_offset;
661668
let offset_index_length = cc.offset_index_length;
662669
let column_index_offset = cc.column_index_offset;
@@ -677,6 +684,7 @@ impl ColumnChunkMetaData {
677684
statistics,
678685
encoding_stats,
679686
bloom_filter_offset,
687+
bloom_filter_length,
680688
offset_index_offset,
681689
offset_index_length,
682690
column_index_offset,
@@ -722,6 +730,7 @@ impl ColumnChunkMetaData {
722730
.as_ref()
723731
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
724732
bloom_filter_offset: self.bloom_filter_offset,
733+
bloom_filter_length: self.bloom_filter_length,
725734
}
726735
}
727736

@@ -752,6 +761,7 @@ impl ColumnChunkMetaDataBuilder {
752761
statistics: None,
753762
encoding_stats: None,
754763
bloom_filter_offset: None,
764+
bloom_filter_length: None,
755765
offset_index_offset: None,
756766
offset_index_length: None,
757767
column_index_offset: None,
@@ -837,6 +847,12 @@ impl ColumnChunkMetaDataBuilder {
837847
self
838848
}
839849

850+
/// Sets optional bloom filter length in bytes.
851+
pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
852+
self.0.bloom_filter_length = value;
853+
self
854+
}
855+
840856
/// Sets optional offset index offset in bytes.
841857
pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
842858
self.0.offset_index_offset = value;
@@ -1053,6 +1069,7 @@ mod tests {
10531069
},
10541070
])
10551071
.set_bloom_filter_offset(Some(6000))
1072+
.set_bloom_filter_length(Some(25))
10561073
.set_offset_index_offset(Some(7000))
10571074
.set_offset_index_length(Some(25))
10581075
.set_column_index_offset(Some(8000))

parquet/src/file/writer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,15 @@ impl<W: Write + Send> SerializedFileWriter<W> {
267267
Some(bloom_filter) => {
268268
let start_offset = self.buf.bytes_written();
269269
bloom_filter.write(&mut self.buf)?;
270+
let end_offset = self.buf.bytes_written();
270271
// set offset and length for bloom filter
271-
column_chunk
272+
let column_chunk_meta = column_chunk
272273
.meta_data
273274
.as_mut()
274-
.expect("can't have bloom filter without column metadata")
275-
.bloom_filter_offset = Some(start_offset as i64);
275+
.expect("can't have bloom filter without column metadata");
276+
column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
277+
column_chunk_meta.bloom_filter_length =
278+
Some((end_offset - start_offset) as i32);
276279
}
277280
None => {}
278281
}

0 commit comments

Comments
 (0)