Skip to content

Commit 8a94602

Browse files
committed
feat(file_index): add basic bitmap index
1 parent 4275c6b commit 8a94602

File tree

4 files changed

+358
-0
lines changed

4 files changed

+358
-0
lines changed

crates/paimon/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ bytes = "1.7.1"
4040
bitflags = "2.6.0"
4141
tokio = { version = "1.39.2", features = ["macros"] }
4242
chrono = { version = "0.4.38", features = ["serde"] }
43+
roaring = "0.10.6"
44+
indexmap = { version = "2.5.0", features = ["serde"] }
4345
serde = { version = "1", features = ["derive"] }
4446
serde_bytes = "0.11.15"
4547
serde_json = "1.0.120"

crates/paimon/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,20 @@ pub enum Error {
6565
display("Paimon hitting invalid file index format: {}", message)
6666
)]
6767
FileIndexFormatInvalid { message: String },
68+
#[snafu(visibility(pub(crate)), display("Serialization error: {}", source))]
69+
SerializationError { source: serde_json::Error },
70+
#[snafu(visibility(pub(crate)), display("Deserialization error: {}", source))]
71+
DeserializationError { source: serde_json::Error },
72+
#[snafu(
73+
visibility(pub(crate)),
74+
display("Roaring bitmap serialization error: {}", source)
75+
)]
76+
BitmapSerializationError { source: std::io::Error },
77+
#[snafu(
78+
visibility(pub(crate)),
79+
display("Bitmap deserialization error: {}", source)
80+
)]
81+
BitmapDeserializationError { source: std::io::Error },
6882
}
6983

