From c6291aa13159111fa9bcdd61366e255768882b01 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 23:01:44 +0900 Subject: [PATCH 1/4] Support retrieving the latest Iceberg table on table scan (#11) * Allow resolving the current snapshot ID to use on a scan from a callback function * Use table_fn * Fix * Just pass a reference to the catalog * make public * Just take table ident * lint --- crates/integrations/datafusion/src/schema.rs | 7 ++- .../integrations/datafusion/src/table/mod.rs | 44 ++++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3be6da426..452801797 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, TableIdent}; use crate::table::IcebergTableProvider; @@ -64,7 +64,10 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + let table_ident = TableIdent::new(namespace.clone(), name.clone()); + IcebergTableProvider::try_new(client.clone(), table_ident) + }) .collect::>(), ) .await?; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 00c9e1322..dcb3d5fc6 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,18 +24,18 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::Result as DFResult; +use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent}; use crate::physical_plan::scan::IcebergTableScan; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct IcebergTableProvider { /// A table in the catalog. table: Table, @@ -43,6 +43,18 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// A reference to the catalog that this table provider belongs to. + catalog: Option>, +} + +impl std::fmt::Debug for IcebergTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IcebergTableProvider") + .field("table", &self.table) + .field("snapshot_id", &self.snapshot_id) + .field("schema", &self.schema) + .finish_non_exhaustive() + } } impl IcebergTableProvider { @@ -51,24 +63,21 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given client and table name to fetch an actual [`Table`] /// in the provided namespace. - pub(crate) async fn try_new( - client: Arc, - namespace: NamespaceIdent, - name: impl Into, - ) -> Result { - let ident = TableIdent::new(namespace, name.into()); - let table = client.load_table(&ident).await?; + pub async fn try_new(client: Arc, table_name: TableIdent) -> Result { + let table = client.load_table(&table_name).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { table, snapshot_id: None, + catalog: Some(client), schema, }) } @@ -80,6 +89,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, + catalog: None, schema, }) } @@ -104,6 +114,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), + catalog: None, schema, }) } @@ -130,8 +141,19 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { + // Get the latest table metadata from the catalog if it exists + let table = if let Some(catalog) = &self.catalog { + catalog + .load_table(self.table.identifier()) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) + })? + } else { + self.table.clone() + }; Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), + table, self.snapshot_id, self.schema.clone(), projection, From 866d5a4a929201469bafd034e99206f6d17b001d Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 7 May 2025 12:03:50 +0900 Subject: [PATCH 2/4] Add integration test for new DataFusion table provider creation method --- crates/catalog/memory/src/catalog.rs | 13 +++++++ .../integrations/datafusion/src/table/mod.rs | 12 +----- .../tests/integration_datafusion_test.rs | 39 ++++++++++++++++++- ...-c49f-4e40-9236-a50fd0884b5d.metadata.json | 1 + 4 files changed, 52 insertions(+), 13 deletions(-) create mode 100644 crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index cf4ad7216..65aaf9d4d 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -53,6 +53,19 @@ impl MemoryCatalog { warehouse_location, } } + + /// Register an existing table in the memory catalog. + pub async fn register_existing_table( + &self, + table_ident: &TableIdent, + metadata_location: String, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_table(table_ident, metadata_location.clone())?; + + Ok(()) + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index dcb3d5fc6..c12772c4d 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -35,7 +35,7 @@ use crate::physical_plan::scan::IcebergTableScan; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct IcebergTableProvider { /// A table in the catalog. table: Table, @@ -47,16 +47,6 @@ pub struct IcebergTableProvider { catalog: Option>, } -impl std::fmt::Debug for IcebergTableProvider { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("IcebergTableProvider") - .field("table", &self.table) - .field("snapshot_id", &self.snapshot_id) - .field("schema", &self.schema) - .finish_non_exhaustive() - } -} - impl IcebergTableProvider { pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { IcebergTableProvider { diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 715635e06..4aa105ca1 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -27,9 +27,9 @@ use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; -use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; +use iceberg::{Catalog, NamespaceIdent, Result, TableCreation, TableIdent}; use iceberg_catalog_memory::MemoryCatalog; -use iceberg_datafusion::IcebergCatalogProvider; +use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProvider}; use tempfile::TempDir; fn temp_path() -> String { @@ -295,3 +295,38 @@ async fn test_table_predict_pushdown() -> Result<()> { assert!(s.value(1).trim().contains(expected)); Ok(()) } + +#[tokio::test] +async fn test_table_scan_snapshot() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("test".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let current_dir = std::env::current_dir().unwrap(); + let metadata_path = current_dir.join("tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json"); + + let table_ident = TableIdent::new(namespace, "test_table".to_string()); + iceberg_catalog + .register_existing_table(&table_ident, metadata_path.display().to_string()) + .await?; + + let client = Arc::new(iceberg_catalog); + let table = Arc::new( + IcebergTableProvider::try_new(Arc::clone(&client) as Arc, table_ident.clone()) + .await?, + ); + + let ctx = SessionContext::new(); + ctx.register_table("df_test", table) + .expect("failed to register table"); + let records = ctx + .sql("select * from df_test") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(0, records.len()); + + Ok(()) +} diff --git a/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json b/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json new file mode 100644 index 000000000..5e66dfd65 --- /dev/null +++ b/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json @@ -0,0 +1 @@ +{"location":"./tests/test_data/scan_snapshot_update/test.db/test_table","table-uuid":"e94dbe67-55ae-40e6-b0ff-8b48ebabf550","last-updated-ms":1746586823335,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"id","type":"string","required":true},{"id":2,"name":"comment","type":"string","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"id"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"write.format.default":"parquet","write.parquet.compression-codec":"snappy"},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"statistics":[],"format-version":2,"last-sequence-number":0} \ No newline at end of file From 8efab2ce4ff886e40b440886f070198f31e09407 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Fri, 9 May 2025 11:48:05 +0900 Subject: [PATCH 3/4] Adds a refresh_table_metadata function --- .../integrations/datafusion/src/table/mod.rs | 51 +++++++++++++------ 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index c12772c4d..0313a7d5a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,12 +24,13 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::error::Result as DFResult; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent}; +use tokio::sync::RwLock; use crate::physical_plan::scan::IcebergTableScan; @@ -38,7 +39,9 @@ use crate::physical_plan::scan::IcebergTableScan; #[derive(Debug, Clone)] pub struct IcebergTableProvider { /// A table in the catalog. - table: Table, + table: Arc>, + /// The identifier to the table in the catalog. + table_identifier: TableIdent, /// Table snapshot id that will be queried via this provider. snapshot_id: Option, /// A reference-counted arrow `Schema`. @@ -49,8 +52,10 @@ pub struct IcebergTableProvider { impl IcebergTableProvider { pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { + let table_identifier = table.identifier().clone(); IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: None, schema, catalog: None, @@ -65,7 +70,8 @@ impl IcebergTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier: table_name, snapshot_id: None, catalog: Some(client), schema, @@ -75,9 +81,11 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table(table: Table) -> Result { + let table_identifier = table.identifier().clone(); let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: None, catalog: None, schema, @@ -87,6 +95,7 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result { + let table_identifier = table.identifier().clone(); let snapshot = table .metadata() .snapshot_by_id(snapshot_id) @@ -102,12 +111,30 @@ impl IcebergTableProvider { let schema = snapshot.schema(table.metadata())?; let schema = Arc::new(schema_to_arrow_schema(&schema)?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: Some(snapshot_id), catalog: None, schema, }) } + + /// Refreshes the table metadata to the latest snapshot. + /// + /// Requires that this TableProvider was created with a + /// reference to the catalog to load the updated table from. + pub async fn refresh_table_metadata(&self) -> Result { + let Some(catalog) = &self.catalog else { + return Err(Error::new(ErrorKind::Unexpected, format!("Table provider could not refresh table metadata because no catalog client was provided"))); + }; + + let updated_table = catalog.load_table(&self.table_identifier).await?; + + let mut table_guard = self.table.write().await; + *table_guard = updated_table.clone(); + + Ok(updated_table) + } } #[async_trait] @@ -132,15 +159,9 @@ impl TableProvider for IcebergTableProvider { _limit: Option, ) -> DFResult> { // Get the latest table metadata from the catalog if it exists - let table = if let Some(catalog) = &self.catalog { - catalog - .load_table(self.table.identifier()) - .await - .map_err(|e| { - DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) - })? - } else { - self.table.clone() + let table = match self.refresh_table_metadata().await.ok() { + Some(table) => table, + None => self.table.read().await.clone(), }; Ok(Arc::new(IcebergTableScan::new( table, From afeb0c1740a541cb9b9e4435a1487f277df1f04d Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Fri, 9 May 2025 15:59:54 +0900 Subject: [PATCH 4/4] Clippy --- crates/integrations/datafusion/src/table/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 0313a7d5a..57b3319a3 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -125,7 +125,7 @@ impl IcebergTableProvider { /// reference to the catalog to load the updated table from. pub async fn refresh_table_metadata(&self) -> Result
{ let Some(catalog) = &self.catalog else { - return Err(Error::new(ErrorKind::Unexpected, format!("Table provider could not refresh table metadata because no catalog client was provided"))); + return Err(Error::new(ErrorKind::Unexpected, "Table provider could not refresh table metadata because no catalog client was provided".to_string())); }; let updated_table = catalog.load_table(&self.table_identifier).await?;