From 9a0423d1e36fd707baa4a1e314d62f366d16bd0d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 11:06:42 -0400 Subject: [PATCH 01/19] add metadata_entries table with tests --- pyiceberg/table/__init__.py | 30 +++++++++++++++++++++++++ pyiceberg/table/metadata.py | 8 +++++++ tests/integration/test_inspect_table.py | 20 +++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 13186c42cc..39650eaafb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3537,6 +3537,36 @@ def update_partitions_map( schema=table_schema, ) + def metadata_log_entries(self) -> "pa.Table": + import pyarrow as pa + + from pyiceberg.table.snapshots import MetadataLogEntry + + table_schema = pa.schema([ + ("timestamp", pa.timestamp(unit='ms'), True), + ("file", pa.string(), True), + ("latest_snapshot_id", pa.int64(), False), + ("latest_schema_id", pa.int32(), False), + ("latest_sequence_number", pa.int64(), False), + ]) + + def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]: + latest_snapshot = self.tbl.metadata._snapshot_as_of_timestamp_ms(metadata_entry.timestamp_ms) + return { + "timestamp": metadata_entry.timestamp_ms, + "file": metadata_entry.metadata_file, + "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None, + "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None, + "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, + } + + metadata_log_entries = self.tbl.metadata.metadata_log + [MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)] + + return pa.Table.from_pylist( + [metadata_log_entry_to_row(entry) for entry in metadata_log_entries], + schema=table_schema, + ) + @dataclass(frozen=True) class TablePartition: diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index ba0c885758..2746a101d7 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -292,6 +292,13 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None + def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: + """ Return the snapshot that was current at the given timestamp or null if no such snapshot exists.""" + for entry in reversed(self.snapshot_log): + if entry.timestamp_ms <= timestamp_ms: + return self.snapshot_by_id(entry.snapshot_id) + return None + def current_snapshot(self) -> Optional[Snapshot]: """Get the current snapshot for this table, or None if there is no current snapshot.""" if self.current_snapshot_id is not None: @@ -520,6 +527,7 @@ def new_table_metadata( if table_uuid is None: table_uuid = uuid.uuid4() + # need to update metadata_log here # Remove format-version so it does not get persisted format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) if format_version == 1: diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index a884f9d4c0..9a41770f61 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -445,3 +445,23 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_metadata_log_entries( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_log_entries" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # Write some data + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + df = tbl.inspect.metadata_log_entries() + spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries") + lhs = df.to_pandas() + rhs = spark_df.toPandas() + breakpoint() + assert lhs.equals(rhs), lhs.compare(rhs) From ecec57e1b823a29a76f1c15477cc6958e70105ee Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 12:48:35 -0400 Subject: [PATCH 02/19] make test work --- pyiceberg/table/snapshots.py | 3 ++- tests/integration/test_inspect_table.py | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index f74ac4b7d4..76686865f1 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -226,7 +226,8 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) - sequence_number: Optional[int] = Field(alias="sequence-number", default=None) + # cannot import `INITIAL_SEQUENCE_NUMBER` due to circular import + sequence_number: Optional[int] = Field(alias="sequence-number", default=0) timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) manifest_list: Optional[str] = Field( alias="manifest-list", description="Location of the snapshot's manifest list file", default=None diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 9a41770f61..8a5bac4e4e 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -451,6 +451,8 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non def test_inspect_metadata_log_entries( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: + from pandas.testing import assert_frame_equal + identifier = "default.table_metadata_log_entries" tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) @@ -463,5 +465,15 @@ def test_inspect_metadata_log_entries( spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries") lhs = df.to_pandas() rhs = spark_df.toPandas() - breakpoint() - assert lhs.equals(rhs), lhs.compare(rhs) + + # Timestamp in the last row of `metadata_log_entries` table is the based on when the table was read + # Therefore, the timestamp for pyiceberg dataframe and spark dataframe will be different + left_before_last, left_last = lhs[:-1], lhs[-1:] + right_before_last, right_last = rhs[:-1], rhs[-1:] + + assert_frame_equal(left_before_last, right_before_last, check_dtype=False) + for column in df.column_names: + for left, right in zip(left_last[column], right_last[column]): + if column == 'timestamp': + continue + assert left == right, f"Difference in column {column}: {left} != {right}" From 9e506c23408d9e3bc96c9e744a1454d0efe86c9b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 13:00:13 -0400 Subject: [PATCH 03/19] remove comment --- pyiceberg/table/metadata.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 2746a101d7..bbd141336b 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -527,7 +527,6 @@ def new_table_metadata( if table_uuid is None: table_uuid = uuid.uuid4() - # need to update metadata_log here # Remove format-version so it does not get persisted format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) if format_version == 1: From 9c77d57b4553b8ac6ea918dff9e8742a7e1aa1ea Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 13:11:30 -0400 Subject: [PATCH 04/19] add doc --- mkdocs/docs/api.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 0bc23fb0dc..b38ac654e3 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -606,6 +606,29 @@ min_snapshots_to_keep: [[null,10]] max_snapshot_age_in_ms: [[null,604800000]] ``` +### Metadata Log Entries + +To show table metadata log entries: + +```python +table.inspect.metadata_log_entries() +``` + +``` +pyarrow.Table +timestamp: timestamp[ms] +file: string +latest_snapshot_id: int64 not null +latest_schema_id: int32 not null +latest_sequence_number: int64 not null +---- +timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]] +file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]] +latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]] +latest_schema_id: [[null,0,0,0]] +latest_sequence_number: [[null,0,0,0]] +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. From b26f08f101efbfbecbec30677bb8e22d29c3b37d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 13:11:54 -0400 Subject: [PATCH 05/19] make lint --- pyiceberg/table/__init__.py | 4 +++- pyiceberg/table/metadata.py | 2 +- tests/integration/test_inspect_table.py | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 39650eaafb..5367fcbad4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3560,7 +3560,9 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, } - metadata_log_entries = self.tbl.metadata.metadata_log + [MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)] + metadata_log_entries = self.tbl.metadata.metadata_log + [ + MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) + ] return pa.Table.from_pylist( [metadata_log_entry_to_row(entry) for entry in metadata_log_entries], diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index bbd141336b..f285b6a79c 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -293,7 +293,7 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return None def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: - """ Return the snapshot that was current at the given timestamp or null if no such snapshot exists.""" + """Return the snapshot that was current at the given timestamp or null if no such snapshot exists.""" for entry in reversed(self.snapshot_log): if entry.timestamp_ms <= timestamp_ms: return self.snapshot_by_id(entry.snapshot_id) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 8a5bac4e4e..2df3336a9f 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -446,6 +446,7 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_metadata_log_entries( From 58b0609da33129ae29116d73449226f9f63ce93e Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 13:17:05 -0400 Subject: [PATCH 06/19] comment --- pyiceberg/table/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5367fcbad4..ba71a7bf97 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3560,6 +3560,7 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, } + # imitates `addPreviousFile` from Java, might could move this to `metadata_log` constructor metadata_log_entries = self.tbl.metadata.metadata_log + [ MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) ] From f7dd165487dce72137cb89ef143077078c837296 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 28 Apr 2024 13:26:28 -0400 Subject: [PATCH 07/19] comment --- tests/integration/test_inspect_table.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 2df3336a9f..76dae251a0 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -467,12 +467,13 @@ def test_inspect_metadata_log_entries( lhs = df.to_pandas() rhs = spark_df.toPandas() - # Timestamp in the last row of `metadata_log_entries` table is the based on when the table was read - # Therefore, the timestamp for pyiceberg dataframe and spark dataframe will be different + # Timestamp in the last row of `metadata_log_entries` table is based on when the table was read + # Therefore, the timestamp of the last row for pyiceberg dataframe and spark dataframe will be different left_before_last, left_last = lhs[:-1], lhs[-1:] right_before_last, right_last = rhs[:-1], rhs[-1:] assert_frame_equal(left_before_last, right_before_last, check_dtype=False) + # compare the last row, except for the timestamp for column in df.column_names: for left, right in zip(left_last[column], right_last[column]): if column == 'timestamp': From 4655c97034aef2ccb26ba836143025f1bdb8cbbb Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 29 Apr 2024 22:18:32 -0400 Subject: [PATCH 08/19] use pa.field and set nullable properly --- pyiceberg/table/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ba71a7bf97..5926618d72 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3543,11 +3543,11 @@ def metadata_log_entries(self) -> "pa.Table": from pyiceberg.table.snapshots import MetadataLogEntry table_schema = pa.schema([ - ("timestamp", pa.timestamp(unit='ms'), True), - ("file", pa.string(), True), - ("latest_snapshot_id", pa.int64(), False), - ("latest_schema_id", pa.int32(), False), - ("latest_sequence_number", pa.int64(), False), + pa.field("timestamp", pa.timestamp(unit='ms'), nullable=False), + pa.field("file", pa.string(), nullable=False), + pa.field("latest_snapshot_id", pa.int64(), nullable=True), + pa.field("latest_schema_id", pa.int32(), nullable=True), + pa.field("latest_sequence_number", pa.int64(), nullable=True), ]) def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]: From 8613c2e503934fa0b59e34de15d22581c9460234 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 18:45:44 -0700 Subject: [PATCH 09/19] use table snapshot_as_of_timestamp instead --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/metadata.py | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8d88c7b905..5243c7c44c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3841,7 +3841,7 @@ def metadata_log_entries(self) -> "pa.Table": ]) def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]: - latest_snapshot = self.tbl.metadata._snapshot_as_of_timestamp_ms(metadata_entry.timestamp_ms) + latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms) return { "timestamp": metadata_entry.timestamp_ms, "file": metadata_entry.metadata_file, diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 8f92c2cd3b..8c3c389318 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -292,13 +292,6 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]: return self.snapshot_by_id(ref.snapshot_id) return None - def _snapshot_as_of_timestamp_ms(self, timestamp_ms: int) -> Optional[Snapshot]: - """Return the snapshot that was current at the given timestamp or null if no such snapshot exists.""" - for entry in reversed(self.snapshot_log): - if entry.timestamp_ms <= timestamp_ms: - return self.snapshot_by_id(entry.snapshot_id) - return None - def current_snapshot(self) -> Optional[Snapshot]: """Get the current snapshot for this table, or None if there is no current snapshot.""" if self.current_snapshot_id is not None: From f45f2ea85ee79684e9af08fa012149ac08a2a606 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 22:35:25 -0700 Subject: [PATCH 10/19] default sequence_number to 0 --- pyiceberg/table/snapshots.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 867013f3d2..842d42522a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -58,6 +58,8 @@ CHANGED_PARTITION_PREFIX = "partitions." OPERATION = "operation" +INITIAL_SEQUENCE_NUMBER = 0 + class Operation(Enum): """Describes the operation. @@ -231,8 +233,7 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) - # cannot import `INITIAL_SEQUENCE_NUMBER` due to circular import - sequence_number: Optional[int] = Field(alias="sequence-number", default=0) + sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER) timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) manifest_list: Optional[str] = Field( alias="manifest-list", description="Location of the snapshot's manifest list file", default=None From 2d417da83e9c75a56549472581d1b31b61a2bd43 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 22:35:36 -0700 Subject: [PATCH 11/19] improve test --- tests/integration/test_inspect_table.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 6b16930a83..2840fb0b16 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -538,6 +538,11 @@ def test_inspect_metadata_log_entries( from pandas.testing import assert_frame_equal identifier = "default.table_metadata_log_entries" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) # Write some data @@ -555,6 +560,7 @@ def test_inspect_metadata_log_entries( left_before_last, left_last = lhs[:-1], lhs[-1:] right_before_last, right_last = rhs[:-1], rhs[-1:] + # compare all rows except for the last row assert_frame_equal(left_before_last, right_before_last, check_dtype=False) # compare the last row, except for the timestamp for column in df.column_names: From 502e5d89c7e4813bf416241f8f6fd228da1e9201 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 22:40:03 -0700 Subject: [PATCH 12/19] improve docs --- mkdocs/docs/api.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 139a508b2f..84604118c7 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -666,11 +666,11 @@ table.inspect.metadata_log_entries() ``` pyarrow.Table -timestamp: timestamp[ms] -file: string -latest_snapshot_id: int64 not null -latest_schema_id: int32 not null -latest_sequence_number: int64 not null +timestamp: timestamp[ms] not null +file: string not null +latest_snapshot_id: int64 +latest_schema_id: int32 +latest_sequence_number: int64 ---- timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]] file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]] From 1cd5f932d16a586552a46b83ca31e75ca7a6eaad Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 22:44:53 -0700 Subject: [PATCH 13/19] add comment --- pyiceberg/table/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5243c7c44c..0c467b9f35 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3850,7 +3850,8 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, } - # imitates `addPreviousFile` from Java, might could move this to `metadata_log` constructor + # imitates `addPreviousFile` from Java + # https://github.com/apache/iceberg/blob/8248663a2a1ffddd2664ea37b45882455466f71c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1450-L1451 metadata_log_entries = self.tbl.metadata.metadata_log + [ MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) ] From 3fe675dae9640d2d2e43055bfdc37bdab9dd3cbe Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 18 Jun 2024 23:16:05 -0700 Subject: [PATCH 14/19] fix tests with string output for sequence-number --- tests/table/test_metadata.py | 2 +- tests/table/test_snapshots.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index 0e2b91f24b..c22b1253fe 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -168,7 +168,7 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None: def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None: table_metadata = TableMetadataV1(**example_table_metadata_v1) table_metadata_json = table_metadata.model_dump_json() - expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" + expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"sequence-number":0,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" assert table_metadata_json == expected diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 2569a11dc2..bf353a19f1 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -83,7 +83,7 @@ def test_serialize_snapshot_without_sequence_number() -> None: schema_id=3, ) actual = snapshot.model_dump_json() - expected = """{"snapshot-id":25,"parent-snapshot-id":19,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" + expected = """{"snapshot-id":25,"parent-snapshot-id":19,"sequence-number":0,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" assert actual == expected From 6ce55f46e94a028832446c74def4119be3d97d07 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 24 Jun 2024 10:33:55 -0700 Subject: [PATCH 15/19] comment --- pyiceberg/table/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0c467b9f35..07c709fd45 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3850,8 +3850,8 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, } - # imitates `addPreviousFile` from Java - # https://github.com/apache/iceberg/blob/8248663a2a1ffddd2664ea37b45882455466f71c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1450-L1451 + # similar to MetadataLogEntriesTable in Java + # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66 metadata_log_entries = self.tbl.metadata.metadata_log + [ MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) ] From aa068ff4c12865fc7e9b0f324965be9a537d236a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 24 Jun 2024 10:45:10 -0700 Subject: [PATCH 16/19] revert INITIAL_SEQUENCE_NUMBER changes --- pyiceberg/table/snapshots.py | 4 +--- tests/table/test_metadata.py | 2 +- tests/table/test_snapshots.py | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 842d42522a..d6a3ff1654 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -58,8 +58,6 @@ CHANGED_PARTITION_PREFIX = "partitions." OPERATION = "operation" -INITIAL_SEQUENCE_NUMBER = 0 - class Operation(Enum): """Describes the operation. @@ -233,7 +231,7 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) - sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER) + sequence_number: Optional[int] = Field(alias="sequence-number", default=None) timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) manifest_list: Optional[str] = Field( alias="manifest-list", description="Location of the snapshot's manifest list file", default=None diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c22b1253fe..0e2b91f24b 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -168,7 +168,7 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None: def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None: table_metadata = TableMetadataV1(**example_table_metadata_v1) table_metadata_json = table_metadata.model_dump_json() - expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"sequence-number":0,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" + expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" assert table_metadata_json == expected diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index bf353a19f1..2569a11dc2 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -83,7 +83,7 @@ def test_serialize_snapshot_without_sequence_number() -> None: schema_id=3, ) actual = snapshot.model_dump_json() - expected = """{"snapshot-id":25,"parent-snapshot-id":19,"sequence-number":0,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" + expected = """{"snapshot-id":25,"parent-snapshot-id":19,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" assert actual == expected From ab74a3e068ddccf8d226003b4051c6001bcc4861 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 24 Jun 2024 11:30:13 -0700 Subject: [PATCH 17/19] exclude sequence_number for v1 --- pyiceberg/table/metadata.py | 10 ++++++++++ pyiceberg/table/snapshots.py | 4 +++- tests/table/test_snapshots.py | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 8c3c389318..876284e472 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -311,6 +311,16 @@ def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> O return -1 return current_snapshot_id + @field_serializer("snapshots") + def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]: + # Snapshot field `sequence-number` should not be written for v1 metadata + def exclude_sequence_number(snapshot: Snapshot) -> Snapshot: + return snapshot.model_copy(update={"sequence_number": None}) + + if self.format_version == 1: + return [exclude_sequence_number(snapshot) for snapshot in snapshots] + return snapshots + def _generate_snapshot_id() -> int: """Generate a new Snapshot ID from a UUID. diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index d6a3ff1654..842d42522a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -58,6 +58,8 @@ CHANGED_PARTITION_PREFIX = "partitions." OPERATION = "operation" +INITIAL_SEQUENCE_NUMBER = 0 + class Operation(Enum): """Describes the operation. @@ -231,7 +233,7 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) - sequence_number: Optional[int] = Field(alias="sequence-number", default=None) + sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER) timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) manifest_list: Optional[str] = Field( alias="manifest-list", description="Location of the snapshot's manifest list file", default=None diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 2569a11dc2..616ed6ded8 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -73,7 +73,7 @@ def test_serialize_snapshot(snapshot: Snapshot) -> None: ) -def test_serialize_snapshot_without_sequence_number() -> None: +def test_serialize_snapshot_with_default_sequence_number() -> None: snapshot = Snapshot( snapshot_id=25, parent_snapshot_id=19, @@ -83,7 +83,7 @@ def test_serialize_snapshot_without_sequence_number() -> None: schema_id=3, ) actual = snapshot.model_dump_json() - expected = """{"snapshot-id":25,"parent-snapshot-id":19,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" + expected = """{"snapshot-id":25,"parent-snapshot-id":19,"sequence-number":0,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" assert actual == expected From f274ef983c4f56fdfd8938353878ec019dcf11ed Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 24 Jun 2024 11:35:19 -0700 Subject: [PATCH 18/19] inline function --- pyiceberg/table/metadata.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 876284e472..1fea33010c 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -314,11 +314,8 @@ def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> O @field_serializer("snapshots") def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]: # Snapshot field `sequence-number` should not be written for v1 metadata - def exclude_sequence_number(snapshot: Snapshot) -> Snapshot: - return snapshot.model_copy(update={"sequence_number": None}) - if self.format_version == 1: - return [exclude_sequence_number(snapshot) for snapshot in snapshots] + return [snapshot.model_copy(update={"sequence_number": None}) for snapshot in snapshots] return snapshots From a338e1715aefad8ba459b044331a4aa51f63c84e Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 25 Jun 2024 10:42:04 -0700 Subject: [PATCH 19/19] test_serialize_snapshot_without_sequence_number --- tests/table/test_snapshots.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 616ed6ded8..fa3464052a 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -73,17 +73,18 @@ def test_serialize_snapshot(snapshot: Snapshot) -> None: ) -def test_serialize_snapshot_with_default_sequence_number() -> None: +def test_serialize_snapshot_without_sequence_number() -> None: snapshot = Snapshot( snapshot_id=25, parent_snapshot_id=19, + sequence_number=None, timestamp_ms=1602638573590, manifest_list="s3:/a/b/c.avro", summary=Summary(Operation.APPEND), schema_id=3, ) actual = snapshot.model_dump_json() - expected = """{"snapshot-id":25,"parent-snapshot-id":19,"sequence-number":0,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" + expected = """{"snapshot-id":25,"parent-snapshot-id":19,"timestamp-ms":1602638573590,"manifest-list":"s3:/a/b/c.avro","summary":{"operation":"append"},"schema-id":3}""" assert actual == expected