7084
impl From<opendal::Error> for Error {

crates/paimon/src/file_index/bitmap_index.rs

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hash::Hash;
19+
use std::sync::Arc;
20+
21+
use crate::error::{BitmapSerializationSnafu, DeserializationSnafu, SerializationSnafu};
22+
use crate::io::{FileRead, InputFile};
23+
24+
use bytes::{Buf, BufMut, Bytes, BytesMut};
25+
use indexmap::IndexMap;
26+
use roaring::RoaringBitmap;
27+
use serde::de::DeserializeOwned;
28+
use serde::{Deserialize, Serialize};
29+
use snafu::ResultExt;
30+
31+
const BITMAP_VERSION_1: u8 = 1;
32+
33+
#[derive(Serialize, Deserialize)]
34+
struct BitmapFileIndexMeta<K>
35+
where
36+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
37+
{
38+
row_count: u32,
39+
non_null_bitmap_number: u32,
40+
has_null_value: bool,
41+
null_value_offset: Option<i64>,
42+
#[serde(with = "indexmap::map::serde_seq")]
43+
bitmap_offsets: IndexMap<K, i64>,
44+
}
45+
46+
pub struct BitmapFileIndexWriter<K>
47+
where
48+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
49+
{
50+
id2bitmap: IndexMap<K, RoaringBitmap>,
51+
null_bitmap: RoaringBitmap,
52+
row_number: u32,
53+
}
54+
55+
impl<K> BitmapFileIndexWriter<K>
56+
where
57+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
58+
{
59+
pub fn new() -> Self {
60+
Self {
61+
id2bitmap: IndexMap::new(),
62+
null_bitmap: RoaringBitmap::new(),
63+
row_number: 0,
64+
}
65+
}
66+
67+
pub fn write(&mut self, key: Option<K>) {
68+
if let Some(key) = key {
69+
self.id2bitmap
70+
.entry(key)
71+
.or_insert_with(RoaringBitmap::new)
72+
.insert(self.row_number);
73+
} else {
74+
self.null_bitmap.insert(self.row_number);
75+
}
76+
self.row_number += 1;
77+
}
78+
79+
pub fn serialized_bytes(&self) -> crate::Result<Bytes> {
80+
// 1. Serialize the null_bitmap
81+
let mut null_bitmap_bytes = Vec::new();
82+
self.null_bitmap
83+
.serialize_into(&mut null_bitmap_bytes)
84+
.context(BitmapSerializationSnafu)?;
85+
let null_bitmap_size = null_bitmap_bytes.len();
86+
87+
// 2. Serialize each bitmap and calculate total size
88+
let mut bitmap_offsets = IndexMap::new();
89+
let mut serialized_bitmaps = Vec::new();
90+
let mut total_bitmap_size = 0;
91+
92+
for (key, bitmap) in &self.id2bitmap {
93+
if bitmap.len() == 1 {
94+
// Single value bitmap, offset is negative
95+
let value = -1_i64 - bitmap.iter().next().unwrap() as i64;
96+
bitmap_offsets.insert(key.clone(), value);
97+
} else {
98+
let mut bitmap_bytes = Vec::new();
99+
bitmap
100+
.serialize_into(&mut bitmap_bytes)
101+
.context(BitmapSerializationSnafu)?;
102+
let bitmap_size = bitmap_bytes.len();
103+
serialized_bitmaps.push((bitmap_bytes, bitmap_size));
104+
bitmap_offsets.insert(key.clone(), total_bitmap_size as i64);
105+
total_bitmap_size += bitmap_size;
106+
}
107+
}
108+
109+
// 3. Handle null bitmap offset
110+
let null_value_offset = if !self.null_bitmap.is_empty() {
111+
Some(if self.null_bitmap.len() == 1 {
112+
-1_i64 - self.null_bitmap.iter().next().unwrap() as i64
113+
} else {
114+
0_i64
115+
})
116+
} else {
117+
None
118+
};
119+
120+
// 4. Create metadata and serialize it
121+
let meta = BitmapFileIndexMeta {
122+
row_count: self.row_number,
123+
non_null_bitmap_number: self.id2bitmap.len() as u32,
124+
has_null_value: !self.null_bitmap.is_empty(),
125+
null_value_offset,
126+
bitmap_offsets,
127+
};
128+
129+
let meta_bytes = serde_json::to_vec(&meta).context(SerializationSnafu)?;
130+
let meta_size = meta_bytes.len();
131+
132+
// 5. Calculate total size
133+
let version_size = 1; // BITMAP_VERSION_1 is a single byte
134+
let meta_size_size = 8; // u64
135+
let total_size = version_size
136+
+ meta_size_size
137+
+ meta_size
138+
+ if self.null_bitmap.len() > 1 {
139+
null_bitmap_size
140+
} else {
141+
0
142+
}
143+
+ total_bitmap_size;
144+
145+
// 6. Allocate buffer with total_size
146+
let mut output = BytesMut::with_capacity(total_size);
147+
148+
// 7. Write data into buffer
149+
// Write version
150+
output.put_u8(BITMAP_VERSION_1);
151+
152+
// Write meta_size as u64
153+
output.put_u64_le(meta_size as u64);
154+
155+
// Write metadata
156+
output.put_slice(&meta_bytes);
157+
158+
// Write null_bitmap if necessary
159+
if self.null_bitmap.len() > 1 {
160+
output.put_slice(&null_bitmap_bytes);
161+
}
162+
163+
// Write all bitmaps
164+
for (bitmap_bytes, _size) in serialized_bitmaps {
165+
output.put_slice(&bitmap_bytes);
166+
}
167+
168+
Ok(output.freeze())
169+
}
170+
}
171+
172+
pub struct BitmapFileIndexReader<K>
173+
where
174+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
175+
{
176+
input_file: Arc<InputFile>,
177+
meta: BitmapFileIndexMeta<K>,
178+
bitmaps: IndexMap<K, RoaringBitmap>,
179+
null_bitmap: Option<RoaringBitmap>,
180+
body_offset: u64,
181+
}
182+
183+
impl<K> BitmapFileIndexReader<K>
184+
where
185+
K: Serialize + DeserializeOwned + Clone + Eq + Hash,
186+
{
187+
pub async fn new(input_file: Arc<InputFile>) -> crate::Result<Self> {
188+
let input = input_file.read().await?;
189+
let mut buf = input.clone();
190+
191+
if buf.remaining() < 1 {
192+
return Err(crate::Error::FileIndexFormatInvalid {
193+
message: "File too small to contain version byte".to_string(),
194+
});
195+
}
196+
let version = buf.get_u8();
197+
if version != BITMAP_VERSION_1 {
198+
return Err(crate::Error::FileIndexFormatInvalid {
199+
message: format!("Unsupported version: {}", version),
200+
});
201+
}
202+
203+
if buf.remaining() < 8 {
204+
return Err(crate::Error::FileIndexFormatInvalid {
205+
message: "File too small to contain meta_size".to_string(),
206+
});
207+
}
208+
let meta_size = buf.get_u64_le() as usize;
209+
210+
if buf.remaining() < meta_size {
211+
return Err(crate::Error::FileIndexFormatInvalid {
212+
message: "File too small to contain metadata".to_string(),
213+
});
214+
}
215+
let meta_bytes = buf.copy_to_bytes(meta_size);
216+
217+
let meta: BitmapFileIndexMeta<K> =
218+
serde_json::from_slice(&meta_bytes).context(DeserializationSnafu)?;
219+
220+
let body_offset = input.len() - buf.remaining();
221+
222+
Ok(Self {
223+
input_file,
224+
meta,
225+
bitmaps: IndexMap::new(),
226+
null_bitmap: None,
227+
body_offset: body_offset as u64,
228+
})
229+
}
230+
231+
pub async fn get_bitmap(&mut self, key: Option<&K>) -> crate::Result<RoaringBitmap> {
232+
if let Some(key) = key {
233+
if let Some(bitmap) = self.bitmaps.get(key) {
234+
return Ok(bitmap.clone());
235+
}
236+
if let Some(&offset) = self.meta.bitmap_offsets.get(key) {
237+
let bitmap = self.read_bitmap(offset).await?;
238+
self.bitmaps.insert(key.clone(), bitmap.clone());
239+
Ok(bitmap)
240+
} else {
241+
Ok(RoaringBitmap::new())
242+
}
243+
} else {
244+
if let Some(bitmap) = &self.null_bitmap {
245+
return Ok(bitmap.clone());
246+
}
247+
if let Some(offset) = self.meta.null_value_offset {
248+
let bitmap = self.read_bitmap(offset).await?;
249+
self.null_bitmap = Some(bitmap.clone());
250+
Ok(bitmap)
251+
} else {
252+
Ok(RoaringBitmap::new())
253+
}
254+
}
255+
}
256+
257+
async fn read_bitmap(&self, offset: i64) -> crate::Result<RoaringBitmap> {
258+
if offset < 0 {
259+
let index = (-1 - offset) as u32;
260+
let mut bitmap = RoaringBitmap::new();
261+
bitmap.insert(index);
262+
Ok(bitmap)
263+
} else {
264+
let bitmap_pos = self.body_offset as i64 + offset;
265+
let file_meta = self.input_file.metadata().await?;
266+
if bitmap_pos < 0 {
267+
return Err(crate::Error::FileIndexFormatInvalid {
268+
message: format!("Invalid bitmap offset: {}", bitmap_pos),
269+
});
270+
}
271+
272+
if bitmap_pos as u64 > file_meta.size {
273+
return Err(crate::Error::FileIndexFormatInvalid {
274+
message: format!(
275+
"Bitmap offset {} exceeds file size {}",
276+
bitmap_pos, file_meta.size
277+
),
278+
});
279+
}
280+
281+
let reader = self.input_file.reader().await?;
282+
let range = bitmap_pos as u64..file_meta.size;
283+
let buf = reader.read(range).await?;
284+
let bitmap = RoaringBitmap::deserialize_from(&mut &buf[..])
285+
.map_err(|e| crate::Error::BitmapDeserializationError { source: e })?;
286+
287+
Ok(bitmap)
288+
}
289+
}
290+
}
291+
292+
#[cfg(test)]
mod basic_bitmap_index_test {
    use super::*;
    use crate::io::FileIO;
    use std::sync::Arc;

