diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 6bbd9abea1..84604118c7 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -656,6 +656,29 @@ partition_summaries: [[ -- is_valid: all not null ["test"]]] ``` +### Metadata Log Entries + +To show table metadata log entries: + +```python +table.inspect.metadata_log_entries() +``` + +``` +pyarrow.Table +timestamp: timestamp[ms] not null +file: string not null +latest_snapshot_id: int64 +latest_schema_id: int32 +latest_sequence_number: int64 +---- +timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]] +file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]] +latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]] +latest_schema_id: [[null,0,0,0]] +latest_sequence_number: [[null,0,0,0]] +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. 
def metadata_log_entries(self) -> "pa.Table":
    """Return the table's metadata log entries as a PyArrow table.

    One row is produced for every entry in ``self.tbl.metadata.metadata_log``,
    followed by one extra row describing the metadata file currently in use
    (``self.tbl.metadata_location`` stamped with ``self.tbl.metadata.last_updated_ms``).

    For each entry, the snapshot returned by
    ``self.tbl.snapshot_as_of_timestamp(entry.timestamp_ms)`` supplies the
    ``latest_*`` columns; they are null when no snapshot is returned.

    Returns:
        A ``pa.Table`` with the columns ``timestamp`` (ms, not null),
        ``file`` (not null), ``latest_snapshot_id``, ``latest_schema_id``
        and ``latest_sequence_number``.
    """
    import pyarrow as pa

    from pyiceberg.table.snapshots import MetadataLogEntry

    table_schema = pa.schema([
        pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
        pa.field("file", pa.string(), nullable=False),
        pa.field("latest_snapshot_id", pa.int64(), nullable=True),
        pa.field("latest_schema_id", pa.int32(), nullable=True),
        pa.field("latest_sequence_number", pa.int64(), nullable=True),
    ])

    def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
        """Build one output row for a single metadata log entry."""
        latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
        row: Dict[str, Any] = {
            "timestamp": metadata_entry.timestamp_ms,
            "file": metadata_entry.metadata_file,
            "latest_snapshot_id": None,
            "latest_schema_id": None,
            "latest_sequence_number": None,
        }
        # Test for "no snapshot" explicitly rather than relying on the
        # truthiness of the snapshot object.
        if latest_snapshot is not None:
            row["latest_snapshot_id"] = latest_snapshot.snapshot_id
            row["latest_schema_id"] = latest_snapshot.schema_id
            row["latest_sequence_number"] = latest_snapshot.sequence_number
        return row

    # similar to MetadataLogEntriesTable in Java
    # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
    current_entry = MetadataLogEntry(
        metadata_file=self.tbl.metadata_location,
        timestamp_ms=self.tbl.metadata.last_updated_ms,
    )
    # Named differently from the method itself to avoid confusion.
    log_entries = self.tbl.metadata.metadata_log + [current_entry]

    return pa.Table.from_pylist(
        [metadata_log_entry_to_row(entry) for entry in log_entries],
        schema=table_schema,
    )
@field_serializer("snapshots")
def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]:
    """Serialize the ``snapshots`` field, dropping sequence numbers for v1.

    When ``self.format_version`` is 1, every snapshot is replaced by a copy
    whose ``sequence_number`` is ``None``. For any other format version the
    given list is returned unchanged.
    """
    if self.format_version != 1:
        return snapshots

    # Snapshot field `sequence-number` should not be written for v1 metadata
    without_sequence_number: List[Snapshot] = []
    for snap in snapshots:
        without_sequence_number.append(snap.model_copy(update={"sequence_number": None}))
    return without_sequence_number
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_metadata_log_entries(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    """Check ``inspect.metadata_log_entries()`` against Spark's ``metadata_log_entries`` table."""
    from pandas.testing import assert_frame_equal

    identifier = "default.table_metadata_log_entries"

    # Start from a clean slate; a missing table is fine.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

    # Write some data: three appends.
    for _ in range(3):
        tbl.append(arrow_table_with_null)

    df = tbl.inspect.metadata_log_entries()
    spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries")
    lhs = df.to_pandas()
    rhs = spark_df.toPandas()

    # Timestamp in the last row of `metadata_log_entries` table is based on when the table was read
    # Therefore, the timestamp of the last row for pyiceberg dataframe and spark dataframe will be different
    left_last = lhs[-1:]
    right_last = rhs[-1:]

    # Every row except the last must match exactly (dtypes aside).
    assert_frame_equal(lhs[:-1], rhs[:-1], check_dtype=False)

    # The last row must match in every column other than the timestamp.
    for column in df.column_names:
        if column == "timestamp":
            continue
        for left, right in zip(left_last[column], right_last[column]):
            assert left == right, f"Difference in column {column}: {left} != {right}"