Skip to content

Support retrieving the latest Iceberg table on table scan #1297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ impl MemoryCatalog {
warehouse_location,
}
}

/// Register an existing table in the memory catalog.
///
/// `metadata_location` is the path of an already-written metadata file;
/// it is stored as-is and propagated to the namespace state.
///
/// # Errors
///
/// Propagates any error from `insert_new_table` (e.g. when the table
/// cannot be inserted into the namespace state).
pub async fn register_existing_table(
    &self,
    table_ident: &TableIdent,
    metadata_location: String,
) -> Result<()> {
    let mut root_namespace_state = self.root_namespace_state.lock().await;

    // `metadata_location` is owned and not used after this call, so it can
    // be moved directly — the previous `.clone()` was a redundant allocation.
    root_namespace_state.insert_new_table(table_ident, metadata_location)?;

    Ok(())
}
}

#[async_trait]
Expand Down
7 changes: 5 additions & 2 deletions crates/integrations/datafusion/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DFResult;
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};

use crate::table::IcebergTableProvider;

Expand Down Expand Up @@ -64,7 +64,10 @@ impl IcebergSchemaProvider {
let providers = try_join_all(
table_names
.iter()
.map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
.map(|name| {
let table_ident = TableIdent::new(namespace.clone(), name.clone());
IcebergTableProvider::try_new(client.clone(), table_ident)
})
.collect::<Vec<_>>(),
)
.await?;
Expand Down
61 changes: 47 additions & 14 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent};
use tokio::sync::RwLock;

use crate::physical_plan::scan::IcebergTableScan;

Expand All @@ -38,55 +39,63 @@ use crate::physical_plan::scan::IcebergTableScan;
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
/// A table in the catalog.
table: Table,
table: Arc<RwLock<Table>>,
/// The identifier to the table in the catalog.
table_identifier: TableIdent,
/// Table snapshot id that will be queried via this provider.
snapshot_id: Option<i64>,
/// A reference-counted arrow `Schema`.
schema: ArrowSchemaRef,
/// A reference to the catalog that this table provider belongs to.
catalog: Option<Arc<dyn Catalog>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice, we need this for future transactions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it required? 👀

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it might make sense. I think the use-cases for a static table are fairly limited, and that could be a separate TableProvider. But it would be a breaking change for all users

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think breaking change should still be fine at this early stage

}

