From a1ec0fa98113fc02b2479d81759fccd9dab10378 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 19 Aug 2024 10:48:43 +0100 Subject: [PATCH] Object Cache: caches parsed Manifests and ManifestLists for performance (#512) * feat: adds ObjectCache, to cache Manifests and ManifestLists * refactor: change obj cache method names and use more readable default usize value * chore: improve error message Co-authored-by: Renjie Liu * fix: change object cache retrieval method visibility Co-authored-by: Renjie Liu * feat: improved error message in object cache get_manifest * test(object-cache): add unit tests for object cache manifest and manifest list retrieval * fix: ensure that object cache insertions are weighted by size * test: fix test typo * fix: ensure object cache weight is that of the wrapped item, not the Arc --------- Co-authored-by: Renjie Liu --- crates/catalog/glue/src/catalog.rs | 12 +- crates/catalog/hms/src/catalog.rs | 12 +- crates/catalog/memory/src/catalog.rs | 13 +- crates/catalog/rest/src/catalog.rs | 16 +- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/io/mod.rs | 2 + crates/iceberg/src/io/object_cache.rs | 402 ++++++++++++++++++++++ crates/iceberg/src/scan.rs | 49 +-- crates/iceberg/src/spec/manifest.rs | 2 +- crates/iceberg/src/spec/table_metadata.rs | 2 +- crates/iceberg/src/table.rs | 154 ++++++++- crates/iceberg/src/transaction.rs | 2 + 12 files changed, 599 insertions(+), 68 deletions(-) create mode 100644 crates/iceberg/src/io/object_cache.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 16acaa719..18e30f3d0 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -381,14 +381,12 @@ impl Catalog for GlueCatalog { builder.send().await.map_err(from_aws_sdk_error)?; - let table = Table::builder() + Table::builder() .file_io(self.file_io()) .metadata_location(metadata_location) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) - .build(); - - Ok(table) + .build() } /// Loads a table from the Glue Catalog and constructs a `Table` object @@ -432,7 +430,7 @@ impl Catalog for GlueCatalog { let metadata_content = input_file.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; - let table = Table::builder() + Table::builder() .file_io(self.file_io()) .metadata_location(metadata_location) .metadata(metadata) @@ -440,9 +438,7 @@ impl Catalog for GlueCatalog { NamespaceIdent::new(db_name), table_name.to_owned(), )) - .build(); - - Ok(table) + .build() } } } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 524eceec7..6e5db1968 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -369,14 +369,12 @@ impl Catalog for HmsCatalog { .await .map_err(from_thrift_error)?; - let table = Table::builder() + Table::builder() .file_io(self.file_io()) .metadata_location(metadata_location) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) - .build(); - - Ok(table) + .build() } /// Loads a table from the Hive Metastore and constructs a `Table` object @@ -407,7 +405,7 @@ impl Catalog for HmsCatalog { let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; - let table = Table::builder() + Table::builder() .file_io(self.file_io()) .metadata_location(metadata_location) .metadata(metadata) @@ -415,9 +413,7 @@ impl Catalog for HmsCatalog { NamespaceIdent::new(db_name), table.name.clone(), )) - .build(); - - Ok(table) + .build() } /// Asynchronously drops a table from the database. diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 44086f8d3..05038cb66 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -209,14 +209,12 @@ impl Catalog for MemoryCatalog { root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; - let table = Table::builder() + Table::builder() .file_io(self.file_io.clone()) .metadata_location(metadata_location) .metadata(metadata) .identifier(table_ident) - .build(); - - Ok(table) + .build() } /// Load table from the catalog. @@ -227,14 +225,13 @@ impl Catalog for MemoryCatalog { let input_file = self.file_io.new_input(metadata_location)?; let metadata_content = input_file.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; - let table = Table::builder() + + Table::builder() .file_io(self.file_io.clone()) .metadata_location(metadata_location.clone()) .metadata(metadata) .identifier(table_ident.clone()) - .build(); - - Ok(table) + .build() } /// Drop a table from the catalog. diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index d74c8de06..2afefadef 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -516,7 +516,7 @@ impl Catalog for RestCatalog { .load_file_io(resp.metadata_location.as_deref(), resp.config) .await?; - let table = Table::builder() + Table::builder() .identifier(table_ident) .file_io(file_io) .metadata(resp.metadata) @@ -526,9 +526,7 @@ impl Catalog for RestCatalog { "Metadata location missing in create table response!", ) })?) - .build(); - - Ok(table) + .build() } /// Load table from the catalog. @@ -560,9 +558,9 @@ impl Catalog for RestCatalog { .metadata(resp.metadata); if let Some(metadata_location) = resp.metadata_location { - Ok(table_builder.metadata_location(metadata_location).build()) + table_builder.metadata_location(metadata_location).build() } else { - Ok(table_builder.build()) + table_builder.build() } } @@ -661,12 +659,12 @@ impl Catalog for RestCatalog { let file_io = self .load_file_io(Some(&resp.metadata_location), None) .await?; - Ok(Table::builder() + Table::builder() .identifier(commit.identifier().clone()) .file_io(file_io) .metadata(resp.metadata) .metadata_location(resp.metadata_location) - .build()) + .build() } } @@ -1661,6 +1659,7 @@ mod tests { .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) .build() + .unwrap() }; let table = Transaction::new(&table1) @@ -1785,6 +1784,7 @@ mod tests { .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) .build() + .unwrap() }; let table_result = Transaction::new(&table1) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 8b0a7e1eb..6218e98e5 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -60,6 +60,7 @@ derive_builder = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +moka = { version = "0.12.8", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } opendal = { workspace = true } diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index b8b7e2718..52a1da23a 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -78,8 +78,10 @@ use storage_memory::*; mod storage_s3; #[cfg(feature = "storage-s3")] pub use storage_s3::*; +pub(crate) mod object_cache; #[cfg(feature = "storage-fs")] mod storage_fs; + #[cfg(feature = "storage-fs")] use storage_fs::*; #[cfg(feature = "storage-gcs")] diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs new file mode 100644 index 000000000..3b89a4e6a --- /dev/null +++ b/crates/iceberg/src/io/object_cache.rs @@ -0,0 +1,402 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::io::FileIO; +use crate::spec::{ + FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, +}; +use crate::{Error, ErrorKind, Result}; + +const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB + +#[derive(Clone, Debug)] +pub(crate) enum CachedItem { + ManifestList(Arc), + Manifest(Arc), +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub(crate) enum CachedObjectKey { + ManifestList((String, FormatVersion, SchemaId)), + Manifest(String), +} + +/// Caches metadata objects deserialized from immutable files +#[derive(Clone, Debug)] +pub struct ObjectCache { + cache: moka::future::Cache, + file_io: FileIO, + cache_disabled: bool, +} + +impl ObjectCache { + /// Creates a new [`ObjectCache`] + /// with the default cache size + pub(crate) fn new(file_io: FileIO) -> Self { + Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) + } + + /// Creates a new [`ObjectCache`] + /// with a specific cache size + pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self { + if cache_size_bytes == 0 { + Self::with_disabled_cache(file_io) + } else { + Self { + cache: moka::future::Cache::builder() + .weigher(|_, val: &CachedItem| match val { + CachedItem::ManifestList(item) => size_of_val(item.as_ref()), + CachedItem::Manifest(item) => size_of_val(item.as_ref()), + } as u32) + .max_capacity(cache_size_bytes) + .build(), + file_io, + cache_disabled: false, + } + } + } + + /// Creates a new [`ObjectCache`] + /// with caching disabled + pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { + Self { + cache: moka::future::Cache::new(0), + file_io, + cache_disabled: true, + } + } + + /// Retrieves an Arc [`Manifest`] from the cache + /// or retrieves one from FileIO and parses it if not present + pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { + if self.cache_disabled { + return manifest_file + .load_manifest(&self.file_io) + .await + .map(Arc::new); + } + + let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone()); + + let cache_entry = self + .cache + .entry_by_ref(&key) + .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) + .await + .map_err(|err| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to load manifest {}", manifest_file.manifest_path), + ) + .with_source(err) + })? + .into_value(); + + match cache_entry { + CachedItem::Manifest(arc_manifest) => Ok(arc_manifest), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("cached object for key '{:?}' is not a Manifest", key), + )), + } + } + + /// Retrieves an Arc [`ManifestList`] from the cache + /// or retrieves one from FileIO and parses it if not present + pub(crate) async fn get_manifest_list( + &self, + snapshot: &SnapshotRef, + table_metadata: &TableMetadataRef, + ) -> Result> { + if self.cache_disabled { + return snapshot + .load_manifest_list(&self.file_io, table_metadata) + .await + .map(Arc::new); + } + + let key = CachedObjectKey::ManifestList(( + snapshot.manifest_list().to_string(), + table_metadata.format_version, + snapshot.schema_id().unwrap(), + )); + let cache_entry = self + .cache + .entry_by_ref(&key) + .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata)) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))? + .into_value(); + + match cache_entry { + CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("cached object for path '{:?}' is not a manifest list", key), + )), + } + } + + async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result { + let manifest = manifest_file.load_manifest(&self.file_io).await?; + + Ok(CachedItem::Manifest(Arc::new(manifest))) + } + + async fn fetch_and_parse_manifest_list( + &self, + snapshot: &SnapshotRef, + table_metadata: &TableMetadataRef, + ) -> Result { + let manifest_list = snapshot + .load_manifest_list(&self.file_io, table_metadata) + .await?; + + Ok(CachedItem::ManifestList(Arc::new(manifest_list))) + } +} + +#[cfg(test)] +mod tests { + use std::fs; + + use tempfile::TempDir; + use tera::{Context, Tera}; + use uuid::Uuid; + + use super::*; + use crate::io::{FileIO, OutputFile}; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, + ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, + ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, + }; + use crate::table::Table; + use crate::TableIdent; + + struct TableTestFixture { + table_location: String, + table: Table, + } + + impl TableTestFixture { + fn new() -> Self { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("table1"); + let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); + let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); + let table_metadata1_location = table_location.join("metadata/v1.json"); + + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + let table_metadata = { + let template_json_str = fs::read_to_string(format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + )) + .unwrap(); + let mut context = Context::new(); + context.insert("table_location", &table_location); + context.insert("manifest_list_1_location", &manifest_list1_location); + context.insert("manifest_list_2_location", &manifest_list2_location); + context.insert("table_metadata_1_location", &table_metadata1_location); + + let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); + serde_json::from_str::(&metadata_json).unwrap() + }; + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) + .file_io(file_io.clone()) + .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .build() + .unwrap(); + + Self { + table_location: table_location.to_str().unwrap().to_string(), + table, + } + } + + fn next_manifest_file(&self) -> OutputFile { + self.table + .file_io() + .new_output(format!( + "{}/metadata/manifest_{}.avro", + self.table_location, + Uuid::new_v4() + )) + .unwrap() + } + + async fn setup_manifest_files(&mut self) { + let current_snapshot = self.table.metadata().current_snapshot().unwrap(); + let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); + + // Write data files + let data_file_manifest = ManifestWriter::new( + self.next_manifest_file(), + current_snapshot.snapshot_id(), + vec![], + ) + .write(Manifest::new( + ManifestMetadata::builder() + .schema((*current_schema).clone()) + .content(ManifestContentType::Data) + .format_version(FormatVersion::V2) + .partition_spec((**current_partition_spec).clone()) + .schema_id(current_schema.schema_id()) + .build(), + vec![ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build()], + )) + .await + .unwrap(); + + // Write to manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(current_snapshot.manifest_list()) + .unwrap(), + current_snapshot.snapshot_id(), + current_snapshot + .parent_snapshot_id() + .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.sequence_number(), + ); + manifest_list_write + .add_manifests(vec![data_file_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + } + + #[tokio::test] + async fn test_get_manifest_list_and_manifest_from_disabled_cache() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone()); + + let result_manifest_list = object_cache + .get_manifest_list( + fixture.table.metadata().current_snapshot().unwrap(), + &fixture.table.metadata_ref(), + ) + .await + .unwrap(); + + assert_eq!(result_manifest_list.entries().len(), 1); + + let manifest_file = result_manifest_list.entries().first().unwrap(); + let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + + assert_eq!( + result_manifest + .entries() + .first() + .unwrap() + .file_path() + .split("/") + .last() + .unwrap(), + "1.parquet" + ); + } + + #[tokio::test] + async fn test_get_manifest_list_and_manifest_from_default_cache() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let object_cache = ObjectCache::new(fixture.table.file_io().clone()); + + // not in cache + let result_manifest_list = object_cache + .get_manifest_list( + fixture.table.metadata().current_snapshot().unwrap(), + &fixture.table.metadata_ref(), + ) + .await + .unwrap(); + + assert_eq!(result_manifest_list.entries().len(), 1); + + // retrieve cached version + let result_manifest_list = object_cache + .get_manifest_list( + fixture.table.metadata().current_snapshot().unwrap(), + &fixture.table.metadata_ref(), + ) + .await + .unwrap(); + + assert_eq!(result_manifest_list.entries().len(), 1); + + let manifest_file = result_manifest_list.entries().first().unwrap(); + + // not in cache + let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + + assert_eq!( + result_manifest + .entries() + .first() + .unwrap() + .file_path() + .split("/") + .last() + .unwrap(), + "1.parquet" + ); + + // retrieve cached version + let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + + assert_eq!( + result_manifest + .entries() + .first() + .unwrap() + .file_path() + .split("/") + .last() + .unwrap(), + "1.parquet" + ); + } +} diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 8a178dde9..04aa1f577 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -32,6 +32,7 @@ use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{ @@ -242,7 +243,7 @@ impl<'a> TableScanBuilder<'a> { case_sensitive: self.case_sensitive, predicate: self.filter.map(Arc::new), snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), - file_io: self.table.file_io().clone(), + object_cache: self.table.object_cache(), field_ids: Arc::new(field_ids), partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), @@ -292,7 +293,7 @@ struct PlanContext { case_sensitive: bool, predicate: Option>, snapshot_bound_predicate: Option>, - file_io: FileIO, + object_cache: Arc, field_ids: Arc>, partition_filter_cache: Arc, @@ -454,8 +455,8 @@ struct ManifestFileContext { sender: Sender, field_ids: Arc>, - file_io: FileIO, bound_predicates: Option>, + object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, } @@ -477,24 +478,22 @@ impl ManifestFileContext { /// streaming its constituent [`ManifestEntries`] to the channel provided in the context async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> { let ManifestFileContext { - file_io, + object_cache, manifest_file, bound_predicates, snapshot_schema, field_ids, - expression_evaluator_cache, mut sender, + expression_evaluator_cache, .. } = self; - let file_io_cloned = file_io.clone(); - let manifest = manifest_file.load_manifest(&file_io_cloned).await?; - - let (entries, _) = manifest.consume(); + let manifest = object_cache.get_manifest(&manifest_file).await?; - for manifest_entry in entries.into_iter() { + for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { - manifest_entry, + // TODO: refactor to avoid clone + manifest_entry: manifest_entry.clone(), expression_evaluator_cache: expression_evaluator_cache.clone(), field_ids: field_ids.clone(), partition_spec_id: manifest_file.partition_spec_id, @@ -530,9 +529,10 @@ impl ManifestEntryContext { } impl PlanContext { - async fn get_manifest_list(&self) -> Result { - self.snapshot - .load_manifest_list(&self.file_io, &self.table_metadata) + async fn get_manifest_list(&self) -> Result> { + self.object_cache + .as_ref() + .get_manifest_list(&self.snapshot, &self.table_metadata) .await } @@ -559,19 +559,19 @@ impl PlanContext { fn build_manifest_file_contexts( &self, - manifest_list: ManifestList, + manifest_list: Arc, sender: Sender, ) -> Result>>> { let filtered_entries = manifest_list - .consume_entries() - .into_iter() + .entries() + .iter() .filter(|manifest_file| manifest_file.content == ManifestContentType::Data); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; if self.predicate.is_some() { for manifest_file in filtered_entries { - let partition_bound_predicate = self.get_partition_filter(&manifest_file)?; + let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip // if it cannot contain any matching rows @@ -581,7 +581,7 @@ impl PlanContext { manifest_file.partition_spec_id, partition_bound_predicate.clone(), ) - .eval(&manifest_file)? + .eval(manifest_file)? { let mfc = self.create_manifest_file_context( manifest_file, @@ -603,7 +603,7 @@ impl PlanContext { fn create_manifest_file_context( &self, - manifest_file: ManifestFile, + manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, ) -> ManifestFileContext { @@ -620,10 +620,10 @@ impl PlanContext { }; ManifestFileContext { - manifest_file, + manifest_file: manifest_file.clone(), bound_predicates, sender, - file_io: self.file_io.clone(), + object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), @@ -938,9 +938,10 @@ mod tests { let table = Table::builder() .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) - .file_io(file_io) + .file_io(file_io.clone()) .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) - .build(); + .build() + .unwrap(); Self { table_location: table_location.to_str().unwrap().to_string(), diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 14b8a8000..eb3d022fd 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -95,7 +95,7 @@ impl Manifest { } /// Consume this Manifest, returning its constituent parts - pub fn consume(self) -> (Vec, ManifestMetadata) { + pub fn into_parts(self) -> (Vec, ManifestMetadata) { let Self { entries, metadata } = self; (entries, metadata) } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index cd7f046c6..83b6017b9 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -856,7 +856,7 @@ pub(super) mod _serde { } } -#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)] #[repr(u8)] /// Iceberg format version pub enum FormatVersion { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d28d6e5d6..406f9dd65 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -16,28 +16,156 @@ // under the License. //! Table API for Apache Iceberg -use typed_builder::TypedBuilder; + +use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; +use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; -use crate::{Result, TableIdent}; +use crate::{Error, ErrorKind, Result, TableIdent}; + +/// Builder to create table scan. +pub struct TableBuilder { + file_io: Option, + metadata_location: Option, + metadata: Option, + identifier: Option, + readonly: bool, + disable_cache: bool, + cache_size_bytes: Option, +} + +impl TableBuilder { + pub(crate) fn new() -> Self { + Self { + file_io: None, + metadata_location: None, + metadata: None, + identifier: None, + readonly: false, + disable_cache: false, + cache_size_bytes: None, + } + } + + /// required - sets the necessary FileIO to use for the table + pub fn file_io(mut self, file_io: FileIO) -> Self { + self.file_io = Some(file_io); + self + } + + /// optional - sets the tables metadata location + pub fn metadata_location>(mut self, metadata_location: T) -> Self { + self.metadata_location = Some(metadata_location.into()); + self + } + + /// required - passes in the TableMetadata to use for the Table + pub fn metadata>(mut self, metadata: T) -> Self { + self.metadata = Some(metadata.into()); + self + } + + /// required - passes in the TableIdent to use for the Table + pub fn identifier(mut self, identifier: TableIdent) -> Self { + self.identifier = Some(identifier); + self + } + + /// specifies if the Table is readonly or not (default not) + pub fn readonly(mut self, readonly: bool) -> Self { + self.readonly = readonly; + self + } + + /// specifies if the Table's metadata cache will be disabled, + /// so that reads of Manifests and ManifestLists will never + /// get cached. + pub fn disable_cache(mut self) -> Self { + self.disable_cache = true; + self + } + + /// optionally set a non-default metadata cache size + pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self { + self.cache_size_bytes = Some(cache_size_bytes); + self + } + + /// build the Table + pub fn build(self) -> Result { + let Self { + file_io, + metadata_location, + metadata, + identifier, + readonly, + disable_cache, + cache_size_bytes, + } = self; + + let Some(file_io) = file_io else { + return Err(Error::new( + ErrorKind::DataInvalid, + "FileIO must be provided with TableBuilder.file_io()", + )); + }; + + let Some(metadata) = metadata else { + return Err(Error::new( + ErrorKind::DataInvalid, + "TableMetadataRef must be provided with TableBuilder.metadata()", + )); + }; + + let Some(identifier) = identifier else { + return Err(Error::new( + ErrorKind::DataInvalid, + "TableIdent must be provided with TableBuilder.identifier()", + )); + }; + + let object_cache = if disable_cache { + Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) + } else if let Some(cache_size_bytes) = cache_size_bytes { + Arc::new(ObjectCache::new_with_capacity( + file_io.clone(), + cache_size_bytes, + )) + } else { + Arc::new(ObjectCache::new(file_io.clone())) + }; + + Ok(Table { + file_io, + metadata_location, + metadata, + identifier, + readonly, + object_cache, + }) + } +} /// Table represents a table in the catalog. -#[derive(TypedBuilder, Debug, Clone)] +#[derive(Debug, Clone)] pub struct Table { file_io: FileIO, - #[builder(default, setter(strip_option, into))] metadata_location: Option, - #[builder(setter(into))] metadata: TableMetadataRef, identifier: TableIdent, - #[builder(default = false)] readonly: bool, + object_cache: Arc, } impl Table { + /// Returns a TableBuilder to build a table + pub fn builder() -> TableBuilder { + TableBuilder::new() + } + /// Returns table identifier. pub fn identifier(&self) -> &TableIdent { &self.identifier @@ -62,6 +190,11 @@ impl Table { &self.file_io } + /// Returns this table's object cache + pub(crate) fn object_cache(&self) -> Arc { + self.object_cache.clone() + } + /// Creates a table scan. pub fn scan(&self) -> TableScanBuilder<'_> { TableScanBuilder::new(self) @@ -117,11 +250,11 @@ impl StaticTable { let table = Table::builder() .metadata(metadata) .identifier(table_ident) - .file_io(file_io) + .file_io(file_io.clone()) .readonly(true) .build(); - Ok(Self(table)) + Ok(Self(table?)) } /// Creates a static table directly from metadata file and `FileIO` pub async fn from_metadata_file( @@ -232,8 +365,9 @@ mod tests { let table = Table::builder() .metadata(table_metadata) .identifier(static_identifier) - .file_io(file_io) - .build(); + .file_io(file_io.clone()) + .build() + .unwrap(); assert!(!table.readonly()); assert_eq!(table.identifier.name(), "table"); } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 966a021fb..d416383d7 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -234,6 +234,7 @@ mod tests { .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) .build() + .unwrap() } fn make_v2_table() -> Table { @@ -252,6 +253,7 @@ mod tests { .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) .build() + .unwrap() } #[test]