Skip to content

Commit aa46543

Browse files
authored
Bug: HiveCatalog's _commit_table refresh and update the metadata within transaction (#607)
* make refresh and update metadata in a transaction * fix integration tests
1 parent 4b91105 commit aa46543

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

pyiceberg/catalog/hive.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -372,22 +372,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
372372
identifier_tuple = self.identifier_to_tuple_without_catalog(
373373
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
374374
)
375-
current_table = self.load_table(identifier_tuple)
376375
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
377-
base_metadata = current_table.metadata
378-
for requirement in table_request.requirements:
379-
requirement.validate(base_metadata)
380-
381-
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
382-
if updated_metadata == base_metadata:
383-
# no changes, do nothing
384-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
385-
386-
# write new metadata
387-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
388-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
389-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
390-
391376
# commit to hive
392377
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
393378
with self._client as open_client:
@@ -397,11 +382,28 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
397382
if lock.state != LockState.ACQUIRED:
398383
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
399384

400-
tbl = open_client.get_table(dbname=database_name, tbl_name=table_name)
401-
tbl.parameters = _construct_parameters(
385+
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
386+
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
387+
current_table = self._convert_hive_into_iceberg(hive_table, io)
388+
389+
base_metadata = current_table.metadata
390+
for requirement in table_request.requirements:
391+
requirement.validate(base_metadata)
392+
393+
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
394+
if updated_metadata == base_metadata:
395+
# no changes, do nothing
396+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
397+
398+
# write new metadata
399+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
400+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
401+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
402+
403+
hive_table.parameters = _construct_parameters(
402404
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
403405
)
404-
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl)
406+
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
405407
except NoSuchObjectException as e:
406408
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
407409
finally:

0 commit comments

Comments
 (0)