impl IcebergTableProvider {
/// Builds a provider from an already-loaded [`Table`] and its
/// pre-computed Arrow schema.
///
/// No catalog reference is attached, so the resulting provider cannot
/// refresh its metadata.
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
    // Capture the identifier before `table` is moved into the lock.
    let table_identifier = table.identifier().clone();
    IcebergTableProvider {
        table: Arc::new(RwLock::new(table)),
        table_identifier,
        snapshot_id: None,
        schema,
        catalog: None,
    }
}
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given client and table name to fetch an actual [`Table`]
/// in the provided namespace.
pub(crate) async fn try_new(
client: Arc<dyn Catalog>,
namespace: NamespaceIdent,
name: impl Into<String>,
) -> Result<Self> {
let ident = TableIdent::new(namespace, name.into());
let table = client.load_table(&ident).await?;
pub async fn try_new(client: Arc<dyn Catalog>, table_name: TableIdent) -> Result<Self> {
let table = client.load_table(&table_name).await?;

let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

Ok(IcebergTableProvider {
table,
table: Arc::new(RwLock::new(table)),
table_identifier: table_name,
snapshot_id: None,
catalog: Some(client),
schema,
})
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given table. Can be used to create a table provider from an
/// existing table regardless of the catalog implementation.
///
/// No catalog reference is attached, so the resulting provider cannot
/// refresh its metadata.
pub async fn try_new_from_table(table: Table) -> Result<Self> {
    // Capture the identifier before `table` is moved into the lock.
    let table_identifier = table.identifier().clone();
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    Ok(IcebergTableProvider {
        table: Arc::new(RwLock::new(table)),
        table_identifier,
        snapshot_id: None,
        catalog: None,
        schema,
    })
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
let table_identifier = table.identifier().clone();
let snapshot = table
.metadata()
.snapshot_by_id(snapshot_id)
Expand All @@ -102,11 +111,30 @@ impl IcebergTableProvider {
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
Ok(IcebergTableProvider {
table,
table: Arc::new(RwLock::new(table)),
table_identifier,
snapshot_id: Some(snapshot_id),
catalog: None,
schema,
})
}

/// Refreshes the table metadata to the latest snapshot.
///
/// Requires that this TableProvider was created with a
/// reference to the catalog to load the updated table from.
pub async fn refresh_table_metadata(&self) -> Result<Table> {
let Some(catalog) = &self.catalog else {
return Err(Error::new(ErrorKind::Unexpected, "Table provider could not refresh table metadata because no catalog client was provided".to_string()));
};

let updated_table = catalog.load_table(&self.table_identifier).await?;

let mut table_guard = self.table.write().await;
*table_guard = updated_table.clone();

Ok(updated_table)
}
}

#[async_trait]
Expand All @@ -130,8 +158,13 @@ impl TableProvider for IcebergTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Get the latest table metadata from the catalog if it exists
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can change to a refresh function similar to python + java implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean just extract this logic out into a refresh function that also updates self.table?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was thinking of having a refresh function inside the table struct that can refresh metadata

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed an update that uses a refresh function

let table = match self.refresh_table_metadata().await.ok() {
Some(table) => table,
None => self.table.read().await.clone(),
};
Ok(Arc::new(IcebergTableScan::new(
self.table.clone(),
table,
self.snapshot_id,
self.schema.clone(),
projection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use datafusion::execution::context::SessionContext;
use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
use iceberg::{Catalog, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg_catalog_memory::MemoryCatalog;
use iceberg_datafusion::IcebergCatalogProvider;
use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProvider};
use tempfile::TempDir;

fn temp_path() -> String {
Expand Down Expand Up @@ -295,3 +295,38 @@ async fn test_table_predict_pushdown() -> Result<()> {
assert!(s.value(1).trim().contains(expected));
Ok(())
}

/// Registers a pre-built metadata file (a table with no snapshots) in the
/// memory catalog and verifies that scanning it through DataFusion yields
/// no record batches.
#[tokio::test]
async fn test_table_scan_snapshot() -> Result<()> {
    let iceberg_catalog = get_iceberg_catalog();
    let namespace = NamespaceIdent::new("test".to_string());
    set_test_namespace(&iceberg_catalog, &namespace).await?;

    // The fixture metadata file lives under the crate's test data directory.
    let current_dir = std::env::current_dir().expect("failed to resolve current dir");
    let metadata_path = current_dir.join(
        "tests/test_data/scan_snapshot_update/test.db/test_table/metadata/00000-754ae971-c49f-4e40-9236-a50fd0884b5d.metadata.json",
    );

    let table_ident = TableIdent::new(namespace, "test_table".to_string());
    iceberg_catalog
        .register_existing_table(&table_ident, metadata_path.display().to_string())
        .await?;

    // Build the provider through the catalog so it can refresh on scan.
    let client = Arc::new(iceberg_catalog);
    let table = Arc::new(
        IcebergTableProvider::try_new(Arc::clone(&client) as Arc<dyn Catalog>, table_ident.clone())
            .await?,
    );

    let ctx = SessionContext::new();
    ctx.register_table("df_test", table)
        .expect("failed to register table");
    let records = ctx
        .sql("select * from df_test")
        .await
        .expect("query planning failed")
        .collect()
        .await
        .expect("query execution failed");
    // The fixture table has no snapshots, so the scan produces no batches.
    assert_eq!(0, records.len());

    Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"location":"./tests/test_data/scan_snapshot_update/test.db/test_table","table-uuid":"e94dbe67-55ae-40e6-b0ff-8b48ebabf550","last-updated-ms":1746586823335,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"id","type":"string","required":true},{"id":2,"name":"comment","type":"string","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"id"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"write.format.default":"parquet","write.parquet.compression-codec":"snappy"},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"statistics":[],"format-version":2,"last-sequence-number":0}
Loading