Skip to content

Commit ea01119

Browse files
committed
feat(file_index): add basic bitmap index
1 parent 4275c6b commit ea01119

File tree

4 files changed

+366
-0
lines changed

4 files changed

+366
-0
lines changed

crates/paimon/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ bytes = "1.7.1"
4040
bitflags = "2.6.0"
4141
tokio = { version = "1.39.2", features = ["macros"] }
4242
chrono = { version = "0.4.38", features = ["serde"] }
43+
roaring = "0.10.6"
44+
indexmap = { version = "2.5.0", features = ["serde"] }
4345
serde = { version = "1", features = ["derive"] }
4446
serde_bytes = "0.11.15"
4547
serde_json = "1.0.120"

crates/paimon/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,20 @@ pub enum Error {
6565
display("Paimon hitting invalid file index format: {}", message)
6666
)]
6767
FileIndexFormatInvalid { message: String },
68+
#[snafu(visibility(pub(crate)), display("Serialization error: {}", source))]
69+
SerializationError { source: serde_json::Error },
70+
#[snafu(visibility(pub(crate)), display("Deserialization error: {}", source))]
71+
DeserializationError { source: serde_json::Error },
72+
#[snafu(
73+
visibility(pub(crate)),
74+
display("Roaring bitmap serialization error: {}", source)
75+
)]
76+
BitmapSerializationError { source: std::io::Error },
77+
#[snafu(
78+
visibility(pub(crate)),
79+
display("Bitmap deserialization error: {}", source)
80+
)]
81+
BitmapDeserializationError { source: std::io::Error },
6882
}
6983

