Skip to content

Commit 32d82df

Browse files
committed
Add datafusion cli for iceberg
Signed-off-by: Ray Liu <[email protected]>
1 parent b31ebcc commit 32d82df

File tree

6 files changed

+314
-0
lines changed

6 files changed

+314
-0
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ bitvec = "1.0.1"
5959
bytes = "1.6"
6060
chrono = "0.4.38"
6161
ctor = "0.2.8"
62+
clap = { version = "4.5.32", features = ["derive", "cargo"] }
6263
datafusion = "45"
64+
datafusion-cli = "45"
6365
derive_builder = "0.20"
6466
expect-test = "1"
67+
dirs = "6.0.0"
68+
env_logger = "0.11.0"
6569
fnv = "1.0.7"
6670
futures = "0.3"
6771
hive_metastore = "0.1"
@@ -99,6 +103,7 @@ tokio = { version = "1.44", default-features = false }
99103
tracing = "0.1.37"
100104
tracing-subscriber = "0.3.8"
101105
typed-builder = "0.20"
106+
toml = "0.8.20"
102107
url = "2.5.4"
103108
uuid = { version = "1.14", features = ["v7"] }
104109
volo-thrift = "0.10"

crates/integrations/cli/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "iceberg-cli"
3+
description = "Apache iceberg client"
4+
version.workspace = true
5+
edition.workspace = true
6+
homepage.workspace = true
7+
repository.workspace = true
8+
license.workspace = true
9+
rust-version.workspace = true
10+
readme = "README.md"
11+
12+
[dependencies]
13+
clap = {workspace = true}
14+
datafusion-cli = {workspace = true}
15+
datafusion = {workspace = true}
16+
tokio = {workspace = true}
17+
env_logger = {workspace = true}
18+
anyhow = {workspace = true}
19+
iceberg-datafusion = {workspace = true}
20+
toml = {workspace = true}
21+
iceberg-catalog-rest = {workspace = true}
22+
log = {workspace = true}
23+
dirs = {workspace = true}

crates/integrations/cli/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Introduction
2+
3+
Iceberg CLI (`iceberg-cli`) is a small command line utility that runs SQL queries against tables,
4+
and is backed by the DataFusion engine.
5+
6+
# Configuration
7+
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::HashMap;
20+
use std::fs::read_to_string;
21+
use std::path::Path;
22+
use std::sync::Arc;
23+
use anyhow::anyhow;
24+
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
25+
use iceberg_datafusion::IcebergCatalogProvider;
26+
use toml::{Table as TomlTable, Value};
27+
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
28+
use log::error;
29+
30+
const CONFIG_NAME_CATALOGS: &str = "catalogs";
31+
32+
#[derive(Debug)]
33+
pub struct IcebergCatalogList {
34+
catalogs: HashMap<String, Arc<IcebergCatalogProvider>>,
35+
}
36+
37+
impl IcebergCatalogList {
38+
pub async fn parse(path: &Path) -> anyhow::Result<Self> {
39+
let toml_table: TomlTable = toml::from_str(&read_to_string(path)?)?;
40+
Self::parse_table(&toml_table).await
41+
}
42+
43+
pub async fn parse_table(configs: &TomlTable) -> anyhow::Result<Self> {
44+
if let Value::Array(catalogs_config) =
45+
configs.get(CONFIG_NAME_CATALOGS).ok_or_else(|| {
46+
anyhow::Error::msg(format!("{CONFIG_NAME_CATALOGS} entry not found in config"))
47+
})?
48+
{
49+
let mut catalogs = HashMap::with_capacity(catalogs_config.len());
50+
for config in catalogs_config {
51+
if let Value::Table(table_config) = config {
52+
let (name, catalog_provider) = IcebergCatalogList::parse_one(table_config)
53+
.await?;
54+
catalogs.insert(name, catalog_provider);
55+
} else {
56+
return Err(anyhow!("{CONFIG_NAME_CATALOGS} entry must be a table"));
57+
}
58+
}
59+
Ok(Self { catalogs })
60+
} else {
61+
Err(anyhow!("{CONFIG_NAME_CATALOGS} must be an array of table!"))
62+
}
63+
}
64+
65+
async fn parse_one(config: &TomlTable)
66+
-> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
67+
let name = config
68+
.get("name")
69+
.ok_or_else(|| anyhow::anyhow!("name not found for catalog"))?
70+
.as_str()
71+
.ok_or_else(|| anyhow::anyhow!("name is not string"))?;
72+
73+
let r#type = config
74+
.get("type")
75+
.ok_or_else(|| anyhow::anyhow!("type not found for catalog"))?
76+
.as_str()
77+
.ok_or_else(|| anyhow::anyhow!("type is not string"))?;
78+
79+
if r#type != "rest" {
80+
return Err(anyhow::anyhow!("Only rest catalog is supported for now!"));
81+
}
82+
83+
let catalog_config = config
84+
.get("config")
85+
.ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))?
86+
.as_table()
87+
.ok_or_else(|| anyhow::anyhow!("config is not table for catalog {name}"))?;
88+
89+
let uri = catalog_config
90+
.get("uri")
91+
.ok_or_else(|| anyhow::anyhow!("uri not found for catalog {name}"))?
92+
.as_str()
93+
.ok_or_else(|| anyhow::anyhow!("uri is not string"))?;
94+
95+
let warehouse = catalog_config
96+
.get("warehouse")
97+
.ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))?
98+
.as_str()
99+
.ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?;
100+
101+
let props_table = catalog_config
102+
.get("props")
103+
.ok_or_else(|| anyhow::anyhow!("props not found for catalog {name}"))?
104+
.as_table()
105+
.ok_or_else(|| anyhow::anyhow!("props is not table for catalog {name}"))?;
106+
107+
let mut props = HashMap::with_capacity(props_table.len());
108+
for (key, value) in props_table {
109+
let value_str = value.as_str()
110+
.ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?;
111+
props.insert(key.to_string(), value_str.to_string());
112+
}
113+
114+
let rest_catalog_config = RestCatalogConfig::builder()
115+
.uri(uri.to_string())
116+
.warehouse(warehouse.to_string())
117+
.props(props)
118+
.build();
119+
120+
Ok((name.to_string(), Arc::new(IcebergCatalogProvider::try_new(Arc::new(
121+
RestCatalog::new(rest_catalog_config))).await?)))
122+
}
123+
}
124+
125+
impl CatalogProviderList for IcebergCatalogList {
126+
fn as_any(&self) -> &dyn Any {
127+
self
128+
}
129+
130+
fn register_catalog(
131+
&self,
132+
_name: String,
133+
_catalog: Arc<dyn CatalogProvider>,
134+
) -> Option<Arc<dyn CatalogProvider>> {
135+
error!("Registering catalog is not supported yet");
136+
None
137+
}
138+
139+
fn catalog_names(&self) -> Vec<String> {
140+
self.catalogs.keys().cloned().collect()
141+
}
142+
143+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
144+
self.catalogs.get(name)
145+
.map(|c| c.clone() as Arc<dyn CatalogProvider>)
146+
}
147+
}

