Skip to content

Commit a22440e

Browse files
authored
feat(puffin): Add PuffinReader (#892)
Part of #744 # Summary - Add PuffinReader # Context - This is the fourth of a number of PRs to add support for Iceberg Puffin file format. - It might be helpful to refer to the overarching [PR](#714) from which these changes were split to understand better how these changes will fit in to the larger picture. - It may also be helpful to refer to the Java reference implementation for PuffinReader [here](https://github.com/apache/iceberg/blob/8cd5b1985d3f9c55ab2ced174559a8416b6ca1b4/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java#L123).
1 parent b454ce6 commit a22440e

File tree

5 files changed

+193
-1
lines changed

5 files changed

+193
-1
lines changed

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ serde_derive = { workspace = true }
7878
serde_json = { workspace = true }
7979
serde_repr = { workspace = true }
8080
serde_with = { workspace = true }
81-
tokio = { workspace = true, optional = true }
81+
tokio = { workspace = true, optional = true, features = ["sync"] }
8282
typed-builder = { workspace = true }
8383
url = { workspace = true }
8484
uuid = { workspace = true }

crates/iceberg/src/puffin/blob.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
21+
pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
22+
23+
/// The blob
24+
#[derive(Debug, PartialEq, Clone)]
25+
pub(crate) struct Blob {
26+
/// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
27+
pub(crate) r#type: String,
28+
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
29+
pub(crate) fields: Vec<i32>,
30+
/// ID of the Iceberg table's snapshot the blob was computed from
31+
pub(crate) snapshot_id: i64,
32+
/// Sequence number of the Iceberg table's snapshot the blob was computed from
33+
pub(crate) sequence_number: i64,
34+
/// The uncompressed blob data
35+
pub(crate) data: Vec<u8>,
36+
/// Arbitrary meta-information about the blob
37+
pub(crate) properties: HashMap<String, String>,
38+
}

crates/iceberg/src/puffin/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
// Temporarily allowing this while crate is under active development
2222
#![allow(dead_code)]
2323

24+
mod blob;
2425
mod compression;
2526
mod metadata;
27+
#[cfg(feature = "tokio")]
28+
mod reader;
2629

2730
#[cfg(test)]
2831
mod test_utils;
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use tokio::sync::OnceCell;
19+
20+
use crate::io::{FileRead, InputFile};
21+
use crate::puffin::blob::Blob;
22+
use crate::puffin::metadata::{BlobMetadata, FileMetadata};
23+
use crate::Result;
24+
25+
/// Puffin reader
26+
pub(crate) struct PuffinReader {
27+
input_file: InputFile,
28+
file_metadata: OnceCell<FileMetadata>,
29+
}
30+
31+
impl PuffinReader {
32+
/// Returns a new Puffin reader
33+
pub(crate) fn new(input_file: InputFile) -> Self {
34+
Self {
35+
input_file,
36+
file_metadata: OnceCell::new(),
37+
}
38+
}
39+
40+
/// Returns file metadata
41+
pub(crate) async fn file_metadata(&self) -> Result<&FileMetadata> {
42+
self.file_metadata
43+
.get_or_try_init(|| FileMetadata::read(&self.input_file))
44+
.await
45+
}
46+
47+
/// Returns blob
48+
pub(crate) async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
49+
let file_read = self.input_file.reader().await?;
50+
let start = blob_metadata.offset;
51+
let end = start + blob_metadata.length;
52+
let bytes = file_read.read(start..end).await?.to_vec();
53+
let data = blob_metadata.compression_codec.decompress(bytes)?;
54+
55+
Ok(Blob {
56+
r#type: blob_metadata.r#type.clone(),
57+
fields: blob_metadata.fields.clone(),
58+
snapshot_id: blob_metadata.snapshot_id,
59+
sequence_number: blob_metadata.sequence_number,
60+
data,
61+
properties: blob_metadata.properties.clone(),
62+
})
63+
}
64+
}
65+
66+
#[cfg(test)]
67+
mod tests {
68+
69+
use crate::puffin::reader::PuffinReader;
70+
use crate::puffin::test_utils::{
71+
blob_0, blob_1, java_uncompressed_metric_input_file,
72+
java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
73+
zstd_compressed_metric_file_metadata,
74+
};
75+
76+
#[tokio::test]
77+
async fn test_puffin_reader_uncompressed_metric_data() {
78+
let input_file = java_uncompressed_metric_input_file();
79+
let puffin_reader = PuffinReader::new(input_file);
80+
81+
let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
82+
assert_eq!(file_metadata, uncompressed_metric_file_metadata());
83+
84+
assert_eq!(
85+
puffin_reader
86+
.blob(file_metadata.blobs.first().unwrap())
87+
.await
88+
.unwrap(),
89+
blob_0()
90+
);
91+
92+
assert_eq!(
93+
puffin_reader
94+
.blob(file_metadata.blobs.get(1).unwrap())
95+
.await
96+
.unwrap(),
97+
blob_1(),
98+
)
99+
}
100+
101+
#[tokio::test]
102+
async fn test_puffin_reader_zstd_compressed_metric_data() {
103+
let input_file = java_zstd_compressed_metric_input_file();
104+
let puffin_reader = PuffinReader::new(input_file);
105+
106+
let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
107+
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
108+
109+
assert_eq!(
110+
puffin_reader
111+
.blob(file_metadata.blobs.first().unwrap())
112+
.await
113+
.unwrap(),
114+
blob_0()
115+
);
116+
117+
assert_eq!(
118+
puffin_reader
119+
.blob(file_metadata.blobs.get(1).unwrap())
120+
.await
121+
.unwrap(),
122+
blob_1(),
123+
)
124+
}
125+
}

crates/iceberg/src/puffin/test_utils.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919

20+
use super::blob::Blob;
2021
use crate::io::{FileIOBuilder, InputFile};
2122
use crate::puffin::compression::CompressionCodec;
2223
use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
@@ -68,6 +69,7 @@ pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob";
6869
pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1];
6970
pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2;
7071
pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1;
72+
pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi";
7173

7274
pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
7375
BlobMetadata {
@@ -95,10 +97,23 @@ pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata {
9597
}
9698
}
9799

100+
pub(crate) fn blob_0() -> Blob {
101+
Blob {
102+
r#type: METRIC_BLOB_0_TYPE.to_string(),
103+
fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
104+
snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
105+
sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
106+
data: METRIC_BLOB_0_DATA.as_bytes().to_vec(),
107+
properties: HashMap::new(),
108+
}
109+
}
110+
98111
pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob";
99112
pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2];
100113
pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2;
101114
pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1;
115+
pub(crate) const METRIC_BLOB_1_DATA: &str =
116+
"some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?";
102117

103118
pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata {
104119
BlobMetadata {
@@ -126,6 +141,17 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
126141
}
127142
}
128143

144+
pub(crate) fn blob_1() -> Blob {
145+
Blob {
146+
r#type: METRIC_BLOB_1_TYPE.to_string(),
147+
fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
148+
snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
149+
sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
150+
data: METRIC_BLOB_1_DATA.as_bytes().to_vec(),
151+
properties: HashMap::new(),
152+
}
153+
}
154+
129155
pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234";
130156

131157
pub(crate) fn file_properties() -> HashMap<String, String> {

0 commit comments

Comments
 (0)