From ce4b7878d005551da416d080f93f8f3009cdb299 Mon Sep 17 00:00:00 2001
From: Jonathan
Date: Mon, 7 Apr 2025 19:17:31 -0400
Subject: [PATCH] feat: Support tag and branch operations

---
 crates/iceberg/src/spec/snapshot.rs   | 364 ++++++++++++++++++++------
 crates/iceberg/src/transaction/mod.rs |   5 +
 2 files changed, 294 insertions(+), 75 deletions(-)

diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 922e7bab9..099592cb3 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -30,7 +30,8 @@ use super::table_metadata::SnapshotLog;
 use crate::error::{timestamp_ms_to_utc, Result};
 use crate::io::FileIO;
 use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata};
-use crate::{Error, ErrorKind};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 /// The ref name of the main branch of the table.
 pub const MAIN_BRANCH: &str = "main";
@@ -216,6 +217,293 @@ impl Snapshot {
     }
 }
 
+/// `ManageSnapshots` is an API for managing snapshots.
+#[allow(dead_code)]
+pub struct ManageSnapshots<'a> {
+    transaction: &'a Transaction<'a>,
+    updates: Vec<TableUpdate>,
+    requirements: Vec<TableRequirement>,
+}
+
+impl<'a> ManageSnapshots<'a> {
+    /// Creates new instance of `ManageSnapshots`
+    pub fn new(transaction: &'a Transaction) -> Self {
+        Self {
+            transaction,
+            updates: Vec::new(),
+            requirements: Vec::new(),
+        }
+    }
+
+    /// Sets a snapshot reference and returns the update and requirement.
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID that the reference will point to.
+    /// * `ref_name` - The name of the snapshot reference.
+    /// * `update_type` - The type of the reference ("tag" or "branch").
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the reference.
+    /// * `max_snapshot_age_ms` - Optional maximum age (in milliseconds) for the snapshot.
+    /// * `min_snapshots_to_keep` - Optional minimum number of snapshots to keep.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing a tuple, (`TableUpdate`, `TableRequirement`),
+    /// or an error if an invalid `update_type` is provided.
+    pub fn set_ref_snapshot(
+        &self,
+        snapshot_id: i64,
+        ref_name: String,
+        update_type: &str,
+        max_ref_age_ms: Option<i64>,
+        max_snapshot_age_ms: Option<i64>,
+        min_snapshots_to_keep: Option<i32>,
+    ) -> Result<(TableUpdate, TableRequirement)> {
+        let retention = match update_type {
+            "tag" => SnapshotRetention::Tag { max_ref_age_ms },
+            "branch" => SnapshotRetention::Branch {
+                min_snapshots_to_keep,
+                max_snapshot_age_ms,
+                max_ref_age_ms,
+            },
+            other => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Snapshot type, '{}', does not exist", other),
+                ))
+            }
+        };
+
+        let new_ref = SnapshotReference::new(snapshot_id, retention);
+
+        let update = TableUpdate::SetSnapshotRef {
+            ref_name: ref_name.clone(),
+            reference: new_ref,
+        };
+
+        let current_snapshot_id = self
+            .transaction
+            .table()
+            .metadata()
+            .refs
+            .get(&ref_name)
+            .map(|r| r.snapshot_id);
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: ref_name,
+            snapshot_id: current_snapshot_id,
+        };
+
+        Ok((update, requirement))
+    }
+
+    /// Removes a snapshot reference.
+    ///
+    /// # Arguments
+    ///
+    /// * `ref_name` - The name of the snapshot reference to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining on success,
+    /// or an error if the reference does not exist.
+    pub fn remove_ref_snapshot(&mut self, ref_name: String) -> Result<&mut Self> {
+        let snapshot_id = match self.transaction.table().metadata().refs.get(&ref_name) {
+            Some(r) => r.snapshot_id,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Reference '{}' does not exist", ref_name),
+                ));
+            }
+        };
+
+        let update = TableUpdate::RemoveSnapshotRef {
+            ref_name: ref_name.clone(),
+        };
+
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: ref_name,
+            snapshot_id: Some(snapshot_id),
+        };
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+        Ok(self)
+    }
+
+    /// Creates a new tag
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID to tag.
+    /// * `tag_name` - The name of the tag.
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the tag reference
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the operation fails.
+    pub fn create_tag(
+        &mut self,
+        snapshot_id: i64,
+        tag_name: String,
+        max_ref_age_ms: Option<i64>,
+    ) -> Result<&mut Self> {
+        let (update, requirement) = self.set_ref_snapshot(
+            snapshot_id,
+            tag_name.clone(),
+            "tag",
+            max_ref_age_ms,
+            None,
+            None,
+        )?;
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+
+        Ok(self)
+    }
+
+    /// Removes a tag.
+    ///
+    /// # Arguments
+    ///
+    /// * `tag_name` - The name of the tag to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the tag reference does not exist.
+    pub fn remove_tag(&mut self, tag_name: String) -> Result<&mut Self> {
+        self.remove_ref_snapshot(tag_name)
+    }
+
+    /// Creates a new branch
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID the branch will point to.
+    /// * `branch_name` - The name of the branch to create.
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the branch reference.
+    /// * `max_snapshot_age_ms` - Optional maximum age (in milliseconds) for the snapshots.
+    /// * `min_snapshots_to_keep` - Optional minimum number of snapshots to retain.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the operation fails.
+    pub fn create_branch(
+        &mut self,
+        snapshot_id: i64,
+        branch_name: String,
+        max_ref_age_ms: Option<i64>,
+        max_snapshot_age_ms: Option<i64>,
+        min_snapshots_to_keep: Option<i32>,
+    ) -> Result<&mut Self> {
+        let (update, requirement) = self.set_ref_snapshot(
+            snapshot_id,
+            branch_name.clone(),
+            "branch",
+            max_ref_age_ms,
+            max_snapshot_age_ms,
+            min_snapshots_to_keep,
+        )?;
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+
+        Ok(self)
+    }
+
+    /// Removes a branch.
+    ///
+    /// # Arguments
+    ///
+    /// * `branch_name` - The name of the branch to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the branch reference does not exist.
+    pub fn remove_branch(&mut self, branch_name: String) -> Result<&mut Self> {
+        self.remove_ref_snapshot(branch_name)
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Iceberg tables keep track of branches and tags using snapshot references.
+pub struct SnapshotReference {
+    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
+    pub snapshot_id: i64,
+    #[serde(flatten)]
+    /// Snapshot retention policy
+    pub retention: SnapshotRetention,
+}
+
+impl SnapshotReference {
+    /// Returns true if the snapshot reference is a branch.
+    pub fn is_branch(&self) -> bool {
+        matches!(self.retention, SnapshotRetention::Branch { .. })
+    }
+}
+
+impl SnapshotReference {
+    /// Create new snapshot reference
+    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
+        SnapshotReference {
+            snapshot_id,
+            retention,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "lowercase", tag = "type")]
+/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
+pub enum SnapshotRetention {
+    #[serde(rename_all = "kebab-case")]
+    /// Branches are mutable named references that can be updated by committing a new snapshot as
+    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
+    Branch {
+        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
+        /// Defaults to table property history.expire.min-snapshots-to-keep.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        min_snapshots_to_keep: Option<i32>,
+        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
+        /// Defaults to table property history.expire.max-snapshot-age-ms.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_snapshot_age_ms: Option<i64>,
+        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_ref_age_ms: Option<i64>,
+    },
+    #[serde(rename_all = "kebab-case")]
+    /// Tags are labels for individual snapshots.
+    Tag {
+        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_ref_age_ms: Option<i64>,
+    },
+}
+
+impl SnapshotRetention {
+    /// Create a new branch retention policy
+    pub fn branch(
+        min_snapshots_to_keep: Option<i32>,
+        max_snapshot_age_ms: Option<i64>,
+        max_ref_age_ms: Option<i64>,
+    ) -> Self {
+        SnapshotRetention::Branch {
+            min_snapshots_to_keep,
+            max_snapshot_age_ms,
+            max_ref_age_ms,
+        }
+    }
+}
+
 pub(super) mod _serde {
     /// This is a helper module that defines types to help with serialization/deserialization.
     /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
@@ -328,80 +616,6 @@ pub(super) mod _serde {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
-#[serde(rename_all = "kebab-case")]
-/// Iceberg tables keep track of branches and tags using snapshot references.
-pub struct SnapshotReference {
-    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
-    pub snapshot_id: i64,
-    #[serde(flatten)]
-    /// Snapshot retention policy
-    pub retention: SnapshotRetention,
-}
-
-impl SnapshotReference {
-    /// Returns true if the snapshot reference is a branch.
-    pub fn is_branch(&self) -> bool {
-        matches!(self.retention, SnapshotRetention::Branch { .. })
-    }
-}
-
-impl SnapshotReference {
-    /// Create new snapshot reference
-    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
-        SnapshotReference {
-            snapshot_id,
-            retention,
-        }
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
-#[serde(rename_all = "lowercase", tag = "type")]
-/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
-pub enum SnapshotRetention {
-    #[serde(rename_all = "kebab-case")]
-    /// Branches are mutable named references that can be updated by committing a new snapshot as
-    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
-    Branch {
-        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
-        /// Defaults to table property history.expire.min-snapshots-to-keep.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        min_snapshots_to_keep: Option<i32>,
-        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
-        /// Defaults to table property history.expire.max-snapshot-age-ms.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        max_snapshot_age_ms: Option<i64>,
-        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
-        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        max_ref_age_ms: Option<i64>,
-    },
-    #[serde(rename_all = "kebab-case")]
-    /// Tags are labels for individual snapshots.
-    Tag {
-        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
-        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        max_ref_age_ms: Option<i64>,
-    },
-}
-
-impl SnapshotRetention {
-    /// Create a new branch retention policy
-    pub fn branch(
-        min_snapshots_to_keep: Option<i32>,
-        max_snapshot_age_ms: Option<i64>,
-        max_ref_age_ms: Option<i64>,
-    ) -> Self {
-        SnapshotRetention::Branch {
-            min_snapshots_to_keep,
-            max_snapshot_age_ms,
-            max_ref_age_ms,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index d3c7bc3f9..3c10c2cec 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -52,6 +52,11 @@ impl<'a> Transaction<'a> {
         }
     }
 
+    /// Returns `Table` reference from `Transaction`
+    pub fn table(&self) -> &Table {
+        self.table
+    }
+
     fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
         for update in &updates {
             for up in &self.updates {