diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index cf4ad7216..65aaf9d4d 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -53,6 +53,19 @@ impl MemoryCatalog { warehouse_location, } } + + /// Register an existing table in the memory catalog. + pub async fn register_existing_table( + &self, + table_ident: &TableIdent, + metadata_location: String, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_table(table_ident, metadata_location.clone())?; + + Ok(()) + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3be6da426..452801797 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, TableIdent}; use crate::table::IcebergTableProvider; @@ -64,7 +64,10 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + let table_ident = TableIdent::new(namespace.clone(), name.clone()); + IcebergTableProvider::try_new(client.clone(), table_ident) + }) .collect::<Vec<_>>(), ) .await?; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 00c9e1322..57b3319a3 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -29,7 +29,8 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use
iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent}; +use tokio::sync::RwLock; use crate::physical_plan::scan::IcebergTableScan; @@ -38,37 +39,41 @@ use crate::physical_plan::scan::IcebergTableScan; #[derive(Debug, Clone)] pub struct IcebergTableProvider { /// A table in the catalog. - table: Table, + table: Arc<RwLock<Table>>, + /// The identifier to the table in the catalog. + table_identifier: TableIdent, /// Table snapshot id that will be queried via this provider. snapshot_id: Option<i64>, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// A reference to the catalog that this table provider belongs to. + catalog: Option<Arc<dyn Catalog>>, } impl IcebergTableProvider { pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { + let table_identifier = table.identifier().clone(); IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: None, schema, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given client and table name to fetch an actual [`Table`] /// in the provided namespace. - pub(crate) async fn try_new( - client: Arc<dyn Catalog>, - namespace: NamespaceIdent, - name: impl Into<String>, - ) -> Result<Self> { - let ident = TableIdent::new(namespace, name.into()); - let table = client.load_table(&ident).await?; + pub async fn try_new(client: Arc<dyn Catalog>, table_name: TableIdent) -> Result<Self> { + let table = client.load_table(&table_name).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier: table_name, snapshot_id: None, + catalog: Some(client), schema, }) } @@ -76,10 +81,13 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given table.
Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table(table: Table) -> Result<Self> { + let table_identifier = table.identifier().clone(); let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: None, + catalog: None, schema, }) } @@ -87,6 +95,7 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> { + let table_identifier = table.identifier().clone(); let snapshot = table .metadata() .snapshot_by_id(snapshot_id) @@ -102,11 +111,30 @@ impl IcebergTableProvider { let schema = snapshot.schema(table.metadata())?; let schema = Arc::new(schema_to_arrow_schema(&schema)?); Ok(IcebergTableProvider { - table, + table: Arc::new(RwLock::new(table)), + table_identifier, snapshot_id: Some(snapshot_id), + catalog: None, schema, }) } + + /// Refreshes the table metadata to the latest snapshot. + /// + /// Requires that this TableProvider was created with a + /// reference to the catalog to load the updated table from.
+ pub async fn refresh_table_metadata(&self) -> Result<Table> { + let Some(catalog) = &self.catalog else { + return Err(Error::new(ErrorKind::Unexpected, "Table provider could not refresh table metadata because no catalog client was provided".to_string())); + }; + + let updated_table = catalog.load_table(&self.table_identifier).await?; + + let mut table_guard = self.table.write().await; + *table_guard = updated_table.clone(); + + Ok(updated_table) + } } #[async_trait] @@ -130,8 +158,13 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option<usize>, ) -> DFResult<Arc<dyn ExecutionPlan>> { + // Get the latest table metadata from the catalog if it exists + let table = match self.refresh_table_metadata().await.ok() { + Some(table) => table, + None => self.table.read().await.clone(), + }; Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), + table, self.snapshot_id, self.schema.clone(), projection, diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 715635e06..4aa105ca1 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -27,9 +27,9 @@ use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; -use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; +use iceberg::{Catalog, NamespaceIdent, Result, TableCreation, TableIdent}; use iceberg_catalog_memory::MemoryCatalog; -use iceberg_datafusion::IcebergCatalogProvider; +use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProvider}; use tempfile::TempDir; fn temp_path() -> String { @@ -295,3 +295,38 @@ async fn test_table_predict_pushdown() -> Result<()> { assert!(s.value(1).trim().contains(expected)); Ok(()) } + +#[tokio::test] +async fn
test_table_scan_snapshot() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("test".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let current_dir = std::env::current_dir().unwrap(); + let metadata_path = current_dir.join("tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json"); + + let table_ident = TableIdent::new(namespace, "test_table".to_string()); + iceberg_catalog + .register_existing_table(&table_ident, metadata_path.display().to_string()) + .await?; + + let client = Arc::new(iceberg_catalog); + let table = Arc::new( + IcebergTableProvider::try_new(Arc::clone(&client) as Arc<dyn Catalog>, table_ident.clone()) + .await?, + ); + + let ctx = SessionContext::new(); + ctx.register_table("df_test", table) + .expect("failed to register table"); + let records = ctx + .sql("select * from df_test") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(0, records.len()); + + Ok(()) +} diff --git a/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json b/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json new file mode 100644 index 000000000..5e66dfd65 --- /dev/null +++ b/crates/integrations/datafusion/tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json @@ -0,0 +1 @@
+{"location":"./tests/test_data/scan_snapshot_update/test.db/test_table","table-uuid":"e94dbe67-55ae-40e6-b0ff-8b48ebabf550","last-updated-ms":1746586823335,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"id","type":"string","required":true},{"id":2,"name":"comment","type":"string","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"id"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"write.format.default":"parquet","write.parquet.compression-codec":"snappy"},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"statistics":[],"format-version":2,"last-sequence-number":0} \ No newline at end of file