diff --git a/Cargo.lock b/Cargo.lock
index 13062b6de..71ac85bd3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3395,6 +3395,7 @@ dependencies = [
  "datafusion-cli",
  "dirs",
  "fs-err",
+ "iceberg-catalog-glue",
  "iceberg-catalog-rest",
  "iceberg-datafusion",
  "mimalloc",
diff --git a/Cargo.toml b/Cargo.toml
index 185b5fc01..2ad61556e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -73,6 +73,7 @@ futures = "0.3"
 hive_metastore = "0.1"
 http = "1.1"
 iceberg = { version = "0.4.0", path = "./crates/iceberg" }
+iceberg-catalog-glue = { version = "0.4.0", path = "./crates/catalog/glue" }
 iceberg-catalog-memory = { version = "0.4.0", path = "./crates/catalog/memory" }
 iceberg-catalog-rest = { version = "0.4.0", path = "./crates/catalog/rest" }
 iceberg-datafusion = { version = "0.4.0", path = "./crates/integrations/datafusion" }
diff --git a/crates/integrations/cli/Cargo.toml b/crates/integrations/cli/Cargo.toml
index a9c6e4543..177f5af42 100644
--- a/crates/integrations/cli/Cargo.toml
+++ b/crates/integrations/cli/Cargo.toml
@@ -34,6 +34,7 @@ tokio = {workspace = true}
 anyhow = {workspace = true}
 iceberg-datafusion = {workspace = true}
 toml = {workspace = true}
+iceberg-catalog-glue = {workspace = true}
 iceberg-catalog-rest = {workspace = true}
 tracing = {workspace = true}
 tracing-subscriber = {workspace = true}
diff --git a/crates/integrations/cli/src/catalog.rs b/crates/integrations/cli/src/catalog.rs
index e9450f13f..856e3da5d 100644
--- a/crates/integrations/cli/src/catalog.rs
+++ b/crates/integrations/cli/src/catalog.rs
@@ -16,13 +16,14 @@
 // under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::path::Path;
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
 use fs_err::read_to_string;
+use iceberg_catalog_glue::{GlueCatalog, GlueCatalogConfig};
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 use iceberg_datafusion::IcebergCatalogProvider;
 use toml::{Table as TomlTable, Value};
@@ -77,10 +78,6 @@ impl IcebergCatalogList {
             .as_str()
             .ok_or_else(|| anyhow::anyhow!("type is not string"))?;
 
-        if r#type != "rest" {
-            return Err(anyhow::anyhow!("Only rest catalog is supported for now!"));
-        }
-
         let catalog_config = config
             .get("config")
             .ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))?
@@ -99,6 +96,16 @@
             .as_str()
             .ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?;
 
+        let schemas: HashSet<String> = HashSet::from_iter(
+            catalog_config
+                .get("schemas")
+                .and_then(|schemas| schemas.as_array())
+                .into_iter()
+                .flatten()
+                .filter_map(|value| value.as_str())
+                .map(String::from),
+        );
+
         let props_table = catalog_config
             .get("props")
             .ok_or_else(|| anyhow::anyhow!("props not found for catalog {name}"))?
@@ -113,9 +120,47 @@
             props.insert(key.to_string(), value_str.to_string());
         }
 
+        match r#type {
+            "rest" => {
+                Self::rest_catalog_provider(
+                    name.to_string(),
+                    uri.to_string(),
+                    warehouse.to_string(),
+                    props,
+                )
+                .await
+            }
+            "glue" => {
+                let catalog_id = catalog_config
+                    .get("catalog_id")
+                    .ok_or_else(|| anyhow::anyhow!("catalog_id not found for catalog {name}"))?
+                    .as_str()
+                    .ok_or_else(|| {
+                        anyhow::anyhow!("catalog_id is not string for catalog {name}")
+                    })?;
+                Self::glue_catalog_provider(
+                    name.to_string(),
+                    uri.to_string(),
+                    catalog_id.to_string(),
+                    warehouse.to_string(),
+                    schemas,
+                    props,
+                )
+                .await
+            }
+            _ => Err(anyhow::anyhow!("Unsupported catalog type: {}", r#type)),
+        }
+    }
+
+    async fn rest_catalog_provider(
+        name: String,
+        uri: String,
+        warehouse: String,
+        props: HashMap<String, String>,
+    ) -> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
         let rest_catalog_config = RestCatalogConfig::builder()
-            .uri(uri.to_string())
-            .warehouse(warehouse.to_string())
+            .uri(uri)
+            .warehouse(warehouse)
             .props(props)
             .build();
 
@@ -127,6 +172,33 @@
             ),
         ))
     }
+
+    async fn glue_catalog_provider(
+        name: String,
+        uri: String,
+        catalog_id: String,
+        warehouse: String,
+        schemas: HashSet<String>,
+        props: HashMap<String, String>,
+    ) -> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
+        let glue_catalog_config = GlueCatalogConfig::builder()
+            .uri(uri.to_string())
+            .catalog_id(catalog_id.to_string())
+            .warehouse(warehouse.to_string())
+            .props(props)
+            .build();
+
+        Ok((
+            name.to_string(),
+            Arc::new(
+                IcebergCatalogProvider::try_new_with_schemas(
+                    Arc::new(GlueCatalog::new(glue_catalog_config).await?),
+                    schemas,
+                )
+                .await?,
+            ),
+        ))
+    }
 }
 
 impl CatalogProviderList for IcebergCatalogList {
diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs
index c3cbcc88b..6fc30489c 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use datafusion::catalog::{CatalogProvider, SchemaProvider};
@@ -47,16 +47,33 @@ impl IcebergCatalogProvider {
     /// attempts to create a schema provider for each namespace, and
     /// collects these providers into a `HashMap`.
     pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
+        Self::try_new_with_schemas(client, HashSet::new()).await
+    }
+
+    pub async fn try_new_with_schemas(
+        client: Arc<dyn Catalog>,
+        schemas: HashSet<String>,
+    ) -> Result<Self> {
         // TODO:
         // Schemas and providers should be cached and evicted based on time
         // As of right now; schemas might become stale.
-        let schema_names: Vec<_> = client
+        let all_schema_names: Vec<_> = client
             .list_namespaces(None)
             .await?
             .iter()
             .flat_map(|ns| ns.as_ref().clone())
             .collect();
 
+        let schema_names = if schemas.is_empty() {
+            all_schema_names
+        } else {
+            // Apply filter
+            all_schema_names
+                .into_iter()
+                .filter(|name| schemas.contains(name))
+                .collect()
+        };
+
         let providers = try_join_all(
             schema_names
                 .iter()
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 3be6da426..cc8feb358 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -23,7 +23,7 @@ use async_trait::async_trait;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result as DFResult;
-use futures::future::try_join_all;
+use futures::future::join_all;
 use iceberg::{Catalog, NamespaceIdent, Result};
 
 use crate::table::IcebergTableProvider;
@@ -61,13 +61,26 @@
             .map(|tbl| tbl.name().to_string())
             .collect();
 
-        let providers = try_join_all(
-            table_names
-                .iter()
-                .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
-                .collect::<Vec<_>>(),
-        )
-        .await?;
+        let providers = join_all(table_names.iter().map(|table_name| {
+            let client = client.clone();
+            let namespace = namespace.clone();
+            async move {
+                match IcebergTableProvider::try_new(client, namespace, table_name).await {
+                    Ok(provider) => {
+                        println!("Successfully loaded table {}", table_name);
+                        Some(provider)
+                    }
+                    Err(e) => {
+                        println!("Error loading table {} due to {}", table_name, e);
+                        None
+                    }
+                }
+            }
+        }))
+        .await
+        .into_iter()
+        .flatten() // Remove the None values
+        .collect::<Vec<_>>();
 
         let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
            .into_iter()
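With this change the CLI config can declare a Glue catalog alongside REST ones. A hypothetical entry, assuming the CLI's catalog entries live in a `[[catalogs]]` TOML array (the array name is not shown in this diff) and using placeholder values throughout:

    [[catalogs]]
    name = "glue"                                     # placeholder name
    type = "glue"

    [catalogs.config]
    uri = "https://glue.us-east-1.amazonaws.com"      # placeholder endpoint
    warehouse = "s3://example-bucket/warehouse"       # placeholder location
    catalog_id = "123456789012"                       # placeholder AWS account id
    schemas = ["db1", "db2"]                          # optional; becomes the namespace filter in try_new_with_schemas
    [catalogs.config.props]
    region_name = "us-east-1"                         # placeholder property

Only the key names (`name`, `type`, `config`, `uri`, `warehouse`, `catalog_id`, `schemas`, `props`) come from `parse_catalog`; every value above is illustrative. An empty or omitted `schemas` list keeps the previous behavior of exposing all namespaces.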