diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml new file mode 100644 index 0000000000..61c03fddfd --- /dev/null +++ b/crates/catalog/hms/Cargo.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-catalog-hms" +version = "0.1.0" +edition = "2021" + +categories = ["database"] +description = "Apache Iceberg Hive Metastore Catalog Support" +repository = "https://github.com/apache/iceberg-rust" +license = "Apache-2.0" +keywords = ["iceberg", "hive", "catalog"] + +[dependencies] +async-trait = { workspace = true } +hive_metastore = "0.0.1" +iceberg = { workspace = true } +# the thrift upstream suffered from no regular rust release. +# +# [test-rs](https://github.com/tent-rs) is an organization that helps resolves this +# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift +# crate, built from the thrift upstream with only version bumped. +thrift = { package = "tent-thrift", version = "0.18.1" } +typed-builder = { workspace = true } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs new file mode 100644 index 0000000000..2b1fe2cc4a --- /dev/null +++ b/crates/catalog/hms/src/catalog.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::utils::*; +use async_trait::async_trait; +use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient}; +use iceberg::table::Table; +use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, Mutex}; +use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol}; +use thrift::transport::{ + ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf, +}; +use typed_builder::TypedBuilder; + +/// Hive metastore Catalog configuration. +#[derive(Debug, TypedBuilder)] +pub struct HmsCatalogConfig { + address: String, +} + +/// TODO: We only support binary protocol for now. +type HmsClientType = ThriftHiveMetastoreSyncClient< + TBinaryInputProtocol>>, + TBinaryOutputProtocol>>, +>; + +/// # TODO +/// +/// we are using the same connection everytime, we should support connection +/// pool in the future. +struct HmsClient(Arc>); + +impl HmsClient { + fn call(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result) -> Result { + let mut client = self.0.lock().unwrap(); + f(&mut client).map_err(from_thrift_error) + } +} + +/// Hive metastore Catalog. +pub struct HmsCatalog { + config: HmsCatalogConfig, + client: HmsClient, +} + +impl Debug for HmsCatalog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HmsCatalog") + .field("config", &self.config) + .finish_non_exhaustive() + } +} + +impl HmsCatalog { + /// Create a new hms catalog. + pub fn new(config: HmsCatalogConfig) -> Result { + let mut channel = thrift::transport::TTcpChannel::new(); + channel + .open(config.address.as_str()) + .map_err(from_thrift_error)?; + let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?; + let i_chan = TBufferedReadTransport::new(i_chan); + let o_chan = TBufferedWriteTransport::new(o_chan); + let i_proto = TBinaryInputProtocol::new(i_chan, true); + let o_proto = TBinaryOutputProtocol::new(o_chan, true); + let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto); + Ok(Self { + config, + client: HmsClient(Arc::new(Mutex::new(client))), + }) + } +} + +/// Refer to for implementation details. +#[async_trait] +impl Catalog for HmsCatalog { + /// HMS doesn't support nested namespaces. + /// + /// We will return empty list if parent is some. + /// + /// Align with java implementation: + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result> { + let dbs = if parent.is_some() { + return Ok(vec![]); + } else { + self.client.call(|client| client.get_all_databases())? + }; + + Ok(dbs.into_iter().map(NamespaceIdent::new).collect()) + } + + async fn create_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + todo!() + } + + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + todo!() + } + + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + todo!() + } + + async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { + todo!() + } + + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result { + todo!() + } + + async fn load_table(&self, _table: &TableIdent) -> Result
{ + todo!() + } + + async fn drop_table(&self, _table: &TableIdent) -> Result<()> { + todo!() + } + + async fn stat_table(&self, _table: &TableIdent) -> Result { + todo!() + } + + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { + todo!() + } + + async fn update_table(&self, _commit: TableCommit) -> Result
{ + todo!() + } +} diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/lib.rs new file mode 100644 index 0000000000..b75e74977a --- /dev/null +++ b/crates/catalog/hms/src/lib.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg Hive Metastore Catalog implementation. + +#![deny(missing_docs)] + +mod catalog; +pub use catalog::*; + +mod utils; diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs new file mode 100644 index 0000000000..0daa52aa1f --- /dev/null +++ b/crates/catalog/hms/src/utils.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind}; + +/// Format a thrift error into iceberg error. +pub fn from_thrift_error(error: thrift::Error) -> Error { + Error::new( + ErrorKind::Unexpected, + "operation failed for hitting thrift error".to_string(), + ) + .with_source(error) +}