Skip to content

Commit

Permalink
Bug: HiveCatalog's _commit_table refresh and update the metadata wi…
Browse files Browse the repository at this point in the history
…thin transaction (#607)

* make refresh and update metadata in a transaction

* fix integration tests
  • Loading branch information
HonahX committed Apr 16, 2024
1 parent 8d52993 commit fb95038
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,22 +369,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
base_metadata = current_table.metadata
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
Expand All @@ -394,11 +379,28 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
if lock.state != LockState.ACQUIRED:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")

tbl = open_client.get_table(dbname=database_name, tbl_name=table_name)
tbl.parameters = _construct_parameters(
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
current_table = self._convert_hive_into_iceberg(hive_table, io)

base_metadata = current_table.metadata
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

hive_table.parameters = _construct_parameters(
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
finally:
Expand Down

0 comments on commit fb95038

Please sign in to comment.