diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d84c82ec2a..d9affe5a48 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1047,6 +1047,67 @@ readable_metrics: [ To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively. +### Position deletes + +Inspect the positional delete files in the current snapshot of the table: + +```python +table.inspect.position_deletes() +``` + +```python +pyarrow.Table +file_path: string not null +pos: int64 not null +row: struct + child 0, id: int32 + child 1, data: large_string +partition: struct not null + child 0, data: large_string +spec_id: int64 +delete_file_path: string not null +---- +file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-1-acbf93b7-f760-4517-aa84-b9240902d3d2-0-00001.parquet"]] +pos: [[],[],[],[0]] +row: [ + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: [false] + -- child 0 type: int32 +[0] + -- child 1 type: large_string +[""]] +partition: [ + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +["a"]] +spec_id: [[],[],[],[0]] +delete_file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-5-bc7a1d8a-fefe-4277-b4ac-8f1dd7badb7a-00001-deletes.parquet"]] + +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. 
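A hedged usage sketch to complement the docs hunk above: the inspect call returns a plain PyArrow table, so standard PyArrow operations apply. The catalog name, table identifier, and the per-file aggregation below are illustrative assumptions, not part of this change:

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Assumed setup: any catalog/table that has position delete files works here.
catalog = load_catalog("default")
table = catalog.load_table("default.table_metadata_position_deletes")

# Count position deletes per referenced data file.
deletes: pa.Table = table.inspect.position_deletes()
per_file = deletes.group_by("file_path").aggregate([("pos", "count")])
print(per_file.to_pandas())
```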
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1aaab32dbe..a207758821 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -123,6 +123,7 @@ DataFile, DataFileContent, FileFormat, + PositionDelete, ) from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value from pyiceberg.schema import ( @@ -921,18 +922,31 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) +def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionDelete]: + delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE}) + table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() + for batch in table.to_batches(): + for i in range(len(batch)): + row = batch.column("row")[i].as_py() if "row" in batch.schema.names else None + yield PositionDelete( + file_path=batch.column("file_path")[i].as_py(), + pos=batch.column("pos")[i].as_py(), + row=row, # Optional row payload; only populated when the delete file includes a row column + ) + + def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: if data_file.file_format == FileFormat.PARQUET: - delete_fragment = _construct_fragment( - fs, - data_file, - file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}, - ) - table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() - table = table.unify_dictionaries() + deletes_by_file: Dict[str, List[int]] = {} + for delete in _read_delete_file(fs, data_file): + if delete.path not in deletes_by_file: + deletes_by_file[delete.path] = [] + deletes_by_file[delete.path].append(delete.pos) + + # Convert lists of positions to ChunkedArrays return { - file.as_py(): table.filter(pc.field("file_path") == file).column("pos") - for file in table.column("file_path").chunks[0].dictionary + file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) + for file_path, positions in deletes_by_file.items() } elif data_file.file_format == FileFormat.PUFFIN: _, _, path = PyArrowFileIO.parse_location(data_file.file_path) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..69c8f4931c 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -321,6 +321,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe ) +class PositionDelete(Record): + __slots__ = ("path", "pos", "row") + path: str + pos: int + row: Optional[Record] + + def __setattr__(self, name: str, value: Any) -> None: + """Assign a key/value to a PositionDelete.""" + super().__setattr__(name, value) + + def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None: + super().__init__(*data, **named_data) + self.path = file_path + self.pos = pos + self.row = row + + def __hash__(self) -> int: + """Return the hash of the file path.""" + return hash(self.path) + + def __eq__(self, other: Any) -> bool: + """Compare the PositionDelete with another object. + + If it is a PositionDelete, the comparison is based on the file path. 
+ """ + return self.path == other.path if isinstance(other, PositionDelete) else False + + class DataFile(Record): @classmethod def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile: diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cce5250ad5..f580082ac1 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -22,6 +22,7 @@ from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType from pyiceberg.utils.concurrent import ExecutorFactory @@ -384,6 +385,28 @@ def _get_all_manifests_schema(self) -> "pa.Schema": all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema + def _get_positional_deletes_schema(self, schema: Optional[Schema] = None, spec_id: Optional[int] = None) -> "pa.Schema": + import pyarrow as pa + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + schema = schema or self.tbl.metadata.schema() + + partition_struct = self.tbl.metadata.spec_struct(spec_id=spec_id) + pa_partition_struct = schema_to_pyarrow(partition_struct) + pa_row_struct = schema_to_pyarrow(schema.as_struct()) + positional_delete_schema = pa.schema( + [ + pa.field("file_path", pa.string(), nullable=False), + pa.field("pos", pa.int64(), nullable=False), + pa.field("row", pa_row_struct, nullable=True), + pa.field("partition", pa_partition_struct, nullable=False), + pa.field("spec_id", pa.int64(), nullable=True), + pa.field("delete_file_path", pa.string(), nullable=False), + ] + ) + return positional_delete_schema + def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": import pyarrow as pa @@ -453,6 +476,42 @@ def _partition_summaries_to_rows( schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), ) + def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Schema) -> "pa.Table": + import pyarrow as pa + + positional_deletes: List["pa.Table"] = [] + + position_deletes_schema = self._get_positional_deletes_schema(schema=schema, spec_id=manifest.partition_spec_id) + + if manifest.content == ManifestContent.DELETES: + for entry in manifest.fetch_manifest_entry(self.tbl.io): + if entry.data_file.content == DataFileContent.POSITION_DELETES: + from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file + + positional_delete_file = _read_delete_file( + _fs_from_file_path(self.tbl.io, entry.data_file.file_path), entry.data_file + ) + positional_deletes_records = [] + for record in positional_delete_file: + row = { + "file_path": record.path, + "pos": record.pos, + "row": record.row, + "partition": { + field.name: entry.data_file.partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + }, + "spec_id": manifest.partition_spec_id, + "delete_file_path": entry.data_file.file_path, + } + positional_deletes_records.append(row) + + positional_deletes.append(pa.Table.from_pylist(positional_deletes_records, position_deletes_schema)) + + if not positional_deletes: + return pa.Table.from_pylist([], position_deletes_schema) + return pa.concat_tables(positional_deletes) + def 
manifests(self) -> "pa.Table": return self._generate_manifests_table(self.tbl.current_snapshot()) @@ -704,3 +763,26 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) + + def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": + import pyarrow as pa + + snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot() + if not snapshot: + schema = self._get_positional_deletes_schema() + return pa.Table.from_pylist([], schema=schema) + + if snapshot.schema_id is None: + raise ValueError(f"Snapshot {snapshot.snapshot_id} does not have a schema id") + + schemas = self.tbl.schemas() + schema = schemas.get(snapshot.schema_id, None) + if not schema: + raise ValueError(f"Cannot find schema with id: {snapshot.schema_id}") + + executor = ExecutorFactory.get_or_create() + positional_deletes: Iterator["pa.Table"] = executor.map( + lambda manifest: self._generate_positional_delete_table(manifest, schema=schema), + snapshot.manifests(self.tbl.io), + ) + return pa.concat_tables(positional_deletes) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index f248700c02..59ef39fbec 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -279,6 +279,37 @@ def specs_struct(self) -> StructType: return StructType(*nested_fields) + + def spec_struct(self, spec_id: Optional[int] = None) -> StructType: + """Produce a struct of the partition fields for the given spec_id. + + The partition fields are kept optional: partition fields may be added later, + in which case not all files have the resulting field, and it may be null. + + :return: A StructType representing the partition spec with the given spec_id, or the current spec if none is given. 
+ """ + if spec_id is None: + spec = self.spec() + else: + specs = self.specs() + filtered_spec = list(filter(lambda spec: spec.spec_id == spec_id, specs.values())) + if not filtered_spec: + raise ValidationError(f"Spec with spec_id {spec_id} not found") + spec = filtered_spec[0] + # Collect all the fields + struct_fields = {field.field_id: field for field in spec.fields} + + schema = self.schema() + + nested_fields = [] + # Sort them by field_id in order to get a deterministic output + for field_id in sorted(struct_fields): + field = struct_fields[field_id] + source_type = schema.find_type(field.source_id) + result_type = field.transform.result_type(source_type) + nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False)) + + return StructType(*nested_fields) + def new_snapshot_id(self) -> int: """Generate a new snapshot-id that's not in use.""" snapshot_id = _generate_snapshot_id() diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index e81050a81c..47efda47d7 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1101,3 +1101,56 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog .reset_index() ) assert_frame_equal(lhs, rhs, check_dtype=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_position_deletes" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + id int, + data string + ) + PARTITIONED BY (data) + TBLPROPERTIES ('write.update.mode'='merge-on-read', + 'write.delete.mode'='merge-on-read') + """ + ) + tbl = session_catalog.load_table(identifier) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") + + spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") + + spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") + + spark.sql(f"DELETE FROM {identifier} WHERE id = 2") + + tbl.refresh() + df = tbl.inspect.position_deletes() + + assert df.column_names == ["file_path", "pos", "row", "partition", "spec_id", "delete_file_path"] + + int_cols = ["pos"] + string_cols = ["file_path", "delete_file_path"] + + for column in int_cols: + for value in df[column]: + assert isinstance(value.as_py(), int) + + for column in string_cols: + for value in df[column]: + assert isinstance(value.as_py(), str) + + lhs = spark.table(f"{identifier}.position_deletes").toPandas() + rhs = df.to_pandas() + assert_frame_equal(lhs, rhs, check_dtype=False)