From 547f40780bfd274c218178e7f78712bf4f1e3052 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 7 May 2025 22:21:47 -0700 Subject: [PATCH 1/2] feat: Support Glue catalog for iceberg-playground --- Cargo.lock | 1 + Cargo.toml | 1 + crates/integrations/cli/Cargo.toml | 1 + crates/integrations/cli/src/catalog.rs | 65 +++++++++++++++++-- crates/integrations/datafusion/src/catalog.rs | 18 ++++- crates/integrations/datafusion/src/schema.rs | 30 ++++++--- 6 files changed, 100 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13062b6de..71ac85bd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3395,6 +3395,7 @@ dependencies = [ "datafusion-cli", "dirs", "fs-err", + "iceberg-catalog-glue", "iceberg-catalog-rest", "iceberg-datafusion", "mimalloc", diff --git a/Cargo.toml b/Cargo.toml index 185b5fc01..2ad61556e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ futures = "0.3" hive_metastore = "0.1" http = "1.1" iceberg = { version = "0.4.0", path = "./crates/iceberg" } +iceberg-catalog-glue = { version = "0.4.0", path = "./crates/catalog/glue" } iceberg-catalog-memory = { version = "0.4.0", path = "./crates/catalog/memory" } iceberg-catalog-rest = { version = "0.4.0", path = "./crates/catalog/rest" } iceberg-datafusion = { version = "0.4.0", path = "./crates/integrations/datafusion" } diff --git a/crates/integrations/cli/Cargo.toml b/crates/integrations/cli/Cargo.toml index a9c6e4543..177f5af42 100644 --- a/crates/integrations/cli/Cargo.toml +++ b/crates/integrations/cli/Cargo.toml @@ -34,6 +34,7 @@ tokio = {workspace = true} anyhow = {workspace = true} iceberg-datafusion = {workspace = true} toml = {workspace = true} +iceberg-catalog-glue = {workspace = true} iceberg-catalog-rest = {workspace = true} tracing = {workspace = true} tracing-subscriber = {workspace = true} diff --git a/crates/integrations/cli/src/catalog.rs b/crates/integrations/cli/src/catalog.rs index e9450f13f..e5f4c9ac7 100644 --- a/crates/integrations/cli/src/catalog.rs +++ b/crates/integrations/cli/src/catalog.rs @@ -16,13 +16,14 @@ // under the License. use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; use anyhow::anyhow; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use fs_err::read_to_string; +use iceberg_catalog_glue::{GlueCatalog, GlueCatalogConfig}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_datafusion::IcebergCatalogProvider; use toml::{Table as TomlTable, Value}; @@ -77,10 +78,6 @@ impl IcebergCatalogList { .as_str() .ok_or_else(|| anyhow::anyhow!("type is not string"))?; - if r#type != "rest" { - return Err(anyhow::anyhow!("Only rest catalog is supported for now!")); - } - let catalog_config = config .get("config") .ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))? @@ -98,6 +95,14 @@ impl IcebergCatalogList { .ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))? .as_str() .ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?; + + let schemas: HashSet = HashSet::from_iter(catalog_config + .get("schemas") + .and_then(|schemas| schemas.as_array()) + .into_iter() + .flatten() + .filter_map(|value| value.as_str()) + .map(String::from)); let props_table = catalog_config .get("props") @@ -112,9 +117,53 @@ impl IcebergCatalogList { .ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?; props.insert(key.to_string(), value_str.to_string()); } - + + match r#type { + "rest" => Self::rest_catalog_provider(name.to_string(), uri.to_string(), warehouse.to_string(), props).await, + "glue" => { + let catalog_id = catalog_config + .get("catalog_id") + .ok_or_else(|| anyhow::anyhow!("catalog_id not found for catalog {name}"))? + .as_str() + .ok_or_else(|| anyhow::anyhow!("catalog_id is not string for catalog {name}"))?; + Self::glue_catalog_provider(name.to_string(), uri.to_string(), catalog_id.to_string(), warehouse.to_string(), schemas, props).await + }, + _ => Err(anyhow::anyhow!("Unsupported catalog type: {}", r#type)), + } + } + + async fn rest_catalog_provider( + name: String, + uri: String, + warehouse: String, + props: HashMap + ) -> anyhow::Result<(String, Arc)> { let rest_catalog_config = RestCatalogConfig::builder() + .uri(uri) + .warehouse(warehouse) + .props(props) + .build(); + + Ok(( + name.to_string(), + Arc::new( + IcebergCatalogProvider::try_new(Arc::new(RestCatalog::new(rest_catalog_config))) + .await?, + ), + )) + } + + async fn glue_catalog_provider( + name: String, + uri: String, + catalog_id: String, + warehouse: String, + schemas: HashSet, + props: HashMap + ) -> anyhow::Result<(String, Arc)> { + let glue_catalog_config = GlueCatalogConfig::builder() .uri(uri.to_string()) + .catalog_id(catalog_id.to_string()) .warehouse(warehouse.to_string()) .props(props) .build(); @@ -122,7 +171,9 @@ impl IcebergCatalogList { Ok(( name.to_string(), Arc::new( - IcebergCatalogProvider::try_new(Arc::new(RestCatalog::new(rest_catalog_config))) + IcebergCatalogProvider::try_new_with_schemas( + Arc::new(GlueCatalog::new(glue_catalog_config).await?), + schemas) .await?, ), )) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b..e3076d352 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use datafusion::catalog::{CatalogProvider, SchemaProvider}; @@ -46,16 +47,31 @@ impl IcebergCatalogProvider { /// This method retrieves the list of namespace names /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. + /// pub async fn try_new(client: Arc) -> Result { + Self::try_new_with_schemas(client, HashSet::new()).await + } + + pub async fn try_new_with_schemas(client: Arc, schemas: HashSet) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. - let schema_names: Vec<_> = client + let all_schema_names: Vec<_> = client .list_namespaces(None) .await? .iter() .flat_map(|ns| ns.as_ref().clone()) .collect(); + + let schema_names = if schemas.is_empty() { + all_schema_names + } else { + // Apply filter + all_schema_names + .into_iter() + .filter(|name| schemas.contains(name)) + .collect() + }; let providers = try_join_all( schema_names diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3be6da426..bab979713 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -23,7 +23,7 @@ use async_trait::async_trait; use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; -use futures::future::try_join_all; +use futures::future::join_all; use iceberg::{Catalog, NamespaceIdent, Result}; use crate::table::IcebergTableProvider; @@ -61,13 +61,27 @@ impl IcebergSchemaProvider { .map(|tbl| tbl.name().to_string()) .collect(); - let providers = try_join_all( - table_names - .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) - .collect::>(), - ) - .await?; + let providers = join_all( + table_names.iter().map(|table_name| { + let client = client.clone(); + let namespace = namespace.clone(); + async move { + match IcebergTableProvider::try_new(client, namespace, table_name).await { + Ok(provider) => { + println!("Successfully loaded table {}", table_name); + Some(provider) + } + Err(e) => { + println!("Error loading table {} due to {}", table_name, e); + None + } + } + } + }) + ).await + .into_iter() + .flatten() // Remove the None values + .collect::>(); let tables: HashMap> = table_names .into_iter() From 43c76fe41b9d3f0b11cc08055c7624db4898f153 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 7 May 2025 22:31:43 -0700 Subject: [PATCH 2/2] fix fmt --- crates/integrations/cli/src/catalog.rs | 65 ++++++++++++------- crates/integrations/datafusion/src/catalog.rs | 13 ++-- crates/integrations/datafusion/src/schema.rs | 37 +++++------ 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/crates/integrations/cli/src/catalog.rs b/crates/integrations/cli/src/catalog.rs index e5f4c9ac7..856e3da5d 100644 --- a/crates/integrations/cli/src/catalog.rs +++ b/crates/integrations/cli/src/catalog.rs @@ -95,14 +95,16 @@ impl IcebergCatalogList { .ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))? .as_str() .ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?; - - let schemas: HashSet = HashSet::from_iter(catalog_config - .get("schemas") - .and_then(|schemas| schemas.as_array()) - .into_iter() - .flatten() - .filter_map(|value| value.as_str()) - .map(String::from)); + + let schemas: HashSet = HashSet::from_iter( + catalog_config + .get("schemas") + .and_then(|schemas| schemas.as_array()) + .into_iter() + .flatten() + .filter_map(|value| value.as_str()) + .map(String::from), + ); let props_table = catalog_config .get("props") @@ -117,33 +119,51 @@ impl IcebergCatalogList { .ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?; props.insert(key.to_string(), value_str.to_string()); } - + match r#type { - "rest" => Self::rest_catalog_provider(name.to_string(), uri.to_string(), warehouse.to_string(), props).await, + "rest" => { + Self::rest_catalog_provider( + name.to_string(), + uri.to_string(), + warehouse.to_string(), + props, + ) + .await + } "glue" => { let catalog_id = catalog_config .get("catalog_id") .ok_or_else(|| anyhow::anyhow!("catalog_id not found for catalog {name}"))? .as_str() - .ok_or_else(|| anyhow::anyhow!("catalog_id is not string for catalog {name}"))?; - Self::glue_catalog_provider(name.to_string(), uri.to_string(), catalog_id.to_string(), warehouse.to_string(), schemas, props).await - }, + .ok_or_else(|| { + anyhow::anyhow!("catalog_id is not string for catalog {name}") + })?; + Self::glue_catalog_provider( + name.to_string(), + uri.to_string(), + catalog_id.to_string(), + warehouse.to_string(), + schemas, + props, + ) + .await + } _ => Err(anyhow::anyhow!("Unsupported catalog type: {}", r#type)), } } - + async fn rest_catalog_provider( - name: String, + name: String, uri: String, - warehouse: String, - props: HashMap + warehouse: String, + props: HashMap, ) -> anyhow::Result<(String, Arc)> { let rest_catalog_config = RestCatalogConfig::builder() .uri(uri) .warehouse(warehouse) .props(props) .build(); - + Ok(( name.to_string(), Arc::new( @@ -152,14 +172,14 @@ impl IcebergCatalogList { ), )) } - + async fn glue_catalog_provider( name: String, uri: String, catalog_id: String, warehouse: String, schemas: HashSet, - props: HashMap + props: HashMap, ) -> anyhow::Result<(String, Arc)> { let glue_catalog_config = GlueCatalogConfig::builder() .uri(uri.to_string()) @@ -173,8 +193,9 @@ impl IcebergCatalogList { Arc::new( IcebergCatalogProvider::try_new_with_schemas( Arc::new(GlueCatalog::new(glue_catalog_config).await?), - schemas) - .await?, + schemas, + ) + .await?, ), )) } diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index e3076d352..6fc30489c 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -16,8 +16,7 @@ // under the License. use std::any::Any; -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use datafusion::catalog::{CatalogProvider, SchemaProvider}; @@ -47,12 +46,14 @@ impl IcebergCatalogProvider { /// This method retrieves the list of namespace names /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. - /// pub async fn try_new(client: Arc) -> Result { Self::try_new_with_schemas(client, HashSet::new()).await } - - pub async fn try_new_with_schemas(client: Arc, schemas: HashSet) -> Result { + + pub async fn try_new_with_schemas( + client: Arc, + schemas: HashSet, + ) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. @@ -62,7 +63,7 @@ impl IcebergCatalogProvider { .iter() .flat_map(|ns| ns.as_ref().clone()) .collect(); - + let schema_names = if schemas.is_empty() { all_schema_names } else { diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index bab979713..cc8feb358 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -61,27 +61,26 @@ impl IcebergSchemaProvider { .map(|tbl| tbl.name().to_string()) .collect(); - let providers = join_all( - table_names.iter().map(|table_name| { - let client = client.clone(); - let namespace = namespace.clone(); - async move { - match IcebergTableProvider::try_new(client, namespace, table_name).await { - Ok(provider) => { - println!("Successfully loaded table {}", table_name); - Some(provider) - } - Err(e) => { - println!("Error loading table {} due to {}", table_name, e); - None - } + let providers = join_all(table_names.iter().map(|table_name| { + let client = client.clone(); + let namespace = namespace.clone(); + async move { + match IcebergTableProvider::try_new(client, namespace, table_name).await { + Ok(provider) => { + println!("Successfully loaded table {}", table_name); + Some(provider) + } + Err(e) => { + println!("Error loading table {} due to {}", table_name, e); + None } } - }) - ).await - .into_iter() - .flatten() // Remove the None values - .collect::>(); + } + })) + .await + .into_iter() + .flatten() // Remove the None values + .collect::>(); let tables: HashMap> = table_names .into_iter()