Skip to content

Commit 3d137bc

Browse files
committed
Add datafusion cli for iceberg
Signed-off-by: Ray Liu <[email protected]>
1 parent b7e4635 commit 3d137bc

File tree

8 files changed

+588
-6
lines changed

8 files changed

+588
-6
lines changed

Cargo.lock

Lines changed: 324 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,11 @@ bitvec = "1.0.1"
5959
bytes = "1.6"
6060
chrono = "0.4.38"
6161
ctor = "0.2.8"
62+
clap = { version = "4.5.32", features = ["derive", "cargo"] }
6263
datafusion = "45"
64+
datafusion-cli = "45"
6365
derive_builder = "0.20"
66+
dirs = "6.0.0"
6467
env_logger = "0.11.0"
6568
fnv = "1.0.7"
6669
futures = "0.3"
@@ -95,6 +98,7 @@ tempfile = "3.18"
9598
thrift = "0.17.0"
9699
tokio = { version = "1.36", default-features = false }
97100
typed-builder = "0.20"
101+
toml = "0.8.20"
98102
url = "2.5.4"
99103
uuid = { version = "1.13.1", features = ["v7"] }
100104
volo-thrift = "0.10"

crates/integrations/cli/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "iceberg-cli"
3+
description = "Apache iceberg client"
4+
version.workspace = true
5+
edition.workspace = true
6+
homepage.workspace = true
7+
repository.workspace = true
8+
license.workspace = true
9+
rust-version.workspace = true
10+
readme = "README.md"
11+
12+
[dependencies]
13+
clap = {workspace = true}
14+
datafusion-cli = {workspace = true}
15+
datafusion = {workspace = true}
16+
tokio = {workspace = true}
17+
env_logger = {workspace = true}
18+
anyhow = {workspace = true}
19+
iceberg-datafusion = {workspace = true}
20+
toml = {workspace = true}
21+
iceberg-catalog-rest = {workspace = true}
22+
log = {workspace = true}
23+
dirs = {workspace = true}

crates/integrations/cli/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Introduction
2+
3+
Iceberg CLI (`iceberg-cli`) is a small command line utility that runs SQL queries against tables,
4+
using the DataFusion engine.
5+
6+
# Configuration
7+
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use std::any::Any;
2+
use std::collections::HashMap;
3+
use std::fs::read_to_string;
4+
use std::path::Path;
5+
use std::sync::Arc;
6+
use anyhow::anyhow;
7+
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
8+
use iceberg_datafusion::IcebergCatalogProvider;
9+
use toml::{Table as TomlTable, Value};
10+
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
11+
use log::error;
12+
13+
/// Name of the top-level config key under which the catalogs are listed.
const CONFIG_NAME_CATALOGS: &str = "catalogs";
14+
15+
#[derive(Debug)]
16+
pub struct IcebergCatalogList {
17+
catalogs: HashMap<String, Arc<IcebergCatalogProvider>>,
18+
}
19+
20+
impl IcebergCatalogList {
21+
pub async fn parse(path: &Path) -> anyhow::Result<Self> {
22+
let toml_table: TomlTable = toml::from_str(&read_to_string(path)?)?;
23+
Self::parse_table(&toml_table).await
24+
}
25+
26+
pub async fn parse_table(configs: &TomlTable) -> anyhow::Result<Self> {
27+
if let Value::Array(catalogs_config) =
28+
configs.get(CONFIG_NAME_CATALOGS).ok_or_else(|| {
29+
anyhow::Error::msg(format!("{CONFIG_NAME_CATALOGS} entry not found in config"))
30+
})?
31+
{
32+
let mut catalogs = HashMap::with_capacity(catalogs_config.len());
33+
for config in catalogs_config {
34+
if let Value::Table(table_config) = config {
35+
let (name, catalog_provider) = IcebergCatalogList::parse_one(table_config)
36+
.await?;
37+
catalogs.insert(name, catalog_provider);
38+
} else {
39+
return Err(anyhow!("{CONFIG_NAME_CATALOGS} entry must be a table"));
40+
}
41+
}
42+
Ok(Self { catalogs })
43+
} else {
44+
Err(anyhow!("{CONFIG_NAME_CATALOGS} must be an array of table!"))
45+
}
46+
}
47+
48+
async fn parse_one(config: &TomlTable)
49+
-> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
50+
let name = config
51+
.get("name")
52+
.ok_or_else(|| anyhow::anyhow!("name not found for catalog"))?
53+
.as_str()
54+
.ok_or_else(|| anyhow::anyhow!("name is not string"))?;
55+
56+
let typ = config
57+
.get("type")
58+
.ok_or_else(|| anyhow::anyhow!("type not found for catalog"))?
59+
.as_str()
60+
.ok_or_else(|| anyhow::anyhow!("type is not string"))?;
61+
62+
if typ != "rest" {
63+
return Err(anyhow::anyhow!("Only rest catalog is supported for now!"));
64+
}
65+
66+
let catalog_config = config
67+
.get("config")
68+
.ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))?
69+
.as_table()
70+
.ok_or_else(|| anyhow::anyhow!("config is not table for catalog {name}"))?;
71+
72+
let uri = catalog_config
73+
.get("uri")
74+
.ok_or_else(|| anyhow::anyhow!("uri not found for catalog {name}"))?
75+
.as_str()
76+
.ok_or_else(|| anyhow::anyhow!("uri is not string"))?;
77+
78+
let warehouse = catalog_config
79+
.get("warehouse")
80+
.ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))?
81+
.as_str()
82+
.ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?;
83+
84+
let props_table = catalog_config
85+
.get("props")
86+
.ok_or_else(|| anyhow::anyhow!("props not found for catalog {name}"))?
87+
.as_table()
88+
.ok_or_else(|| anyhow::anyhow!("props is not table for catalog {name}"))?;
89+
90+
let mut props = HashMap::with_capacity(props_table.len());
91+
for (key, value) in props_table {
92+
let value_str = value.as_str()
93+
.ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?;
94+
props.insert(key.to_string(), value_str.to_string());
95+
}
96+
97+
let rest_catalog_config = RestCatalogConfig::builder()
98+
.uri(uri.to_string())
99+
.warehouse(warehouse.to_string())
100+
.props(props)
101+
.build();
102+
103+
Ok((name.to_string(), Arc::new(IcebergCatalogProvider::try_new(Arc::new(
104+
RestCatalog::new(rest_catalog_config))).await?)))
105+
}
106+
}
107+
108+
impl CatalogProviderList for IcebergCatalogList {
109+
fn as_any(&self) -> &dyn Any {
110+
self
111+
}
112+
113+
fn register_catalog(
114+
&self,
115+
_name: String,
116+
_catalog: Arc<dyn CatalogProvider>,
117+
) -> Option<Arc<dyn CatalogProvider>> {
118+
error!("Registering catalog is not supported yet");
119+
None
120+
}
121+
122+
fn catalog_names(&self) -> Vec<String> {
123+
self.catalogs.keys().cloned().collect()
124+
}
125+
126+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
127+
self.catalogs.get(name)
128+
.map(|c| c.clone() as Arc<dyn CatalogProvider>)
129+
}
130+
}

