Skip to content

Commit b288dbe

Browse files
committed
Add datafusion cli for iceberg
Signed-off-by: Ray Liu <[email protected]>
1 parent 6dc9413 commit b288dbe

File tree

6 files changed

+361
-5
lines changed

6 files changed

+361
-5
lines changed

Cargo.toml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,24 +52,27 @@ arrow-select = { version = "54.2.0" }
5252
arrow-string = { version = "54.2.0" }
5353
async-std = "1.12"
5454
async-trait = "0.1.86"
55-
aws-config = "1"
55+
aws-config = "1.5"
5656
aws-sdk-glue = "1.39"
5757
bimap = "0.6"
5858
bitvec = "1.0.1"
5959
bytes = "1.6"
6060
chrono = "0.4.38"
6161
ctor = "0.2.8"
62+
clap = { version = "4.5.32", features = ["derive", "cargo"] }
6263
datafusion = "45"
64+
datafusion-cli = "45"
6365
derive_builder = "0.20"
6466
expect-test = "1"
67+
dirs = "6.0.0"
6568
fnv = "1.0.7"
6669
futures = "0.3"
6770
hive_metastore = "0.1"
6871
http = "1.1"
69-
iceberg = { version = "0.4.0", path = "./crates/iceberg" }
70-
iceberg-catalog-memory = { version = "0.4.0", path = "./crates/catalog/memory" }
71-
iceberg-catalog-rest = { version = "0.4.0", path = "./crates/catalog/rest" }
72-
iceberg-datafusion = { version = "0.4.0", path = "./crates/integrations/datafusion" }
72+
iceberg = { path = "./crates/iceberg" }
73+
iceberg-catalog-memory = { path = "./crates/catalog/memory" }
74+
iceberg-catalog-rest = { path = "./crates/catalog/rest" }
75+
iceberg-datafusion = { path = "./crates/integrations/datafusion" }
7376
itertools = "0.13"
7477
mockito = "1"
7578
murmur3 = "0.5.2"
@@ -99,6 +102,7 @@ tokio = { version = "1.44", default-features = false }
99102
tracing = "0.1.37"
100103
tracing-subscriber = "0.3.8"
101104
typed-builder = "0.20"
105+
toml = "0.8.20"
102106
url = "2.5.4"
103107
uuid = { version = "1.14", features = ["v7"] }
104108
volo-thrift = "0.10"

crates/integrations/cli/Cargo.toml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-cli"
20+
description = "Apache iceberg client"
21+
version.workspace = true
22+
edition.workspace = true
23+
homepage.workspace = true
24+
repository.workspace = true
25+
license.workspace = true
26+
rust-version.workspace = true
27+
readme = "README.md"
28+
29+
[dependencies]
30+
clap = {workspace = true}
31+
datafusion-cli = {workspace = true}
32+
datafusion = {workspace = true}
33+
tokio = {workspace = true}
34+
anyhow = {workspace = true}
35+
iceberg-datafusion = {workspace = true}
36+
toml = {workspace = true}
37+
iceberg-catalog-rest = {workspace = true}
38+
tracing = {workspace = true}
39+
tracing-subscriber = {workspace = true}
40+
dirs = {workspace = true}

