Skip to content

Commit

Permalink
Add Puffin reader and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Nov 30, 2024
1 parent f3a571d commit 61ac9bf
Show file tree
Hide file tree
Showing 18 changed files with 1,810 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/iceberg",
"crates/integration_tests",
"crates/integrations/*",
"crates/puffin",
"crates/test_utils",
]
exclude = ["bindings/python"]
Expand Down Expand Up @@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
volo-thrift = "0.10"
hive_metastore = "0.1"
tera = "1"
zstd = "0.13.2"
2 changes: 2 additions & 0 deletions crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::Result;

mod parquet_writer;
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};

mod track_writer;
pub use track_writer::TrackWriter;

pub mod location_generator;

Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! The module contains the file writer for parquet file format.
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr

async fn build(self) -> crate::Result<Self::R> {
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
let written_size = Arc::new(AtomicI64::new(0));
let written_size = Arc::new(AtomicU64::new(0));
let out_file = self.file_io.new_output(
self.location_generator
.generate_location(&self.file_name_generator.generate_file_name()),
Expand Down Expand Up @@ -214,7 +214,7 @@ pub struct ParquetWriter {
schema: SchemaRef,
out_file: OutputFile,
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
written_size: Arc<AtomicI64>,
written_size: Arc<AtomicU64>,
current_row_num: usize,
}

Expand Down
16 changes: 11 additions & 5 deletions crates/iceberg/src/writer/file_writer/track_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -24,18 +24,24 @@ use crate::io::FileWrite;
use crate::Result;

/// `TrackWriter` is used to track the written size.
pub(crate) struct TrackWriter {
pub struct TrackWriter {
inner: Box<dyn FileWrite>,
written_size: Arc<AtomicI64>,
written_size: Arc<AtomicU64>,
}

impl TrackWriter {
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
/// Create new writer
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self {
Self {
inner: writer,
written_size,
}
}

/// Number of bytes written so far
pub fn bytes_written(&self) -> u64 {
self.written_size.load(std::sync::atomic::Ordering::SeqCst)
}
}

#[async_trait::async_trait]
Expand All @@ -44,7 +50,7 @@ impl FileWrite for TrackWriter {
let size = bs.len();
self.inner.write(bs).await.map(|v| {
self.written_size
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
.fetch_add(size as u64, std::sync::atomic::Ordering::Relaxed);
v
})
}
Expand Down
41 changes: 41 additions & 0 deletions crates/puffin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iceberg-puffin"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
rust-version = { workspace = true }

categories = ["database"]
description = "Apache Iceberg Puffin"
repository = { workspace = true }
license = { workspace = true }
keywords = ["iceberg", "puffin"]

[dependencies]
bytes = { workspace = true }
iceberg = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zstd = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true }
38 changes: 38 additions & 0 deletions crates/puffin/src/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";

/// The blob
#[derive(Debug, PartialEq, Clone)]
pub struct Blob {
/// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
pub r#type: String,
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
pub input_fields: Vec<i32>,
/// ID of the Iceberg table's snapshot the blob was computed from
pub snapshot_id: i64,
/// Sequence number of the Iceberg table's snapshot the blob was computed from
pub sequence_number: i64,
/// The actual blob data
pub data: Vec<u8>,
/// Arbitrary meta-information about the blob
pub properties: HashMap<String, String>,
}
123 changes: 123 additions & 0 deletions crates/puffin/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use iceberg::{Error, ErrorKind, Result};
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
/// Data compression formats
pub enum CompressionCodec {
#[default]
/// No compression
None,
/// LZ4 single compression frame with content size present
Lz4,
/// Zstandard single compression frame with content size present
Zstd,
}

impl CompressionCodec {
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 decompression is not supported currently",
)),
CompressionCodec::Zstd => {
let decompressed = zstd::stream::decode_all(&bytes[..])?;
Ok(decompressed)
}
}
}

pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
CompressionCodec::None => Ok(bytes),
CompressionCodec::Lz4 => Err(Error::new(
ErrorKind::FeatureUnsupported,
"LZ4 compression is not supported currently",
)),
CompressionCodec::Zstd => {
let writer = Vec::<u8>::new();
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
encoder.include_checksum(true)?;
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
std::io::copy(&mut &bytes[..], &mut encoder)?;
let compressed = encoder.finish()?;
Ok(compressed)
}
}
}

pub(crate) fn is_none(&self) -> bool {
matches!(self, CompressionCodec::None)
}
}

#[cfg(test)]
mod tests {
use crate::compression::CompressionCodec;

#[tokio::test]
async fn test_compression_codec_none() {
let compression_codec = CompressionCodec::None;
let bytes_vec = [0_u8; 100].to_vec();

let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
assert_eq!(bytes_vec, compressed);

let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
assert_eq!(compressed, decompressed)
}

#[tokio::test]
async fn test_compression_codec_lz4() {
let compression_codec = CompressionCodec::Lz4;
let bytes_vec = [0_u8; 100].to_vec();

assert_eq!(
compression_codec
.compress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 compression is not supported currently",
);

assert_eq!(
compression_codec
.decompress(bytes_vec.clone())
.unwrap_err()
.to_string(),
"FeatureUnsupported => LZ4 decompression is not supported currently",
)
}

#[tokio::test]
async fn test_compression_codec_zstd() {
let compression_codec = CompressionCodec::Zstd;
let bytes_vec = [0_u8; 100].to_vec();

let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
assert!(compressed.len() < bytes_vec.len());

let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
assert_eq!(decompressed, bytes_vec)
}
}
38 changes: 38 additions & 0 deletions crates/puffin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg Puffin implementation.
#![deny(missing_docs)]

mod blob;
pub use blob::{Blob, APACHE_DATASKETCHES_THETA_V1};

mod compression;
pub use compression::CompressionCodec;

mod metadata;
pub use metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};

mod reader;
pub use reader::PuffinReader;

#[cfg(test)]
mod test_utils;

mod writer;
pub use writer::PuffinWriter;
Loading

0 comments on commit 61ac9bf

Please sign in to comment.