crates/integrations/cli/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#![doc = include_str!("../README.md")]
19+
/// Version of this crate, taken from `CARGO_PKG_VERSION` at compile time.
pub const ICEBERG_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
20+
21+
mod catalog;
22+
pub use catalog::*;

crates/integrations/cli/src/main.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::path::PathBuf;
19+
use std::process::ExitCode;
20+
use std::str::FromStr;
21+
use std::sync::Arc;
22+
23+
use clap::Parser;
24+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
25+
use datafusion::prelude::{SessionConfig, SessionContext};
26+
use datafusion_cli::print_format::PrintFormat;
27+
use datafusion_cli::print_options::{MaxRows, PrintOptions};
28+
use datafusion_cli::exec;
29+
use iceberg_cli::{IcebergCatalogList, ICEBERG_CLI_VERSION};
30+
31+
#[derive(Debug, Parser, PartialEq)]
32+
#[clap(author, version, about, long_about= None)]
33+
struct Args {
34+
#[clap(
35+
short = 'r',
36+
long,
37+
help = "Parse catalog config instead of using ~/.icebergrc",
38+
)]
39+
rc: Option<String>,
40+
41+
#[clap(long, value_enum, default_value_t = PrintFormat::Automatic)]
42+
format: PrintFormat,
43+
44+
#[clap(
45+
short,
46+
long,
47+
help = "Reduce printing other than the results and work quietly"
48+
)]
49+
quiet: bool,
50+
51+
#[clap(
52+
long,
53+
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
54+
default_value = "40"
55+
)]
56+
maxrows: MaxRows,
57+
58+
#[clap(long, help = "Enables console syntax highlighting")]
59+
color: bool,
60+
}
61+
62+
#[tokio::main]
63+
/// Calls [`main_inner`], then handles printing errors and returning the correct exit code
64+
pub async fn main() -> ExitCode {
65+
if let Err(e) = main_inner().await {
66+
println!("Error: {e}");
67+
return ExitCode::FAILURE;
68+
}
69+
70+
ExitCode::SUCCESS
71+
}
72+
73+
async fn main_inner() -> anyhow::Result<()> {
74+
env_logger::init();
75+
let args = Args::parse();
76+
77+
if !args.quiet {
78+
println!("DataFusion CLI v{}", ICEBERG_CLI_VERSION);
79+
}
80+
81+
let session_config = SessionConfig::from_env()?.with_information_schema(true);
82+
83+
let mut rt_builder = RuntimeEnvBuilder::new();
84+
85+
let runtime_env = rt_builder.build_arc()?;
86+
87+
// enable dynamic file query
88+
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env).enable_url_table();
89+
ctx.refresh_catalogs().await?;
90+
91+
let mut print_options = PrintOptions {
92+
format: args.format,
93+
quiet: args.quiet,
94+
maxrows: args.maxrows,
95+
color: args.color,
96+
};
97+
98+
let rc = match args.rc {
99+
Some(file) => PathBuf::from_str(&file)?,
100+
None => dirs::home_dir()
101+
.map(|h| h.join(".icebergrc"))
102+
.ok_or_else(|| anyhow::anyhow!("cannot find home directory"))?,
103+
};
104+
105+
let catalogs = Arc::new(IcebergCatalogList::parse(&rc).await?);
106+
ctx.register_catalog_list(catalogs);
107+
108+
Ok(exec::exec_from_repl(&ctx, &mut print_options)
109+
.await?)
110+
}

0 commit comments

Comments
 (0)