7084
impl From<opendal::Error> for Error {
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hash::Hash;
19+
use std::sync::Arc;
20+
21+
use crate::error::{BitmapSerializationSnafu, DeserializationSnafu, SerializationSnafu};
22+
use crate::io::{FileRead, InputFile};
23+
24+
use bytes::{Buf, BufMut, Bytes, BytesMut};
25+
use indexmap::IndexMap;
26+
use roaring::RoaringBitmap;
27+
use serde::de::DeserializeOwned;
28+
use serde::{Deserialize, Serialize};
29+
use snafu::ResultExt;
30+
31+
const BITMAP_VERSION_1: u8 = 1;
32+
33+
#[derive(Serialize, Deserialize)]
34+
struct BitmapFileIndexMeta<K>
35+
where
36+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
37+
{
38+
row_count: u32,
39+
non_null_bitmap_number: u32,
40+
has_null_value: bool,
41+
null_value_offset: Option<i64>,
42+
#[serde(with = "indexmap::map::serde_seq")]
43+
bitmap_offsets: IndexMap<K, i64>,
44+
}
45+
46+
pub struct BitmapFileIndexWriter<K>
47+
where
48+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
49+
{
50+
id2bitmap: IndexMap<K, RoaringBitmap>,
51+
null_bitmap: RoaringBitmap,
52+
row_number: u32,
53+
}
54+
55+
impl<K> Default for BitmapFileIndexWriter<K>
56+
where
57+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
58+
{
59+
fn default() -> Self {
60+
Self::new()
61+
}
62+
}
63+
64+
impl<K> BitmapFileIndexWriter<K>
65+
where
66+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
67+
{
68+
pub fn new() -> Self {
69+
Self {
70+
id2bitmap: IndexMap::new(),
71+
null_bitmap: RoaringBitmap::new(),
72+
row_number: 0,
73+
}
74+
}
75+
76+
pub fn write(&mut self, key: Option<K>) {
77+
if let Some(key) = key {
78+
self.id2bitmap
79+
.entry(key)
80+
.or_default()
81+
.insert(self.row_number);
82+
} else {
83+
self.null_bitmap.insert(self.row_number);
84+
}
85+
self.row_number += 1;
86+
}
87+
88+
pub fn serialized_bytes(&self) -> crate::Result<Bytes> {
89+
// 1. Serialize the null_bitmap
90+
let mut null_bitmap_bytes = Vec::new();
91+
self.null_bitmap
92+
.serialize_into(&mut null_bitmap_bytes)
93+
.context(BitmapSerializationSnafu)?;
94+
let null_bitmap_size = null_bitmap_bytes.len();
95+
96+
// 2. Serialize each bitmap and calculate total size
97+
let mut bitmap_offsets = IndexMap::new();
98+
let mut serialized_bitmaps = Vec::new();
99+
let mut total_bitmap_size = 0;
100+
101+
for (key, bitmap) in &self.id2bitmap {
102+
if bitmap.len() == 1 {
103+
// Single value bitmap, offset is negative
104+
let value = -1_i64 - bitmap.iter().next().unwrap() as i64;
105+
bitmap_offsets.insert(key.clone(), value);
106+
} else {
107+
let mut bitmap_bytes = Vec::new();
108+
bitmap
109+
.serialize_into(&mut bitmap_bytes)
110+
.context(BitmapSerializationSnafu)?;
111+
let bitmap_size = bitmap_bytes.len();
112+
serialized_bitmaps.push((bitmap_bytes, bitmap_size));
113+
bitmap_offsets.insert(key.clone(), total_bitmap_size as i64);
114+
total_bitmap_size += bitmap_size;
115+
}
116+
}
117+
118+
// 3. Handle null bitmap offset
119+
let null_value_offset = if !self.null_bitmap.is_empty() {
120+
Some(if self.null_bitmap.len() == 1 {
121+
-1_i64 - self.null_bitmap.iter().next().unwrap() as i64
122+
} else {
123+
0_i64
124+
})
125+
} else {
126+
None
127+
};
128+
129+
// 4. Create metadata and serialize it
130+
let meta = BitmapFileIndexMeta {
131+
row_count: self.row_number,
132+
non_null_bitmap_number: self.id2bitmap.len() as u32,
133+
has_null_value: !self.null_bitmap.is_empty(),
134+
null_value_offset,
135+
bitmap_offsets,
136+
};
137+
138+
let meta_bytes = serde_json::to_vec(&meta).context(SerializationSnafu)?;
139+
let meta_size = meta_bytes.len();
140+
141+
// 5. Calculate total size
142+
let version_size = 1; // BITMAP_VERSION_1 is a single byte
143+
let meta_size_size = 8; // u64
144+
let total_size = version_size
145+
+ meta_size_size
146+
+ meta_size
147+
+ if self.null_bitmap.len() > 1 {
148+
null_bitmap_size
149+
} else {
150+
0
151+
}
152+
+ total_bitmap_size;
153+
154+
// 6. Allocate buffer with total_size
155+
let mut output = BytesMut::with_capacity(total_size);
156+
157+
// 7. Write data into buffer
158+
// Write version
159+
output.put_u8(BITMAP_VERSION_1);
160+
161+
// Write meta_size as u64
162+
output.put_u64_le(meta_size as u64);
163+
164+
// Write metadata
165+
output.put_slice(&meta_bytes);
166+
167+
// Write null_bitmap if necessary
168+
if self.null_bitmap.len() > 1 {
169+
output.put_slice(&null_bitmap_bytes);
170+
}
171+
172+
// Write all bitmaps
173+
for (bitmap_bytes, _size) in serialized_bitmaps {
174+
output.put_slice(&bitmap_bytes);
175+
}
176+
177+
Ok(output.freeze())
178+
}
179+
}
180+
181+
pub struct BitmapFileIndexReader<K>
182+
where
183+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
184+
{
185+
input_file: Arc<InputFile>,
186+
meta: BitmapFileIndexMeta<K>,
187+
bitmaps: IndexMap<K, RoaringBitmap>,
188+
null_bitmap: Option<RoaringBitmap>,
189+
body_offset: u64,
190+
}
191+
192+
impl<K> BitmapFileIndexReader<K>
193+
where
194+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
195+
{
196+
pub async fn new(input_file: Arc<InputFile>) -> crate::Result<Self> {
197+
let input = input_file.read().await?;
198+
let mut buf = input.clone();
199+
200+
if buf.remaining() < 1 {
201+
return Err(crate::Error::FileIndexFormatInvalid {
202+
message: "File too small to contain version byte".to_string(),
203+
});
204+
}
205+
let version = buf.get_u8();
206+
if version != BITMAP_VERSION_1 {
207+
return Err(crate::Error::FileIndexFormatInvalid {
208+
message: format!("Unsupported version: {}", version),
209+
});
210+
}
211+
212+
if buf.remaining() < 8 {
213+
return Err(crate::Error::FileIndexFormatInvalid {
214+
message: "File too small to contain meta_size".to_string(),
215+
});
216+
}
217+
let meta_size = buf.get_u64_le() as usize;
218+
219+
if buf.remaining() < meta_size {
220+
return Err(crate::Error::FileIndexFormatInvalid {
221+
message: "File too small to contain metadata".to_string(),
222+
});
223+
}
224+
let meta_bytes = buf.copy_to_bytes(meta_size);
225+
226+
let meta: BitmapFileIndexMeta<K> =
227+
serde_json::from_slice(&meta_bytes).context(DeserializationSnafu)?;
228+
229+
let body_offset = input.len() - buf.remaining();
230+
231+
Ok(Self {
232+
input_file,
233+
meta,
234+
bitmaps: IndexMap::new(),
235+
null_bitmap: None,
236+
body_offset: body_offset as u64,
237+
})
238+
}
239+
240+
pub async fn get_bitmap(&mut self, key: Option<&K>) -> crate::Result<RoaringBitmap> {
241+
if let Some(key) = key {
242+
if let Some(bitmap) = self.bitmaps.get(key) {
243+
return Ok(bitmap.clone());
244+
}
245+
if let Some(&offset) = self.meta.bitmap_offsets.get(key) {
246+
let bitmap = self.read_bitmap(offset).await?;
247+
self.bitmaps.insert(key.clone(), bitmap.clone());
248+
Ok(bitmap)
249+
} else {
250+
Ok(RoaringBitmap::new())
251+
}
252+
} else {
253+
if let Some(bitmap) = &self.null_bitmap {
254+
return Ok(bitmap.clone());
255+
}
256+
if let Some(offset) = self.meta.null_value_offset {
257+
let bitmap = self.read_bitmap(offset).await?;
258+
self.null_bitmap = Some(bitmap.clone());
259+
Ok(bitmap)
260+
} else {
261+
Ok(RoaringBitmap::new())
262+
}
263+
}
264+
}
265+
266+
async fn read_bitmap(&self, offset: i64) -> crate::Result<RoaringBitmap> {
267+
if offset < 0 {
268+
let index = (-1 - offset) as u32;
269+
let mut bitmap = RoaringBitmap::new();
270+
bitmap.insert(index);
271+
Ok(bitmap)
272+
} else {
273+
let bitmap_pos = self.body_offset as i64 + offset;
274+
let file_meta = self.input_file.metadata().await?;
275+
if bitmap_pos < 0 {
276+
return Err(crate::Error::FileIndexFormatInvalid {
277+
message: format!("Invalid bitmap offset: {}", bitmap_pos),
278+
});
279+
}
280+
281+
if bitmap_pos as u64 > file_meta.size {
282+
return Err(crate::Error::FileIndexFormatInvalid {
283+
message: format!(
284+
"Bitmap offset {} exceeds file size {}",
285+
bitmap_pos, file_meta.size
286+
),
287+
});
288+
}
289+
290+
let reader = self.input_file.reader().await?;
291+
let range = bitmap_pos as u64..file_meta.size;
292+
let buf = reader.read(range).await?;
293+
let bitmap = RoaringBitmap::deserialize_from(&mut &buf[..])
294+
.map_err(|e| crate::Error::BitmapDeserializationError { source: e })?;
295+
296+
Ok(bitmap)
297+
}
298+
}
299+
}
300+
301+
#[cfg(test)]
302+
mod basic_bitmap_index_test {
303+
use super::*;
304+
use crate::io::FileIO;
305+
use std::sync::Arc;
306+
307+
#[tokio::test]
308+
async fn test_basic_bitmap_index_read_write() -> crate::Result<()> {
309+
let path = "memory:/tmp/test_basic_bitmap_index";
310+
let file_io = FileIO::from_url(path)?.build()?;
311+
312+
let mut writer = BitmapFileIndexWriter::<String>::new();
313+
314+
writer.write(Some("key1".to_string()));
315+
writer.write(None);
316+
writer.write(Some("key2".to_string()));
317+
writer.write(Some("key1".to_string()));
318+
319+
let bytes = writer.serialized_bytes()?;
320+
321+
let output = file_io.new_output(path)?;
322+
let mut file_writer = output.writer().await?;
323+
file_writer.write(bytes).await?;
324+
file_writer.close().await?;
325+
326+
let input_file = output.to_input_file();
327+
328+
let mut reader = BitmapFileIndexReader::<String>::new(Arc::new(input_file)).await?;
329+
330+
let bitmap_key1 = reader.get_bitmap(Some(&"key1".to_string())).await?;
331+
assert_eq!(bitmap_key1.len(), 2);
332+
assert!(bitmap_key1.contains(0));
333+
assert!(bitmap_key1.contains(3));
334+
335+
let bitmap_key2 = reader.get_bitmap(Some(&"key2".to_string())).await?;
336+
assert_eq!(bitmap_key2.len(), 1);
337+
assert!(bitmap_key2.contains(2));
338+
339+
let bitmap_none = reader.get_bitmap(None).await?;
340+
assert_eq!(bitmap_none.len(), 1);
341+
assert!(bitmap_none.contains(1));
342+
343+
file_io.delete_file(path).await?;
344+
345+
Ok(())
346+
}
347+
}

crates/paimon/src/file_index/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@
1717

1818
mod file_index_format;
1919
pub use file_index_format::*;
20+
21+
mod bitmap_index;
22+
pub use bitmap_index::*;

0 commit comments

Comments
 (0)