From 7e593429bce84f76e24367140e5711b705aa8fdd Mon Sep 17 00:00:00 2001
From: HonahX
Date: Sun, 7 Apr 2024 16:21:08 -0700
Subject: [PATCH] fix name-mapping issue

project_table previously received None as the name mapping, so scans
could not fall back to the schema.name-mapping.default table property
when reading Parquet files that lack Iceberg field IDs (such as files
registered via add_files). Move the name-mapping lookup onto
TableMetadata, delegate Table.name_mapping() to it, and pass the
mapping through in project_table. The add_files integration tests now
verify that such tables can be read back by pyiceberg.
---
 pyiceberg/io/pyarrow.py             |  2 +-
 pyiceberg/table/__init__.py         |  6 +-----
 pyiceberg/table/metadata.py         |  8 ++++++++
 tests/integration/test_add_files.py |  9 +++++++++
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index b2be460926..ad5be587bd 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1091,7 +1091,7 @@ def project_table(
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             limit,
-            None,
+            table_metadata.name_mapping(),
         )
         for task in tasks
     ]
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 27e21c762a..664a73e1c7 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -103,7 +103,6 @@
 )
 from pyiceberg.table.name_mapping import (
     NameMapping,
-    parse_mapping_from_json,
     update_mapping,
 )
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
@@ -1307,10 +1306,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
 
     def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
-            return parse_mapping_from_json(name_mapping_json)
-        else:
-            return None
+        return self.metadata.name_mapping()
 
     def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 65f4b7a429..ba0c885758 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -35,6 +35,7 @@
 from pyiceberg.exceptions import ValidationError
 from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec, assign_fresh_partition_spec_ids
 from pyiceberg.schema import Schema, assign_fresh_schema_ids
+from pyiceberg.table.name_mapping import NameMapping, parse_mapping_from_json
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import MetadataLogEntry, Snapshot, SnapshotLogEntry
 from pyiceberg.table.sorting import (
@@ -237,6 +238,13 @@ def schema(self) -> Schema:
         """Return the schema for this table."""
         return next(schema for schema in self.schemas if schema.schema_id == self.current_schema_id)
 
+    def name_mapping(self) -> Optional[NameMapping]:
+        """Return the table's field-id NameMapping."""
+        if name_mapping_json := self.properties.get("schema.name-mapping.default"):
+            return parse_mapping_from_json(name_mapping_json)
+        else:
+            return None
+
     def spec(self) -> PartitionSpec:
         """Return the partition spec of this table."""
         return next(spec for spec in self.partition_specs if spec.spec_id == self.default_spec_id)
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index 7c17618280..0de5d5f4ce 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -158,6 +158,9 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog:
     for col in df.columns:
         assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
 
+    # check that the table can be read by pyiceberg
+    assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"
+
 
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
@@ -255,6 +258,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(
col == "quux" else 6 assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" + # check that the table can be read by pyiceberg + assert len(tbl.scan().to_arrow()) == 6, "Expected 6 rows" + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) @@ -324,6 +330,9 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca assert [row.file_count for row in partition_rows] == [5] assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)] + # check that the table can be read by pyiceberg + assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows" + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2])