Skip to content

Commit

Permalink
feat(puffin): Add PuffinReader
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Jan 15, 2025
1 parent ae04c8a commit 62a2e6d
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
38 changes: 38 additions & 0 deletions crates/iceberg/src/puffin/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

/// Name of the blob type holding a serialized form of a "compact" Theta sketch produced by the
/// Apache DataSketches library.
///
/// NOTE(review): presumably meant as a value for `Blob::r#type`; no use is visible in this file.
pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";

/// A Puffin blob: the payload bytes together with the descriptive fields that identify what
/// the payload was computed from.
///
/// Equality is field-by-field. Every field type has total equality, so the type is `Eq` as
/// well as `PartialEq`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct Blob {
    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
    pub(crate) r#type: String,
    /// List of field IDs the blob was computed for; the order of items is used to compute
    /// sketches stored in the blob.
    pub(crate) fields: Vec<i32>,
    /// ID of the Iceberg table's snapshot the blob was computed from
    pub(crate) snapshot_id: i64,
    /// Sequence number of the Iceberg table's snapshot the blob was computed from
    pub(crate) sequence_number: i64,
    /// The actual blob data
    pub(crate) data: Vec<u8>,
    /// Arbitrary meta-information about the blob
    pub(crate) properties: HashMap<String, String>,
}
2 changes: 2 additions & 0 deletions crates/iceberg/src/puffin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
// Temporarily allowing this while crate is under active development
#![allow(dead_code)]

mod blob;
mod compression;
mod metadata;
mod reader;

#[cfg(test)]
mod test_utils;
126 changes: 126 additions & 0 deletions crates/iceberg/src/puffin/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::io::{FileRead, InputFile};
use crate::puffin::blob::Blob;
use crate::puffin::metadata::{BlobMetadata, FileMetadata};
use crate::Result;

/// Puffin reader: reads the file metadata and individual blobs from one input file.
pub(crate) struct PuffinReader {
/// File that the metadata and the blob bytes are read from.
input_file: InputFile,
/// Metadata cached by the first successful `file_metadata` call; `None` until then.
file_metadata: Option<FileMetadata>,
}

impl PuffinReader {
    /// Creates a reader over `input_file`. Nothing is read here and the metadata cache
    /// starts out empty.
    pub(crate) fn new(input_file: InputFile) -> Self {
        let file_metadata = None;
        Self {
            input_file,
            file_metadata,
        }
    }

    /// Returns the file metadata, reading it with `FileMetadata::read` on the first call and
    /// handing back the cached value afterwards.
    ///
    /// # Errors
    /// Propagates the error from `FileMetadata::read`; in that case nothing is cached.
    pub(crate) async fn file_metadata(&mut self) -> Result<&FileMetadata> {
        match self.file_metadata {
            Some(ref cached) => Ok(cached),
            None => {
                let loaded = FileMetadata::read(&self.input_file).await?;
                // `Option::insert` stores the value and returns a reference to it.
                Ok(self.file_metadata.insert(loaded))
            }
        }
    }

    /// Reads the blob described by `blob_metadata`.
    ///
    /// The bytes in `offset..offset + length` are read from the input file and passed through
    /// the metadata's compression codec; the remaining descriptive fields are moved from the
    /// metadata into the returned [`Blob`] unchanged.
    ///
    /// # Errors
    /// Propagates failures from opening the reader, reading the range, or decompressing.
    pub(crate) async fn blob(&self, blob_metadata: BlobMetadata) -> Result<Blob> {
        let reader = self.input_file.reader().await?;

        let first_byte = blob_metadata.offset;
        let byte_range = first_byte..(first_byte + blob_metadata.length);
        let raw = reader.read(byte_range).await?.to_vec();
        let data = blob_metadata.compression_codec.decompress(raw)?;

        let blob = Blob {
            data,
            r#type: blob_metadata.r#type,
            fields: blob_metadata.fields,
            snapshot_id: blob_metadata.snapshot_id,
            sequence_number: blob_metadata.sequence_number,
            properties: blob_metadata.properties,
        };
        Ok(blob)
    }
}

