Add Refs metadata table #602


Merged · 7 commits · Apr 17, 2024
25 changes: 25 additions & 0 deletions mkdocs/docs/api.md
@@ -581,6 +581,31 @@ readable_metrics: [
[6.0989]]
```

### References

To show a table's known snapshot references:

```python
table.inspect.refs()
```

```
pyarrow.Table
name: string not null
type: string not null
snapshot_id: int64 not null
max_reference_age_in_ms: int64
min_snapshots_to_keep: int32
max_snapshot_age_in_ms: int64
----
name: [["main","testTag"]]
type: [["BRANCH","TAG"]]
snapshot_id: [[2278002651076891950,2278002651076891950]]
max_reference_age_in_ms: [[null,604800000]]
min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
```
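
As a quick end-to-end example, the snippet below loads a table and inspects its references (a minimal sketch: the catalog name `default` and the table identifier `default.taxi_dataset` are placeholders for your own setup):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("default.taxi_dataset")

# One row per named reference; retention fields are null when unset.
refs = table.inspect.refs()
print(refs.to_pandas())
```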

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
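
A minimal sketch of that workflow (the paths here are hypothetical, and `add_files` assumes the parquet files already match the table's schema):

```python
# Register existing parquet files as data files, without rewriting them.
tbl.add_files(file_paths=[
    "s3://warehouse/default/data/existing-1.parquet",
    "s3://warehouse/default/data/existing-2.parquet",
])
```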
26 changes: 26 additions & 0 deletions pyiceberg/table/__init__.py
@@ -3423,6 +3423,32 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
        schema=entries_schema,
    )

def refs(self) -> "pa.Table":
    """Return the table's snapshot references (branches and tags) as a PyArrow table."""
    import pyarrow as pa

    # Schema matching the columns of Iceberg's `refs` metadata table.
    ref_schema = pa.schema([
        pa.field('name', pa.string(), nullable=False),
        pa.field('type', pa.dictionary(pa.int32(), pa.string()), nullable=False),
        pa.field('snapshot_id', pa.int64(), nullable=False),
        pa.field('max_reference_age_in_ms', pa.int64(), nullable=True),
        pa.field('min_snapshots_to_keep', pa.int32(), nullable=True),
        pa.field('max_snapshot_age_in_ms', pa.int64(), nullable=True),
    ])

    # Collect one row per named reference; retention fields stay null when unset.
    ref_results = []
    for ref in self.tbl.metadata.refs:
        if snapshot_ref := self.tbl.metadata.refs.get(ref):
            ref_results.append({
                'name': ref,
                'type': snapshot_ref.snapshot_ref_type.upper(),
                'snapshot_id': snapshot_ref.snapshot_id,
                'max_reference_age_in_ms': snapshot_ref.max_ref_age_ms,
                'min_snapshots_to_keep': snapshot_ref.min_snapshots_to_keep,
                'max_snapshot_age_in_ms': snapshot_ref.max_snapshot_age_ms,
            })

    return pa.Table.from_pylist(ref_results, schema=ref_schema)

def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
    import pyarrow as pa

60 changes: 60 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -272,6 +272,66 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
    assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_refs(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.table_metadata_refs"
    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

    # write data to create a snapshot
    tbl.overwrite(arrow_table_with_null)

    # create a test branch
    spark.sql(
        f"""
        ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
        """
    )

    # create a test tag against the current snapshot
    current_snapshot = tbl.current_snapshot()
    assert current_snapshot is not None
    current_snapshot_id = current_snapshot.snapshot_id

    spark.sql(
        f"""
        ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION {current_snapshot_id} RETAIN 180 DAYS
        """
    )

    df = tbl.refresh().inspect.refs()

    assert df.column_names == [
        'name',
        'type',
        'snapshot_id',
        'max_reference_age_in_ms',
        'min_snapshots_to_keep',
        'max_snapshot_age_in_ms',
    ]

    assert [name.as_py() for name in df['name']] == ['testBranch', 'main', 'testTag']
    assert [ref_type.as_py() for ref_type in df['type']] == ['BRANCH', 'BRANCH', 'TAG']

    for snapshot_id in df['snapshot_id']:
        assert isinstance(snapshot_id.as_py(), int)

    # retention settings are optional, so each value is either an int or null
    for int_column in ['max_reference_age_in_ms', 'min_snapshots_to_keep', 'max_snapshot_age_in_ms']:
        for value in df[int_column]:
            assert isinstance(value.as_py(), int) or value.as_py() is None

    # cross-check every column against Spark's refs metadata table
    lhs = spark.table(f"{identifier}.refs").toPandas()
    rhs = df.to_pandas()
    for column in df.column_names:
        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
                # NaN != NaN in Python
                continue
            assert left == right, f"Difference in column {column}: {left} != {right}"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_unpartitioned(