    /// Collects the row numbers of `bitmap` in ascending order.
    fn rows(bitmap: &RoaringBitmap) -> Vec<u32> {
        bitmap.iter().collect()
    }

    /// Writes a small index to an in-memory file and checks that every key,
    /// including the null key, reads back with the rows it was written at.
    #[tokio::test]
    async fn test_basic_bitmap_index_read_write() -> crate::Result<()> {
        let path = "memory:/tmp/test_basic_bitmap_index";
        let file_io = FileIO::from_url(path)?.build()?;

        // Rows 0..=3 hold: key1, null, key2, key1.
        let mut writer = BitmapFileIndexWriter::<String>::new();
        for value in [Some("key1"), None, Some("key2"), Some("key1")] {
            writer.write(value.map(str::to_string));
        }
        let bytes = writer.serialized_bytes()?;

        let output = file_io.new_output(path)?;
        let mut file_writer = output.writer().await?;
        file_writer.write(bytes).await?;
        file_writer.close().await?;

        let input_file = Arc::new(output.to_input_file());
        let mut reader = BitmapFileIndexReader::<String>::new(input_file).await?;

        let key1 = "key1".to_string();
        let bitmap_key1 = reader.get_bitmap(Some(&key1)).await?;
        assert_eq!(rows(&bitmap_key1), vec![0, 3]);

        let key2 = "key2".to_string();
        let bitmap_key2 = reader.get_bitmap(Some(&key2)).await?;
        assert_eq!(rows(&bitmap_key2), vec![2]);

        let bitmap_none = reader.get_bitmap(None).await?;
        assert_eq!(rows(&bitmap_none), vec![1]);

        file_io.delete_file(path).await?;

        Ok(())
    }
}

crates/paimon/src/file_index/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@
1717

1818
mod file_index_format;
1919
pub use file_index_format::*;
20+
21+
mod bitmap_index;
22+
pub use bitmap_index::*;

0 commit comments

Comments
 (0)