From f162400af948628ab6f25ddf6b148ed87662328e Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 21 Apr 2025 18:34:12 +0800 Subject: [PATCH 1/5] Experiment implementation for catalog builder --- crates/catalog/rest/src/catalog.rs | 88 +++++++++++++++++++++++++++++- crates/iceberg/src/catalog/mod.rs | 17 ++++++ crates/iceberg/src/lib.rs | 4 +- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7bf54d4d8..a324ef3b0 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -18,14 +18,15 @@ //! This module contains the iceberg REST catalog implementation. use std::collections::HashMap; +use std::future::Future; use std::str::FromStr; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, + TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -51,6 +52,8 @@ const PATH_V1: &str = "v1"; /// Rest catalog configuration. #[derive(Clone, Debug, TypedBuilder)] pub struct RestCatalogConfig { + #[builder(default, setter(strip_option))] + name: Option, uri: String, #[builder(default, setter(strip_option(fallback = warehouse_opt)))] @@ -223,6 +226,74 @@ impl RestCatalogConfig { } } +/// Builder for [`RestCatalog`]. +#[derive(Debug)] +pub struct RestCatalogBuilder(RestCatalogConfig); + +impl Default for RestCatalogBuilder { + fn default() -> Self { + Self(RestCatalogConfig { + name: None, + uri: "".to_string(), + warehouse: None, + props: HashMap::new(), + client: None, + }) + } +} + +impl CatalogBuilder for RestCatalogBuilder { + type C = RestCatalog; + + fn name(mut self, name: impl Into) -> Self { + self.0.name = Some(name.into()); + self + } + + fn uri(mut self, uri: impl Into) -> Self { + self.0.uri = uri.into(); + self + } + + fn warehouse(mut self, warehouse: impl Into) -> Self { + self.0.warehouse = Some(warehouse.into()); + self + } + + fn with_prop(mut self, key: impl Into, value: impl Into) -> Self { + self.0.props.insert(key.into(), value.into()); + self + } + + fn build(self) -> impl Future> { + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.uri.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog uri is required", + )) + } else { + Ok(RestCatalog::new(self.0)) + } + }; + + std::future::ready(result) + } +} + +impl RestCatalogBuilder { + /// Configures the catalog with a custom HTTP client. + pub fn with_client(mut self, client: Client) -> Self { + self.0.client = Some(client); + self + } +} + #[derive(Debug)] struct RestContext { client: HttpClient, @@ -2257,4 +2328,17 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_create_rest_catalog() { + let catalog = RestCatalogBuilder::default() + .name("test") + .uri("http://localhost:8080") + .with_client(Client::new()) + .with_prop("a", "b") + .build() + .await; + + assert!(catalog.is_ok()); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e57152abc..d385bffe6 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::future::Future; use std::mem::take; use std::ops::Deref; @@ -96,6 +97,22 @@ pub trait Catalog: Debug + Sync + Send { async fn update_table(&self, commit: TableCommit) -> Result; } +/// Common interface for all catalog builders. +pub trait CatalogBuilder: Default + Debug + Send + Sync { + /// The catalog type that this builder creates. + type C: Catalog; + /// Configure name of the catalog. + fn name(self, name: impl Into) -> Self; + /// Configure uri of the catalog. + fn uri(self, uri: impl Into) -> Self; + /// Configure warehouse location of the catalog. + fn warehouse(self, warehouse: impl Into) -> Self; + /// Configure properties of the catalog. + fn with_prop(self, key: impl Into, value: impl Into) -> Self; + /// Create the catalog + fn build(self) -> impl Future>; +} + /// NamespaceIdent represents the identifier of a namespace in the catalog. /// /// The namespace identifier is a list of strings, where each string is a diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 556ff3e02..9c8e9c12f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -63,8 +63,8 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; pub use catalog::{ - Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, - TableUpdate, ViewCreation, + Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, + TableRequirement, TableUpdate, ViewCreation, }; pub mod table; From f6c5937a186184909c7689d65288c8bcd74794c2 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 25 Apr 2025 11:06:58 +0800 Subject: [PATCH 2/5] Loader example --- Cargo.lock | 8 +++++ crates/catalog/loader/Cargo.toml | 16 ++++++++++ crates/catalog/loader/src/lib.rs | 52 ++++++++++++++++++++++++++++++++ crates/iceberg/src/lib.rs | 5 +-- 4 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 crates/catalog/loader/Cargo.toml create mode 100644 crates/catalog/loader/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4411365b0..5d935fd4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3313,6 +3313,14 @@ dependencies = [ "volo-thrift", ] +[[package]] +name = "iceberg-catalog-loader" +version = "0.4.0" +dependencies = [ + "iceberg", + "iceberg-catalog-rest", +] + [[package]] name = "iceberg-catalog-memory" version = "0.4.0" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml new file mode 100644 index 000000000..058f56920 --- /dev/null +++ b/crates/catalog/loader/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "iceberg-catalog-loader" +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Catalog Loader API" +keywords = ["iceberg", "catalog"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-rest = {workspace = true} diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs new file mode 100644 index 000000000..6fa58c3a6 --- /dev/null +++ b/crates/catalog/loader/src/lib.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; +use iceberg_catalog_rest::RestCatalogBuilder; + + +pub enum CatalogBuilderDef { + Rest(RestCatalogBuilder), +} + +pub fn load(r#type: &str) -> Result { + match r#type { + "rest" => Ok(CatalogBuilderDef::Rest(RestCatalogBuilder::default())), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), + } +} + +impl CatalogBuilderDef { + pub fn name(self, name: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), + } + } + + pub fn uri(self, uri: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), + } + } + + pub fn warehouse(self, warehouse: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.warehouse(warehouse)), + } + } + + pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.with_prop(key, value)), + } + } + + pub async fn build(self) -> Result> { + match self { + CatalogBuilderDef::Rest(builder) => builder.build() + .await + .map(|c| Arc::new(c) as Arc), + } + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 9c8e9c12f..f94b2b113 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -62,10 +62,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; -pub use catalog::{ - Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, - TableRequirement, TableUpdate, ViewCreation, -}; +pub use catalog::*; pub mod table; From a569f7d0ac0a4bb1299d7826ef378f4179693361 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 25 Apr 2025 11:08:05 +0800 Subject: [PATCH 3/5] format --- crates/catalog/loader/src/lib.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 6fa58c3a6..559c06f92 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,8 +1,8 @@ use std::sync::Arc; + use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; - pub enum CatalogBuilderDef { Rest(RestCatalogBuilder), } @@ -23,28 +23,33 @@ impl CatalogBuilderDef { CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), } } - + pub fn uri(self, uri: impl Into) -> Self { match self { CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), } } - + pub fn warehouse(self, warehouse: impl Into) -> Self { match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.warehouse(warehouse)), + CatalogBuilderDef::Rest(builder) => { + CatalogBuilderDef::Rest(builder.warehouse(warehouse)) + } } } - + pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.with_prop(key, value)), + CatalogBuilderDef::Rest(builder) => { + CatalogBuilderDef::Rest(builder.with_prop(key, value)) + } } } - + pub async fn build(self) -> Result> { match self { - CatalogBuilderDef::Rest(builder) => builder.build() + CatalogBuilderDef::Rest(builder) => builder + .build() .await .map(|c| Arc::new(c) as Arc), } From 2d494d51f09c4ba341ac7ac85eabb1f7f941543a Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 30 Apr 2025 14:53:50 +0800 Subject: [PATCH 4/5] Keep both --- crates/catalog/loader/src/lib.rs | 66 +++++++++++++----------------- crates/catalog/rest/src/catalog.rs | 18 ++++---- crates/iceberg/src/catalog/mod.rs | 12 +++--- 3 files changed, 43 insertions(+), 53 deletions(-) diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 559c06f92..434a4fb37 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,57 +1,47 @@ +use std::future::Future; use std::sync::Arc; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; -pub enum CatalogBuilderDef { - Rest(RestCatalogBuilder), +pub trait BoxedCatalogBuilder { + fn name(&mut self, name: String); + fn uri(&mut self, uri: String); + fn warehouse(&mut self, warehouse: String); + fn with_prop(&mut self, key: String, value: String); + + fn build(self: Box) -> Box>>>; } -pub fn load(r#type: &str) -> Result { - match r#type { - "rest" => Ok(CatalogBuilderDef::Rest(RestCatalogBuilder::default())), - _ => Err(Error::new( - ErrorKind::FeatureUnsupported, - format!("Unsupported catalog type: {}", r#type), - )), +impl BoxedCatalogBuilder for T { + fn name(&mut self, name: String) { + self.name(name); } -} -impl CatalogBuilderDef { - pub fn name(self, name: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), - } + fn uri(&mut self, uri: String) { + self.uri(uri); } - pub fn uri(self, uri: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), - } + fn warehouse(&mut self, warehouse: String) { + self.warehouse(warehouse); } - pub fn warehouse(self, warehouse: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => { - CatalogBuilderDef::Rest(builder.warehouse(warehouse)) - } - } + fn with_prop(&mut self, key: String, value: String) { + self.with_prop(key, value); } - pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => { - CatalogBuilderDef::Rest(builder.with_prop(key, value)) - } - } + fn build(self: Box) -> Box>>> { + let builder = *self; + Box::new(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) } +} - pub async fn build(self) -> Result> { - match self { - CatalogBuilderDef::Rest(builder) => builder - .build() - .await - .map(|c| Arc::new(c) as Arc), - } +pub fn load(r#type: &str) -> Result> { + match r#type { + "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), } } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index a324ef3b0..bd58e7d6e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -245,22 +245,22 @@ impl Default for RestCatalogBuilder { impl CatalogBuilder for RestCatalogBuilder { type C = RestCatalog; - fn name(mut self, name: impl Into) -> Self { + fn name(&mut self, name: impl Into) -> &mut Self { self.0.name = Some(name.into()); self } - fn uri(mut self, uri: impl Into) -> Self { + fn uri(&mut self, uri: impl Into) -> &mut Self { self.0.uri = uri.into(); self } - fn warehouse(mut self, warehouse: impl Into) -> Self { + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self { self.0.warehouse = Some(warehouse.into()); self } - fn with_prop(mut self, key: impl Into, value: impl Into) -> Self { + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self { self.0.props.insert(key.into(), value.into()); self } @@ -288,7 +288,7 @@ impl CatalogBuilder for RestCatalogBuilder { impl RestCatalogBuilder { /// Configures the catalog with a custom HTTP client. - pub fn with_client(mut self, client: Client) -> Self { + pub fn with_client(&mut self, client: Client) -> &mut Self { self.0.client = Some(client); self } @@ -2331,13 +2331,13 @@ mod tests { #[tokio::test] async fn test_create_rest_catalog() { - let catalog = RestCatalogBuilder::default() + let mut builder = RestCatalogBuilder::default(); + builder .name("test") .uri("http://localhost:8080") .with_client(Client::new()) - .with_prop("a", "b") - .build() - .await; + .with_prop("a", "b"); + let catalog = builder.build().await; assert!(catalog.is_ok()); } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index d385bffe6..147eacbdb 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -101,14 +101,14 @@ pub trait Catalog: Debug + Sync + Send { pub trait CatalogBuilder: Default + Debug + Send + Sync { /// The catalog type that this builder creates. type C: Catalog; - /// Configure name of the catalog. - fn name(self, name: impl Into) -> Self; + /// Configure the name of the catalog. + fn name(&mut self, name: impl Into) -> &mut Self; /// Configure uri of the catalog. - fn uri(self, uri: impl Into) -> Self; - /// Configure warehouse location of the catalog. - fn warehouse(self, warehouse: impl Into) -> Self; + fn uri(&mut self, uri: impl Into) -> &mut Self; + /// Configure the warehouse location of the catalog. + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self; /// Configure properties of the catalog. - fn with_prop(self, key: impl Into, value: impl Into) -> Self; + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self; /// Create the catalog fn build(self) -> impl Future>; } From f4a2efea385230cd330026705316e16331a1c612 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 30 Apr 2025 15:10:55 +0800 Subject: [PATCH 5/5] Fix clippy --- Cargo.lock | 1 + crates/catalog/loader/Cargo.toml | 1 + crates/catalog/loader/src/lib.rs | 23 ++++++++++++++++++++--- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d935fd4c..c4458eb09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3319,6 +3319,7 @@ version = "0.4.0" dependencies = [ "iceberg", "iceberg-catalog-rest", + "tokio", ] [[package]] diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index 058f56920..f1d38c90a 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -14,3 +14,4 @@ repository = { workspace = true } [dependencies] iceberg = { workspace = true } iceberg-catalog-rest = {workspace = true} +tokio = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 434a4fb37..336d7defd 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,16 +1,19 @@ use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; +type BoxedCatalogBuilderFuture = Pin>>>>; + pub trait BoxedCatalogBuilder { fn name(&mut self, name: String); fn uri(&mut self, uri: String); fn warehouse(&mut self, warehouse: String); fn with_prop(&mut self, key: String, value: String); - fn build(self: Box) -> Box>>>; + fn build(self: Box) -> BoxedCatalogBuilderFuture; } impl BoxedCatalogBuilder for T { @@ -30,9 +33,9 @@ impl BoxedCatalogBuilder for T { self.with_prop(key, value); } - fn build(self: Box) -> Box>>> { + fn build(self: Box) -> BoxedCatalogBuilderFuture { let builder = *self; - Box::new(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) + Box::pin(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) } } @@ -45,3 +48,17 @@ pub fn load(r#type: &str) -> Result> { )), } } + +#[cfg(test)] +mod tests { + use crate::load; + + #[tokio::test] + async fn test_load() { + let mut catalog = load("rest").unwrap(); + catalog.name("rest".to_string()); + catalog.with_prop("key".to_string(), "value".to_string()); + + catalog.build().await.unwrap(); + } +}