Skip to content

Commit

Permalink
Object Cache: caches parsed Manifests and ManifestLists for performance (apache#512)
Browse files Browse the repository at this point in the history

* feat: adds ObjectCache, to cache Manifests and ManifestLists

* refactor: change obj cache method names and use more readable default usize value

* chore: improve error message

Co-authored-by: Renjie Liu <[email protected]>

* fix: change object cache retrieval method visibility

Co-authored-by: Renjie Liu <[email protected]>

* feat: improved error message in object cache get_manifest

* test(object-cache): add unit tests for object cache manifest and manifest list retrieval

* fix: ensure that object cache insertions are weighted by size

* test: fix test typo

* fix: ensure object cache weight is that of the wrapped item, not the Arc

---------

Co-authored-by: Renjie Liu <[email protected]>
  • Loading branch information
sdd and liurenjie1024 authored Aug 19, 2024
1 parent 4440af6 commit a1ec0fa
Show file tree
Hide file tree
Showing 12 changed files with 599 additions and 68 deletions.
12 changes: 4 additions & 8 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,12 @@ impl Catalog for GlueCatalog {

builder.send().await.map_err(from_aws_sdk_error)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.build();

Ok(table)
.build()
}

/// Loads a table from the Glue Catalog and constructs a `Table` object
Expand Down Expand Up @@ -432,17 +430,15 @@ impl Catalog for GlueCatalog {
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build();

Ok(table)
.build()
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,12 @@ impl Catalog for HmsCatalog {
.await
.map_err(from_thrift_error)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.build();

Ok(table)
.build()
}

/// Loads a table from the Hive Metastore and constructs a `Table` object
Expand Down Expand Up @@ -407,17 +405,15 @@ impl Catalog for HmsCatalog {
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table.name.clone(),
))
.build();

Ok(table)
.build()
}

/// Asynchronously drops a table from the database.
Expand Down
13 changes: 5 additions & 8 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,12 @@ impl Catalog for MemoryCatalog {

root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident)
.build();

Ok(table)
.build()
}

/// Load table from the catalog.
Expand All @@ -227,14 +225,13 @@ impl Catalog for MemoryCatalog {
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let table = Table::builder()

Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location.clone())
.metadata(metadata)
.identifier(table_ident.clone())
.build();

Ok(table)
.build()
}

/// Drop a table from the catalog.
Expand Down
16 changes: 8 additions & 8 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl Catalog for RestCatalog {
.load_file_io(resp.metadata_location.as_deref(), resp.config)
.await?;

let table = Table::builder()
Table::builder()
.identifier(table_ident)
.file_io(file_io)
.metadata(resp.metadata)
Expand All @@ -526,9 +526,7 @@ impl Catalog for RestCatalog {
"Metadata location missing in create table response!",
)
})?)
.build();

Ok(table)
.build()
}

/// Load table from the catalog.
Expand Down Expand Up @@ -560,9 +558,9 @@ impl Catalog for RestCatalog {
.metadata(resp.metadata);

if let Some(metadata_location) = resp.metadata_location {
Ok(table_builder.metadata_location(metadata_location).build())
table_builder.metadata_location(metadata_location).build()
} else {
Ok(table_builder.build())
table_builder.build()
}
}

Expand Down Expand Up @@ -661,12 +659,12 @@ impl Catalog for RestCatalog {
let file_io = self
.load_file_io(Some(&resp.metadata_location), None)
.await?;
Ok(Table::builder()
Table::builder()
.identifier(commit.identifier().clone())
.file_io(file_io)
.metadata(resp.metadata)
.metadata_location(resp.metadata_location)
.build())
.build()
}
}

Expand Down Expand Up @@ -1661,6 +1659,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
.unwrap()
};

let table = Transaction::new(&table1)
Expand Down Expand Up @@ -1785,6 +1784,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
.unwrap()
};

let table_result = Transaction::new(&table1)
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ derive_builder = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
moka = { version = "0.12.8", features = ["future"] }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ use storage_memory::*;
mod storage_s3;
#[cfg(feature = "storage-s3")]
pub use storage_s3::*;
pub(crate) mod object_cache;
#[cfg(feature = "storage-fs")]
mod storage_fs;

#[cfg(feature = "storage-fs")]
use storage_fs::*;
#[cfg(feature = "storage-gcs")]
Expand Down
Loading

0 comments on commit a1ec0fa

Please sign in to comment.