crates/integrations/cli/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
21+
# Introduction
22+
23+
Iceberg CLI (`iceberg-cli`) is a small command line utility that runs SQL queries against tables,
24+
which is backed by the DataFusion engine.
25+
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::HashMap;
20+
use std::fs::read_to_string;
21+
use std::path::Path;
22+
use std::sync::Arc;
23+
24+
use anyhow::anyhow;
25+
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
26+
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
27+
use iceberg_datafusion::IcebergCatalogProvider;
28+
use toml::{Table as TomlTable, Value};
29+
30+
const CONFIG_NAME_CATALOGS: &str = "catalogs";
31+
32+
#[derive(Debug)]
33+
pub struct IcebergCatalogList {
34+
catalogs: HashMap<String, Arc<IcebergCatalogProvider>>,
35+
}
36+
37+
impl IcebergCatalogList {
38+
pub async fn parse(path: &Path) -> anyhow::Result<Self> {
39+
let toml_table: TomlTable = toml::from_str(&read_to_string(path)?)?;
40+
Self::parse_table(&toml_table).await
41+
}
42+
43+
pub async fn parse_table(configs: &TomlTable) -> anyhow::Result<Self> {
44+
if let Value::Array(catalogs_config) =
45+
configs.get(CONFIG_NAME_CATALOGS).ok_or_else(|| {
46+
anyhow::Error::msg(format!("{CONFIG_NAME_CATALOGS} entry not found in config"))
47+
})?
48+
{
49+
let mut catalogs = HashMap::with_capacity(catalogs_config.len());
50+
for config in catalogs_config {
51+
if let Value::Table(table_config) = config {
52+
let (name, catalog_provider) =
53+
IcebergCatalogList::parse_one(table_config).await?;
54+
catalogs.insert(name, catalog_provider);
55+
} else {
56+
return Err(anyhow!("{CONFIG_NAME_CATALOGS} entry must be a table"));
57+
}
58+
}
59+
Ok(Self { catalogs })
60+
} else {
61+
Err(anyhow!("{CONFIG_NAME_CATALOGS} must be an array of table!"))
62+
}
63+
}
64+
65+
async fn parse_one(
66+
config: &TomlTable,
67+
) -> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
68+
let name = config
69+
.get("name")
70+
.ok_or_else(|| anyhow::anyhow!("name not found for catalog"))?
71+
.as_str()
72+
.ok_or_else(|| anyhow::anyhow!("name is not string"))?;
73+
74+
let r#type = config
75+
.get("type")
76+
.ok_or_else(|| anyhow::anyhow!("type not found for catalog"))?
77+
.as_str()
78+
.ok_or_else(|| anyhow::anyhow!("type is not string"))?;
79+
80+
if r#type != "rest" {
81+
return Err(anyhow::anyhow!("Only rest catalog is supported for now!"));
82+
}
83+
84+
let catalog_config = config
85+
.get("config")
86+
.ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))?
87+
.as_table()
88+
.ok_or_else(|| anyhow::anyhow!("config is not table for catalog {name}"))?;
89+
90+
let uri = catalog_config
91+
.get("uri")
92+
.ok_or_else(|| anyhow::anyhow!("uri not found for catalog {name}"))?
93+
.as_str()
94+
.ok_or_else(|| anyhow::anyhow!("uri is not string"))?;
95+
96+
let warehouse = catalog_config
97+
.get("warehouse")
98+
.ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))?
99+
.as_str()
100+
.ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?;
101+
102+
let props_table = catalog_config
103+
.get("props")
104+
.ok_or_else(|| anyhow::anyhow!("props not found for catalog {name}"))?
105+
.as_table()
106+
.ok_or_else(|| anyhow::anyhow!("props is not table for catalog {name}"))?;
107+
108+
let mut props = HashMap::with_capacity(props_table.len());
109+
for (key, value) in props_table {
110+
let value_str = value
111+
.as_str()
112+
.ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?;
113+
props.insert(key.to_string(), value_str.to_string());
114+
}
115+
116+
let rest_catalog_config = RestCatalogConfig::builder()
117+
.uri(uri.to_string())
118+
.warehouse(warehouse.to_string())
119+
.props(props)
120+
.build();
121+
122+
Ok((
123+
name.to_string(),
124+
Arc::new(
125+
IcebergCatalogProvider::try_new(Arc::new(RestCatalog::new(rest_catalog_config)))
126+
.await?,
127+
),
128+
))
129+
}
130+
}
131+
132+
impl CatalogProviderList for IcebergCatalogList {
133+
fn as_any(&self) -> &dyn Any {
134+
self
135+
}
136+
137+
fn register_catalog(
138+
&self,
139+
_name: String,
140+
_catalog: Arc<dyn CatalogProvider>,
141+
) -> Option<Arc<dyn CatalogProvider>> {
142+
tracing::error!("Registering catalog is not supported yet");
143+
None
144+
}
145+
146+
fn catalog_names(&self) -> Vec<String> {
147+
self.catalogs.keys().cloned().collect()
148+
}
149+
150+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
151+
self.catalogs
152+
.get(name)
153+
.map(|c| c.clone() as Arc<dyn CatalogProvider>)
154+
}
155+
}

