diff --git a/Cargo.lock b/Cargo.lock index 4411365b0..c4458eb09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3313,6 +3313,15 @@ dependencies = [ "volo-thrift", ] +[[package]] +name = "iceberg-catalog-loader" +version = "0.4.0" +dependencies = [ + "iceberg", + "iceberg-catalog-rest", + "tokio", +] + [[package]] name = "iceberg-catalog-memory" version = "0.4.0" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml new file mode 100644 index 000000000..f1d38c90a --- /dev/null +++ b/crates/catalog/loader/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "iceberg-catalog-loader" +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Catalog Loader API" +keywords = ["iceberg", "catalog"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-rest = {workspace = true} +tokio = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs new file mode 100644 index 000000000..336d7defd --- /dev/null +++ b/crates/catalog/loader/src/lib.rs @@ -0,0 +1,64 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; +use iceberg_catalog_rest::RestCatalogBuilder; + +type BoxedCatalogBuilderFuture = Pin>>>>; + +pub trait BoxedCatalogBuilder { + fn name(&mut self, name: String); + fn uri(&mut self, uri: String); + fn warehouse(&mut self, warehouse: String); + fn with_prop(&mut self, key: String, value: String); + + fn build(self: Box) -> BoxedCatalogBuilderFuture; +} + +impl BoxedCatalogBuilder for T { + fn name(&mut self, name: String) { + self.name(name); + } + + fn uri(&mut self, uri: String) { + self.uri(uri); + } + + fn warehouse(&mut self, warehouse: String) { + self.warehouse(warehouse); + } + + fn with_prop(&mut self, key: String, value: String) { + self.with_prop(key, value); + } + + fn build(self: Box) -> BoxedCatalogBuilderFuture { + let builder = *self; + Box::pin(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) + } +} + +pub fn load(r#type: &str) -> Result> { + match r#type { + "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), + } +} + +#[cfg(test)] +mod tests { + use crate::load; + + #[tokio::test] + async fn test_load() { + let mut catalog = load("rest").unwrap(); + catalog.name("rest".to_string()); + catalog.with_prop("key".to_string(), "value".to_string()); + + catalog.build().await.unwrap(); + } +} diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7bf54d4d8..bd58e7d6e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -18,14 +18,15 @@ //! This module contains the iceberg REST catalog implementation. use std::collections::HashMap; +use std::future::Future; use std::str::FromStr; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, + TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -51,6 +52,8 @@ const PATH_V1: &str = "v1"; /// Rest catalog configuration. #[derive(Clone, Debug, TypedBuilder)] pub struct RestCatalogConfig { + #[builder(default, setter(strip_option))] + name: Option, uri: String, #[builder(default, setter(strip_option(fallback = warehouse_opt)))] @@ -223,6 +226,74 @@ impl RestCatalogConfig { } } +/// Builder for [`RestCatalog`]. +#[derive(Debug)] +pub struct RestCatalogBuilder(RestCatalogConfig); + +impl Default for RestCatalogBuilder { + fn default() -> Self { + Self(RestCatalogConfig { + name: None, + uri: "".to_string(), + warehouse: None, + props: HashMap::new(), + client: None, + }) + } +} + +impl CatalogBuilder for RestCatalogBuilder { + type C = RestCatalog; + + fn name(&mut self, name: impl Into) -> &mut Self { + self.0.name = Some(name.into()); + self + } + + fn uri(&mut self, uri: impl Into) -> &mut Self { + self.0.uri = uri.into(); + self + } + + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self { + self.0.warehouse = Some(warehouse.into()); + self + } + + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self { + self.0.props.insert(key.into(), value.into()); + self + } + + fn build(self) -> impl Future> { + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.uri.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog uri is required", + )) + } else { + Ok(RestCatalog::new(self.0)) + } + }; + + std::future::ready(result) + } +} + +impl RestCatalogBuilder { + /// Configures the catalog with a custom HTTP client. + pub fn with_client(&mut self, client: Client) -> &mut Self { + self.0.client = Some(client); + self + } +} + #[derive(Debug)] struct RestContext { client: HttpClient, @@ -2257,4 +2328,17 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_create_rest_catalog() { + let mut builder = RestCatalogBuilder::default(); + builder + .name("test") + .uri("http://localhost:8080") + .with_client(Client::new()) + .with_prop("a", "b"); + let catalog = builder.build().await; + + assert!(catalog.is_ok()); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e57152abc..147eacbdb 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::future::Future; use std::mem::take; use std::ops::Deref; @@ -96,6 +97,22 @@ pub trait Catalog: Debug + Sync + Send { async fn update_table(&self, commit: TableCommit) -> Result; } +/// Common interface for all catalog builders. +pub trait CatalogBuilder: Default + Debug + Send + Sync { + /// The catalog type that this builder creates. + type C: Catalog; + /// Configure the name of the catalog. + fn name(&mut self, name: impl Into) -> &mut Self; + /// Configure uri of the catalog. + fn uri(&mut self, uri: impl Into) -> &mut Self; + /// Configure the warehouse location of the catalog. + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self; + /// Configure properties of the catalog. + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self; + /// Create the catalog + fn build(self) -> impl Future>; +} + /// NamespaceIdent represents the identifier of a namespace in the catalog. /// /// The namespace identifier is a list of strings, where each string is a diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 556ff3e02..f94b2b113 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -62,10 +62,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; -pub use catalog::{ - Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, - TableUpdate, ViewCreation, -}; +pub use catalog::*; pub mod table;