crates/integrations/cli/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
#![doc = include_str!("../README.md")]
3+
/// Version of this crate, read from `CARGO_PKG_VERSION` at compile time.
pub const ICEBERG_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
4+
5+
mod catalog;
6+
pub use catalog::*;

crates/integrations/cli/src/main.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::path::PathBuf;
2+
use std::process::ExitCode;
3+
use std::str::FromStr;
4+
use std::sync::Arc;
5+
6+
use clap::Parser;
7+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
8+
use datafusion::prelude::{SessionConfig, SessionContext};
9+
use datafusion_cli::print_format::PrintFormat;
10+
use datafusion_cli::print_options::{MaxRows, PrintOptions};
11+
use datafusion_cli::exec;
12+
use iceberg_cli::{IcebergCatalogList, ICEBERG_CLI_VERSION};
13+
14+
#[derive(Debug, Parser, PartialEq)]
15+
#[clap(author, version, about, long_about= None)]
16+
struct Args {
17+
#[clap(
18+
short = 'r',
19+
long,
20+
help = "Parse catalog config instead of using ~/.icebergrc",
21+
)]
22+
rc: Option<String>,
23+
24+
#[clap(long, value_enum, default_value_t = PrintFormat::Automatic)]
25+
format: PrintFormat,
26+
27+
#[clap(
28+
short,
29+
long,
30+
help = "Reduce printing other than the results and work quietly"
31+
)]
32+
quiet: bool,
33+
34+
#[clap(
35+
long,
36+
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
37+
default_value = "40"
38+
)]
39+
maxrows: MaxRows,
40+
41+
#[clap(long, help = "Enables console syntax highlighting")]
42+
color: bool,
43+
}
44+
45+
#[tokio::main]
46+
/// Calls [`main_inner`], then handles printing errors and returning the correct exit code
47+
pub async fn main() -> ExitCode {
48+
if let Err(e) = main_inner().await {
49+
println!("Error: {e}");
50+
return ExitCode::FAILURE;
51+
}
52+
53+
ExitCode::SUCCESS
54+
}
55+
56+
async fn main_inner() -> anyhow::Result<()> {
57+
env_logger::init();
58+
let args = Args::parse();
59+
60+
if !args.quiet {
61+
println!("DataFusion CLI v{}", ICEBERG_CLI_VERSION);
62+
}
63+
64+
let session_config = SessionConfig::from_env()?.with_information_schema(true);
65+
66+
let mut rt_builder = RuntimeEnvBuilder::new();
67+
68+
let runtime_env = rt_builder.build_arc()?;
69+
70+
// enable dynamic file query
71+
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env).enable_url_table();
72+
ctx.refresh_catalogs().await?;
73+
74+
let mut print_options = PrintOptions {
75+
format: args.format,
76+
quiet: args.quiet,
77+
maxrows: args.maxrows,
78+
color: args.color,
79+
};
80+
81+
let rc = match args.rc {
82+
Some(file) => PathBuf::from_str(&file)?,
83+
None => dirs::home_dir()
84+
.map(|h| h.join(".icebergrc"))
85+
.ok_or_else(|| anyhow::anyhow!("cannot find home directory"))?,
86+
};
87+
88+
let catalogs = Arc::new(IcebergCatalogList::parse(&rc).await?);
89+
ctx.register_catalog_list(catalogs);
90+
91+
Ok(exec::exec_from_repl(&ctx, &mut print_options)
92+
.await?)
93+
}

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@
1818
# iceberg-rust use unstable rust to run linters, such as `clippy` and `rustfmt`. But this will not affect downstream users,
1919
# and only MSRV is required.
2020
[toolchain]
21-
channel = "nightly-2024-06-10"
21+
channel = "nightly-2025-03-10"
2222
components = ["rustfmt", "clippy"]

0 commit comments

Comments
 (0)