From 5905473afe64907c08ff1bb7ceb7ad4e2e2cb5a2 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 30 Nov 2024 17:03:44 -0500 Subject: [PATCH 01/12] Add Puffin FileMetadata --- crates/iceberg/src/puffin/compression.rs | 5 +- crates/iceberg/src/puffin/metadata.rs | 809 ++++++++++++++++++ crates/iceberg/src/puffin/mod.rs | 4 + crates/iceberg/src/puffin/test_utils.rs | 158 ++++ .../empty-puffin-uncompressed.bin | Bin 0 -> 32 bytes .../sample-metric-data-compressed-zstd.bin | Bin 0 -> 417 bytes .../sample-metric-data-uncompressed.bin | Bin 0 -> 355 bytes 7 files changed, 975 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/puffin/metadata.rs create mode 100644 crates/iceberg/src/puffin/test_utils.rs create mode 100644 crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin create mode 100644 crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin create mode 100644 crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs index 710698df8d..652e8974ee 100644 --- a/crates/iceberg/src/puffin/compression.rs +++ b/crates/iceberg/src/puffin/compression.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +use serde::{Deserialize, Serialize}; + use crate::{Error, ErrorKind, Result}; -#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] /// Data compression formats pub enum CompressionCodec { #[default] diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs new file mode 100644 index 0000000000..8e10c881f9 --- /dev/null +++ b/crates/iceberg/src/puffin/metadata.rs @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds() -> Result { + Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(data) => Ok(data), + }?; + let decompressed_footer_payload_bytes = + footer_compression_codec.decompress(footer_payload_bytes.into())?; + + match String::from_utf8(decompressed_footer_payload_bytes) { + Err(src) => Err(Error::new( + ErrorKind::DataInvalid, + "Footer is not a valid UTF-8 string", + ) + .with_source(src)), + Ok(str) => Ok(str), + } + } + + fn from_json_str(string: &str) -> Result { + match serde_json::from_str::(string) { + Ok(file_metadata) => Ok(file_metadata), + Err(src) => Err( + Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON") + .with_source(src), + ), + } + } + + #[rustfmt::skip] + /// Returns the file metadata about a Puffin file + pub(crate) async fn read(input_file: &InputFile) -> Result { + let file_read = input_file.reader().await?; + + let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?; + FileMetadata::check_magic(&first_four_bytes)?; + + let input_file_length = input_file.metadata().await?.size; + let footer_payload_length = FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?; + let footer_bytes = FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length).await?; + + let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); + FileMetadata::check_magic(&footer_bytes[..magic_length])?; // first four bytes of footer + FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; // last four bytes of footer + + let footer_payload_str = FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; + FileMetadata::from_json_str(&footer_payload_str) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use bytes::Bytes; + use tempfile::TempDir; + + use crate::io::{FileIOBuilder, InputFile}; + use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata}; + use crate::puffin::test_utils::{ + empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes, + java_empty_uncompressed_input_file, java_uncompressed_metric_input_file, + java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + + const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0]; + + async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let path_buf = temp_dir.path().join("abc.puffin"); + let temp_path = path_buf.to_str().unwrap(); + let output_file = file_io.new_output(temp_path).unwrap(); + + output_file + .write(Bytes::copy_from_slice(slice)) + .await + .unwrap(); + + output_file.to_input_file() + } + + async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile { + let payload_bytes = payload_str.as_bytes(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(payload_bytes); + bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32)); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + input_file_with_bytes(temp_dir, &bytes).await + } + + #[tokio::test] + async fn test_file_starting_with_invalid_magic_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(INVALID_MAGIC_VALUE.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(INVALID_MAGIC_VALUE.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_file_ending_with_invalid_magic_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(INVALID_MAGIC_VALUE); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(u32::to_le_bytes( + empty_footer_payload_bytes().len() as u32 + 1, + )); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(u32::to_le_bytes( + empty_footer_payload_bytes().len() as u32 - 1, + )); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_lz4_compressed_footer_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0b00000001, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "FeatureUnsupported => LZ4 decompression is not supported currently", + ) + } + + #[tokio::test] + async fn test_unknown_byte_bit_combination_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0b00000010, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Unknown flag byte 0 and bit 1 combination", + ) + } + + #[tokio::test] + async fn test_non_utf8_string_payload_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let payload_bytes: [u8; 4] = [0, 159, 146, 150]; + let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(payload_bytes); + bytes.extend(payload_bytes_length_bytes); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1", + ) + } + + #[tokio::test] + async fn test_minimal_valid_file_returns_file_metadata() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_file_metadata_property() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ ], + "properties" : { + "a property" : "a property value" + } + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: { + let mut map = HashMap::new(); + map.insert("a property".to_string(), "a property value".to_string()); + map + }, + } + ) + } + + #[tokio::test] + async fn test_returns_file_metadata_properties() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ ], + "properties" : { + "a property" : "a property value", + "another one": "also with value" + } + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: { + let mut map = HashMap::new(); + map.insert("a property".to_string(), "a property value".to_string()); + map.insert("another one".to_string(), "also with value".to_string()); + map + }, + } + ) + } + + #[tokio::test] + async fn test_returns_error_if_blobs_field_is_missing() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "properties" : {} + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!( + "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13" + ), + ) + } + + #[tokio::test] + async fn test_returns_error_if_blobs_field_is_bad() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : {} + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!("DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"), + ) + } + + #[tokio::test] + async fn test_returns_blobs_metadatas() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ + { + "type" : "type-a", + "fields" : [ 1 ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16 + }, + { + "type" : "type-bbb", + "fields" : [ 2, 3, 4 ], + "snapshot-id" : 77, + "sequence-number" : 4, + "offset" : 21474836470000, + "length" : 79834 + } + ] + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![ + BlobMetadata { + r#type: "type-a".to_string(), + input_fields: vec![1], + snapshot_id: 14, + sequence_number: 3, + offset: 4, + length: 16, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + }, + BlobMetadata { + r#type: "type-bbb".to_string(), + input_fields: vec![2, 3, 4], + snapshot_id: 77, + sequence_number: 4, + offset: 21474836470000, + length: 79834, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + }, + ], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_properties_in_blob_metadata() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ + { + "type" : "type-a", + "fields" : [ 1 ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16, + "properties" : { + "some key" : "some value" + } + } + ] + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![BlobMetadata { + r#type: "type-a".to_string(), + input_fields: vec![1], + snapshot_id: 14, + sequence_number: 3, + offset: 4, + length: 16, + compression_codec: CompressionCodec::None, + properties: { + let mut map = HashMap::new(); + map.insert("some key".to_string(), "some value".to_string()); + map + }, + }], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() { + let temp_dir = TempDir::new().unwrap(); + + let out_of_i32_range_number: i64 = i32::MAX as i64 + 1; + + let input_file = input_file_with_payload( + &temp_dir, + &format!( + r#"{{ + "blobs" : [ + {{ + "type" : "type-a", + "fields" : [ {} ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16 + }} + ] + }}"#, + out_of_i32_range_number + ), + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!( + "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{}`, expected i32 at line 5 column 51", + out_of_i32_range_number + ), + ) + } + + #[tokio::test] + async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await; + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7", + ) + } + + #[tokio::test] + async fn test_read_file_metadata_of_uncompressed_empty_file() { + let input_file = java_empty_uncompressed_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, empty_footer_payload()) + } + + #[tokio::test] + async fn test_read_file_metadata_of_uncompressed_metric_data() { + let input_file = java_uncompressed_metric_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, uncompressed_metric_file_metadata()) + } + + #[tokio::test] + async fn test_read_file_metadata_of_zstd_compressed_metric_data() { + let input_file = java_zstd_compressed_metric_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()) + } +} diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index c13ebe420d..91bdf125fc 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -22,3 +22,7 @@ #![allow(dead_code)] mod compression; +mod metadata; + +#[cfg(test)] +mod test_utils; diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs new file mode 100644 index 0000000000..b7bc172551 --- /dev/null +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use crate::io::{FileIOBuilder, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; + +const JAVA_TESTDATA: &str = "testdata/puffin/java-generated"; +const EMPTY_UNCOMPRESSED: &str = "empty-puffin-uncompressed.bin"; +const METRIC_UNCOMPRESSED: &str = "sample-metric-data-uncompressed.bin"; +const METRIC_ZSTD_COMPRESSED: &str = "sample-metric-data-compressed-zstd.bin"; + +fn input_file_for_test_data(path: &str) -> InputFile { + FileIOBuilder::new_fs_io() + .build() + .unwrap() + .new_input(env!("CARGO_MANIFEST_DIR").to_owned() + "/" + path) + .unwrap() +} + +pub(crate) fn java_empty_uncompressed_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, EMPTY_UNCOMPRESSED].join("/")) +} + +pub(crate) fn java_uncompressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, METRIC_UNCOMPRESSED].join("/")) +} + +pub(crate) fn java_zstd_compressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, METRIC_ZSTD_COMPRESSED].join("/")) +} + +pub(crate) fn empty_footer_payload() -> FileMetadata { + FileMetadata { + blobs: Vec::new(), + properties: HashMap::new(), + } +} + +pub(crate) fn empty_footer_payload_bytes() -> Vec { + return serde_json::to_string::(&empty_footer_payload()) + .unwrap() + .as_bytes() + .to_vec(); +} + +pub(crate) fn empty_footer_payload_bytes_length_bytes() -> [u8; 4] { + u32::to_le_bytes(empty_footer_payload_bytes().len() as u32) +} + +pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob"; +pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1]; +pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2; +pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1; + +pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_0_TYPE.to_string(), + input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + offset: 4, + length: 22, + compression_codec: CompressionCodec::Zstd, + properties: HashMap::new(), + } +} + +pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_0_TYPE.to_string(), + input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + offset: 4, + length: 9, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + } +} + +pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob"; +pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2]; +pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2; +pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1; + +pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_1_TYPE.to_string(), + input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + offset: 13, + length: 83, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + } +} + +pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_1_TYPE.to_string(), + input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + offset: 26, + length: 77, + compression_codec: CompressionCodec::Zstd, + properties: HashMap::new(), + } +} + +pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234"; + +pub(crate) fn file_properties() -> HashMap { + let mut properties = HashMap::new(); + properties.insert( + CREATED_BY_PROPERTY.to_string(), + CREATED_BY_PROPERTY_VALUE.to_string(), + ); + properties +} + +pub(crate) fn uncompressed_metric_file_metadata() -> FileMetadata { + FileMetadata { + blobs: vec![ + uncompressed_metric_blob_0_metadata(), + uncompressed_metric_blob_1_metadata(), + ], + properties: file_properties(), + } +} + +pub(crate) fn zstd_compressed_metric_file_metadata() -> FileMetadata { + FileMetadata { + blobs: vec![ + zstd_compressed_metric_blob_0_metadata(), + zstd_compressed_metric_blob_1_metadata(), + ], + properties: file_properties(), + } +} diff --git a/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin new file mode 100644 index 0000000000000000000000000000000000000000..142b45bd4ebe0b865064ef874325ff1c94399bb1 GIT binary patch literal 32 fcmWG=b2JP9;%cR&ocyF>C9CMzS{?=n0Eq(tjVlLb literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin new file mode 100644 index 0000000000000000000000000000000000000000..ac8b69c76e5770823137232c7b84d8a9e4903977 GIT binary patch literal 417 zcmb7=u}Z{15Qg^@3u|9s87)K-B=PVdjo1j+9cU*fY<4HfLb4lo=Y$xpw6(IgvA43g zSMU||A$$SB%{3x|;B@oP&;QNL?Cdnze>wcz+nzEea;dN=E4_2IdRwdKTN~_Q)7u7l zUfY~Ao@*mq$CV(#KOUc+IaPwV_S{(FH|V`riUTILw4B% zbuB^$LvTE(5J91_R>L%zN8pWUePt=u3bHixc)dU)F*b`PM+aFFfh`J;1lc%(8cj)6 za0aWiP3zEmZA~n#LK!%>o)j#jpIl27x?lA4pY-(j8$X%+g%824vn@K(81C#rIuwK| X&bzSU1$yF1dNhXR@?gZ)HGJ(0r0$BX literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin new file mode 100644 index 0000000000000000000000000000000000000000..ab8da13822c55c573b8ba925eb9ebbc33cd87f28 GIT binary patch literal 355 zcmb7PY+KhqS$DC(-` zheOd{v1(>i#}#>*2^CgSQ@bc|@HE*vl`jHw&~tW?8*fpyrKZ<2g`S#lJ{d}=q`)`~ znHbex;6!0$hw6S4KeZz}O1}v0KMAt?M%;B#;DY2*W@QQsR&14(16i?5T8D!h- Maoaw2Us3IU0?}=DoB#j- literal 0 HcmV?d00001 From 323ef9dba496e1b9f92551a4cc73b7bbd691477e Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:17:24 -0500 Subject: [PATCH 02/12] Fix comment locations --- crates/iceberg/src/puffin/compression.rs | 2 +- crates/iceberg/src/puffin/metadata.rs | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs index 652e8974ee..a9a56ef12c 100644 --- a/crates/iceberg/src/puffin/compression.rs +++ b/crates/iceberg/src/puffin/compression.rs @@ -19,9 +19,9 @@ use serde::{Deserialize, Serialize}; use crate::{Error, ErrorKind, Result}; +/// Data compression formats #[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] -/// Data compression formats pub enum CompressionCodec { #[default] /// No compression diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 8e10c881f9..8de28d92b4 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -29,10 +29,10 @@ use crate::{Error, ErrorKind, Result}; /// Example: "Trino version 381" pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] -#[serde(rename_all = "kebab-case")] /// Metadata about a blob. /// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] pub(crate) struct BlobMetadata { /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types pub(crate) r#type: String, @@ -98,15 +98,15 @@ impl Flag { } } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] /// Metadata about a puffin file. /// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] pub(crate) struct FileMetadata { /// Metadata about blobs in file pub(crate) blobs: Vec, + /// Arbitrary meta-information, like writer identification/version. #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default)] - /// Arbitrary meta-information, like writer identification/version. pub(crate) properties: HashMap, } @@ -260,8 +260,8 @@ impl FileMetadata { } } - #[rustfmt::skip] /// Returns the file metadata about a Puffin file + #[rustfmt::skip] pub(crate) async fn read(input_file: &InputFile) -> Result { let file_read = input_file.reader().await?; @@ -273,8 +273,10 @@ impl FileMetadata { let footer_bytes = FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length).await?; let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); - FileMetadata::check_magic(&footer_bytes[..magic_length])?; // first four bytes of footer - FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; // last four bytes of footer + // check first four bytes of footer + FileMetadata::check_magic(&footer_bytes[..magic_length])?; + // check last four bytes of footer + FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; let footer_payload_str = FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; FileMetadata::from_json_str(&footer_payload_str) From 3b286cf98191e313ea13299bdc3c6f733dd66e59 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:21:12 -0500 Subject: [PATCH 03/12] Put Ok(()) branch first --- crates/iceberg/src/puffin/metadata.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 8de28d92b4..d11fcf2552 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -137,7 +137,9 @@ impl FileMetadata { FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; fn check_magic(bytes: &[u8]) -> Result<()> { - if bytes != FileMetadata::MAGIC { + if bytes == FileMetadata::MAGIC { + Ok(()) + } else { Err(Error::new( ErrorKind::DataInvalid, format!( @@ -146,8 +148,6 @@ impl FileMetadata { FileMetadata::MAGIC ), )) - } else { - Ok(()) } } From e124d50c461f681c6977864d4c7d6ba026e613f8 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:22:54 -0500 Subject: [PATCH 04/12] Use map_err --- crates/iceberg/src/puffin/metadata.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index d11fcf2552..f68324c2f6 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -240,24 +240,16 @@ impl FileMetadata { let decompressed_footer_payload_bytes = footer_compression_codec.decompress(footer_payload_bytes.into())?; - match String::from_utf8(decompressed_footer_payload_bytes) { - Err(src) => Err(Error::new( - ErrorKind::DataInvalid, - "Footer is not a valid UTF-8 string", - ) - .with_source(src)), - Ok(str) => Ok(str), - } + String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| { + Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string") + .with_source(src) + }) } fn from_json_str(string: &str) -> Result { - match serde_json::from_str::(string) { - Ok(file_metadata) => Ok(file_metadata), - Err(src) => Err( - Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON") - .with_source(src), - ), - } + serde_json::from_str::(string).map_err(|src| { + Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src) + }) } /// Returns the file metadata about a Puffin file From f0107e21e79d8c695dc1b85f1da55b25a8109bbb Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:24:58 -0500 Subject: [PATCH 05/12] Inline err_out_of_bounds function --- crates/iceberg/src/puffin/metadata.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index f68324c2f6..745c95691b 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -177,13 +177,6 @@ impl FileMetadata { file_read.read(start..end).await } - fn err_out_of_bounds() -> Result { - Err(Error::new( - ErrorKind::DataInvalid, - "Index range is out of bounds.", - )) - } - fn decode_flags(footer_bytes: &[u8]) -> Result> { let mut flags = HashSet::new(); for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { @@ -193,7 +186,10 @@ impl FileMetadata { + usize::from(byte_number); let mut flag_byte = match footer_bytes.get(byte_offset) { - None => FileMetadata::err_out_of_bounds(), + None => Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )), Some(byte) => Ok(*byte), }?; let mut bit_number = 0; @@ -234,7 +230,10 @@ impl FileMetadata { let end_offset = usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { - None => FileMetadata::err_out_of_bounds(), + None => Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )), Some(data) => Ok(data), }?; let decompressed_footer_payload_bytes = From 1160b855ed3201b0dcdd7a74417dd704e9b9e321 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:27:10 -0500 Subject: [PATCH 06/12] Use ok_or_else --- crates/iceberg/src/puffin/metadata.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 745c95691b..e9e97b5bb1 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -185,13 +185,9 @@ impl FileMetadata { - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + usize::from(byte_number); - let mut flag_byte = match footer_bytes.get(byte_offset) { - None => Err(Error::new( - ErrorKind::DataInvalid, - "Index range is out of bounds.", - )), - Some(byte) => Ok(*byte), - }?; + let mut flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.") + })?; let mut bit_number = 0; while flag_byte != 0 { if flag_byte & 0x1 != 0 { @@ -229,13 +225,9 @@ impl FileMetadata { let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); let end_offset = usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; - let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { - None => Err(Error::new( - ErrorKind::DataInvalid, - "Index range is out of bounds.", - )), - Some(data) => Ok(data), - }?; + let footer_payload_bytes = footer_bytes + .get(start_offset..end_offset) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?; let decompressed_footer_payload_bytes = footer_compression_codec.decompress(footer_payload_bytes.into())?; From 667a9e33e089e0996a54915a456dc03cce07ca9f Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 14 Dec 2024 10:30:01 -0500 Subject: [PATCH 07/12] Remove #[rustfmt::skip] --- crates/iceberg/src/puffin/metadata.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index e9e97b5bb1..1441043cd9 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -244,7 +244,6 @@ impl FileMetadata { } /// Returns the file metadata about a Puffin file - #[rustfmt::skip] pub(crate) async fn read(input_file: &InputFile) -> Result { let file_read = input_file.reader().await?; @@ -252,8 +251,11 @@ impl FileMetadata { FileMetadata::check_magic(&first_four_bytes)?; let input_file_length = input_file.metadata().await?.size; - let footer_payload_length = FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?; - let footer_bytes = FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length).await?; + let footer_payload_length = + FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?; + let footer_bytes = + FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length) + .await?; let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); // check first four bytes of footer @@ -261,7 +263,8 @@ impl FileMetadata { // check last four bytes of footer FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; - let footer_payload_str = FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; + let footer_payload_str = + FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; FileMetadata::from_json_str(&footer_payload_str) } } From fae13ca162cf85832b31dbc50d2b945948c81912 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 4 Jan 2025 13:38:39 -0500 Subject: [PATCH 08/12] Rename input_fields to fields --- crates/iceberg/src/puffin/metadata.rs | 9 ++++----- crates/iceberg/src/puffin/test_utils.rs | 8 ++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 1441043cd9..8bd21d3c95 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -37,8 +37,7 @@ pub(crate) struct BlobMetadata { /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types pub(crate) r#type: String, /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. - #[serde(rename = "fields")] - pub(crate) input_fields: Vec, + pub(crate) fields: Vec, /// ID of the Iceberg table's snapshot the blob was computed from pub(crate) snapshot_id: i64, /// Sequence number of the Iceberg table's snapshot the blob was computed from @@ -658,7 +657,7 @@ mod tests { blobs: vec![ BlobMetadata { r#type: "type-a".to_string(), - input_fields: vec![1], + fields: vec![1], snapshot_id: 14, sequence_number: 3, offset: 4, @@ -668,7 +667,7 @@ mod tests { }, BlobMetadata { r#type: "type-bbb".to_string(), - input_fields: vec![2, 3, 4], + fields: vec![2, 3, 4], snapshot_id: 77, sequence_number: 4, offset: 21474836470000, @@ -711,7 +710,7 @@ mod tests { FileMetadata { blobs: vec![BlobMetadata { r#type: "type-a".to_string(), - input_fields: vec![1], + fields: vec![1], snapshot_id: 14, sequence_number: 3, offset: 4, diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index b7bc172551..e49e51d50a 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -72,7 +72,7 @@ pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1; pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { BlobMetadata { r#type: METRIC_BLOB_0_TYPE.to_string(), - input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, offset: 4, @@ -85,7 +85,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata { BlobMetadata { r#type: METRIC_BLOB_0_TYPE.to_string(), - input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, offset: 4, @@ -103,7 +103,7 @@ pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1; pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { BlobMetadata { r#type: METRIC_BLOB_1_TYPE.to_string(), - input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, offset: 13, @@ -116,7 +116,7 @@ pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { BlobMetadata { r#type: METRIC_BLOB_1_TYPE.to_string(), - input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, offset: 26, From 23237739c290501a2b0a04e998ed299d371b1584 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sun, 5 Jan 2025 10:55:18 -0500 Subject: [PATCH 09/12] Simplify flag parsing --- crates/iceberg/src/puffin/metadata.rs | 83 +++++++++++---------------- 1 file changed, 32 insertions(+), 51 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 8bd21d3c95..367996e8c2 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet}; use bytes::Bytes; -use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use crate::io::{FileRead, InputFile}; @@ -56,44 +55,36 @@ pub(crate) struct BlobMetadata { pub(crate) properties: HashMap, } -#[derive(Clone, PartialEq, Eq, Hash, Debug)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub(crate) enum Flag { - FooterPayloadCompressed, + FooterPayloadCompressed = 0, } -#[derive(PartialEq, Eq, Hash)] -pub(crate) struct ByteNumber(pub u8); - -#[derive(PartialEq, Eq, Hash)] -pub(crate) struct BitNumber(pub u8); - -static FLAGS_BY_BYTE_AND_BIT: Lazy> = Lazy::new(|| { - let mut m = HashMap::new(); - m.insert( - ( - Flag::FooterPayloadCompressed.byte_number(), - Flag::FooterPayloadCompressed.bit_number(), - ), - Flag::FooterPayloadCompressed, - ); - m -}); - impl Flag { - pub(crate) fn byte_number(&self) -> ByteNumber { - match self { - Flag::FooterPayloadCompressed => ByteNumber(0), - } + pub(crate) fn byte_idx(self) -> u8 { + (self as u8) / 8 } - pub(crate) fn bit_number(&self) -> BitNumber { - match self { - Flag::FooterPayloadCompressed => BitNumber(0), - } + pub(crate) fn bit_idx(self) -> u8 { + (self as u8) % 8 } - fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option { - FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + fn matches(self, byte_idx: &u8, bit_idx: &u8) -> bool { + &self.byte_idx() == byte_idx && &self.bit_idx() == bit_idx + } + + fn from(byte_idx: &u8, bit_idx: &u8) -> Result { + if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) { + Ok(Flag::FooterPayloadCompressed) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_idx, bit_idx + ), + )) + } } } @@ -178,35 +169,25 @@ impl FileMetadata { fn decode_flags(footer_bytes: &[u8]) -> Result> { let mut flags = HashSet::new(); - for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + + for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { let byte_offset = footer_bytes.len() - usize::from(FileMetadata::MAGIC_LENGTH) - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) - + usize::from(byte_number); + + usize::from(byte_idx); - let mut flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| { + let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| { Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.") })?; - let mut bit_number = 0; - while flag_byte != 0 { - if flag_byte & 0x1 != 0 { - match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { - Some(flag) => flags.insert(flag), - None => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Unknown flag byte {} and bit {} combination", - byte_number, bit_number - ), - )) - } - }; + + for bit_idx in 0..8 { + if ((flag_byte >> bit_idx) & 1) != 0 { + let flag = Flag::from(&byte_idx, &bit_idx)?; + flags.insert(flag); } - flag_byte >>= 1; - bit_number += 1; } } + Ok(flags) } From 342071ecf23a9ac13601df88faaf92ada3c7c866 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Tue, 7 Jan 2025 20:58:49 -0500 Subject: [PATCH 10/12] Remove unnecessary reference --- crates/iceberg/src/puffin/metadata.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 367996e8c2..7f8e3a6620 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -69,11 +69,11 @@ impl Flag { (self as u8) % 8 } - fn matches(self, byte_idx: &u8, bit_idx: &u8) -> bool { - &self.byte_idx() == byte_idx && &self.bit_idx() == bit_idx + fn matches(self, byte_idx: u8, bit_idx: u8) -> bool { + self.byte_idx() == byte_idx && self.bit_idx() == bit_idx } - fn from(byte_idx: &u8, bit_idx: &u8) -> Result { + fn from(byte_idx: u8, bit_idx: u8) -> Result { if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) { Ok(Flag::FooterPayloadCompressed) } else { @@ -182,7 +182,7 @@ impl FileMetadata { for bit_idx in 0..8 { if ((flag_byte >> bit_idx) & 1) != 0 { - let flag = Flag::from(&byte_idx, &bit_idx)?; + let flag = Flag::from(byte_idx, bit_idx)?; flags.insert(flag); } } From 398a7702b5d64143b603cf3c332ddabd16e5fbee Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Tue, 7 Jan 2025 22:20:20 -0500 Subject: [PATCH 11/12] Make BlobMetadata.length a u64 (instead of usize) --- crates/iceberg/src/puffin/metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 7f8e3a6620..ea808e1244 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -44,7 +44,7 @@ pub(crate) struct BlobMetadata { /// The offset in the file where the blob contents start pub(crate) offset: u64, /// The length of the blob stored in the file (after compression, if compressed) - pub(crate) length: usize, + pub(crate) length: u64, /// The compression codec used to compress the data #[serde(skip_serializing_if = "CompressionCodec::is_none")] #[serde(default)] From 25f587397df797a315d2f599a3d191a9f8260a66 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sat, 11 Jan 2025 14:37:56 -0500 Subject: [PATCH 12/12] Replace from with as --- crates/iceberg/src/puffin/metadata.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index ea808e1244..9d00032253 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -145,8 +145,8 @@ impl FileMetadata { file_read: &dyn FileRead, input_file_length: u64, ) -> Result { - let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); - let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64; + let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64; let footer_payload_length_bytes = file_read.read(start..end).await?; let mut buf = [0; 4]; buf.copy_from_slice(&footer_payload_length_bytes); @@ -159,9 +159,9 @@ impl FileMetadata { input_file_length: u64, footer_payload_length: u32, ) -> Result { - let footer_length = u64::from(footer_payload_length) - + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) - + u64::from(FileMetadata::MAGIC_LENGTH); + let footer_length = footer_payload_length as u64 + + FileMetadata::FOOTER_STRUCT_LENGTH as u64 + + FileMetadata::MAGIC_LENGTH as u64; let start = input_file_length - footer_length; let end = input_file_length; file_read.read(start..end).await @@ -172,9 +172,9 @@ impl FileMetadata { for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { let byte_offset = footer_bytes.len() - - usize::from(FileMetadata::MAGIC_LENGTH) - - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) - + usize::from(byte_idx); + - FileMetadata::MAGIC_LENGTH as usize + - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize + + byte_idx as usize; let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| { Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.") @@ -202,9 +202,9 @@ impl FileMetadata { CompressionCodec::None }; - let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let start_offset = FileMetadata::MAGIC_LENGTH as usize; let end_offset = - usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?; let footer_payload_bytes = footer_bytes .get(start_offset..end_offset) .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?; @@ -237,7 +237,7 @@ impl FileMetadata { FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length) .await?; - let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); + let magic_length = FileMetadata::MAGIC_LENGTH as usize; // check first four bytes of footer FileMetadata::check_magic(&footer_bytes[..magic_length])?; // check last four bytes of footer