crates/integrations/cli/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#![doc = include_str!("../README.md")]
19+
pub const ICEBERG_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
20+
21+
mod catalog;
22+
pub use catalog::*;

crates/integrations/cli/src/main.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::path::PathBuf;
19+
use std::process::ExitCode;
20+
use std::str::FromStr;
21+
use std::sync::Arc;
22+
23+
use clap::Parser;
24+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
25+
use datafusion::prelude::{SessionConfig, SessionContext};
26+
use datafusion_cli::exec;
27+
use datafusion_cli::print_format::PrintFormat;
28+
use datafusion_cli::print_options::{MaxRows, PrintOptions};
29+
use iceberg_cli::{IcebergCatalogList, ICEBERG_CLI_VERSION};
30+
31+
#[derive(Debug, Parser, PartialEq)]
32+
#[clap(author, version, about, long_about= None)]
33+
struct Args {
34+
#[clap(
35+
short = 'r',
36+
long,
37+
help = "Parse catalog config instead of using ~/.icebergrc"
38+
)]
39+
rc: Option<String>,
40+
41+
#[clap(long, value_enum, default_value_t = PrintFormat::Automatic)]
42+
format: PrintFormat,
43+
44+
#[clap(
45+
short,
46+
long,
47+
help = "Reduce printing other than the results and work quietly"
48+
)]
49+
quiet: bool,
50+
51+
#[clap(
52+
long,
53+
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
54+
default_value = "40"
55+
)]
56+
maxrows: MaxRows,
57+
58+
#[clap(long, help = "Enables console syntax highlighting")]
59+
color: bool,
60+
}
61+
62+
#[tokio::main]
63+
/// Calls [`main_inner`], then handles printing errors and returning the correct exit code
64+
pub async fn main() -> ExitCode {
65+
tracing_subscriber::fmt::init();
66+
67+
if let Err(e) = main_inner().await {
68+
println!("Error: {e}");
69+
return ExitCode::FAILURE;
70+
}
71+
72+
ExitCode::SUCCESS
73+
}
74+
75+
async fn main_inner() -> anyhow::Result<()> {
76+
let args = Args::parse();
77+
78+
if !args.quiet {
79+
println!("ICEBERG CLI v{}", ICEBERG_CLI_VERSION);
80+
}
81+
82+
let session_config = SessionConfig::from_env()?.with_information_schema(true);
83+
84+
let rt_builder = RuntimeEnvBuilder::new();
85+
86+
let runtime_env = rt_builder.build_arc()?;
87+
88+
// enable dynamic file query
89+
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env).enable_url_table();
90+
ctx.refresh_catalogs().await?;
91+
92+
let mut print_options = PrintOptions {
93+
format: args.format,
94+
quiet: args.quiet,
95+
maxrows: args.maxrows,
96+
color: args.color,
97+
};
98+
99+
let rc = match args.rc {
100+
Some(file) => PathBuf::from_str(&file)?,
101+
None => dirs::home_dir()
102+
.map(|h| h.join(".icebergrc"))
103+
.ok_or_else(|| anyhow::anyhow!("cannot find home directory"))?,
104+
};
105+
106+
let catalogs = Arc::new(IcebergCatalogList::parse(&rc).await?);
107+
ctx.register_catalog_list(catalogs);
108+
109+
Ok(exec::exec_from_repl(&ctx, &mut print_options).await?)
110+
}

0 commit comments

Comments
 (0)