#[cfg(test)]
mod tests {

    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    /// The uncompressed fixture yields the expected file metadata and both expected blobs.
    #[tokio::test]
    async fn test_puffin_reader_uncompressed_metric_data() {
        let mut puffin_reader = PuffinReader::new(java_uncompressed_metric_input_file());

        // Cloned so the `&mut` borrow taken by `file_metadata()` ends before `blob()` runs.
        let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
        assert_eq!(file_metadata, uncompressed_metric_file_metadata());

        let first_meta = file_metadata.blobs.first().unwrap().clone();
        let first_blob = puffin_reader.blob(first_meta).await.unwrap();
        assert_eq!(first_blob, blob_0());

        let second_meta = file_metadata.blobs.get(1).unwrap().clone();
        let second_blob = puffin_reader.blob(second_meta).await.unwrap();
        assert_eq!(second_blob, blob_1());
    }

    /// The zstd-compressed fixture yields the expected file metadata and the same two blobs.
    #[tokio::test]
    async fn test_puffin_reader_zstd_compressed_metric_data() {
        let mut puffin_reader = PuffinReader::new(java_zstd_compressed_metric_input_file());

        // Cloned so the `&mut` borrow taken by `file_metadata()` ends before `blob()` runs.
        let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());

        let first_meta = file_metadata.blobs.first().unwrap().clone();
        let first_blob = puffin_reader.blob(first_meta).await.unwrap();
        assert_eq!(first_blob, blob_0());

        let second_meta = file_metadata.blobs.get(1).unwrap().clone();
        let second_blob = puffin_reader.blob(second_meta).await.unwrap();
        assert_eq!(second_blob, blob_1());
    }
}
26 changes: 26 additions & 0 deletions crates/iceberg/src/puffin/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::collections::HashMap;

use super::blob::Blob;
use crate::io::{FileIOBuilder, InputFile};
use crate::puffin::compression::CompressionCodec;
use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
Expand Down Expand Up @@ -68,6 +69,7 @@ pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob";
/// Field IDs that `blob_0()` uses for its `fields`.
pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1];
/// Snapshot ID that `blob_0()` uses for its `snapshot_id`.
pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2;
/// Sequence number that `blob_0()` uses for its `sequence_number`.
pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1;
/// Payload of `blob_0()`: its `data` is the bytes of this string.
pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi";

pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
BlobMetadata {
Expand Down Expand Up @@ -95,10 +97,23 @@ pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata {
}
}

/// Builds the expected first metric blob from the `METRIC_BLOB_0_*` constants, with an empty
/// property map.
pub(crate) fn blob_0() -> Blob {
    let fields = Vec::from(METRIC_BLOB_0_INPUT_FIELDS);
    let data = Vec::from(METRIC_BLOB_0_DATA.as_bytes());

    Blob {
        r#type: String::from(METRIC_BLOB_0_TYPE),
        fields,
        snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
        sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
        data,
        properties: HashMap::default(),
    }
}

/// Blob type string that `blob_1()` uses for its `r#type`.
pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob";
/// Field IDs that `blob_1()` uses for its `fields`.
pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2];
/// Snapshot ID that `blob_1()` uses for its `snapshot_id`.
pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2;
/// Sequence number that `blob_1()` uses for its `sequence_number`.
pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1;
/// Payload of `blob_1()`: its `data` is the UTF-8 bytes of this string, which contains a NUL
/// character and a multi-byte emoji.
pub(crate) const METRIC_BLOB_1_DATA: &str =
"some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?";

pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata {
BlobMetadata {
Expand Down Expand Up @@ -126,6 +141,17 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
}
}

/// Builds the expected second metric blob from the `METRIC_BLOB_1_*` constants, with an empty
/// property map.
pub(crate) fn blob_1() -> Blob {
    let fields = Vec::from(METRIC_BLOB_1_INPUT_FIELDS);
    let data = Vec::from(METRIC_BLOB_1_DATA.as_bytes());

    Blob {
        r#type: String::from(METRIC_BLOB_1_TYPE),
        fields,
        snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
        sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
        data,
        properties: HashMap::default(),
    }
}

/// NOTE(review): presumably the value stored under `CREATED_BY_PROPERTY` in the test file
/// properties; the code that uses it is not visible here, so confirm.
pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234";

pub(crate) fn file_properties() -> HashMap<String, String> {
Expand Down

0 comments on commit 62a2e6d

Please sign in to comment.