Skip to content

Commit

Permalink
fix name-mapping issue
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Apr 7, 2024
1 parent aadc89c commit 7e59342
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ def project_table(
deletes_per_file.get(task.file.file_path),
case_sensitive,
limit,
None,
table_metadata.name_mapping(),
)
for task in tasks
]
Expand Down
6 changes: 1 addition & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
)
from pyiceberg.table.name_mapping import (
NameMapping,
parse_mapping_from_json,
update_mapping,
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
Expand Down Expand Up @@ -1307,10 +1306,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive

def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
return parse_mapping_from_json(name_mapping_json)
else:
return None
return self.metadata.name_mapping()

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Expand Down
8 changes: 8 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from pyiceberg.exceptions import ValidationError
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table.name_mapping import NameMapping, parse_mapping_from_json
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import MetadataLogEntry, Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import (
Expand Down Expand Up @@ -237,6 +238,13 @@ def schema(self) -> Schema:
"""Return the schema for this table."""
return next(schema for schema in self.schemas if schema.schema_id == self.current_schema_id)

def name_mapping(self) -> Optional[NameMapping]:
    """Return the NameMapping stored in this metadata's properties, if any.

    The mapping is read as a JSON string from the
    ``schema.name-mapping.default`` property and parsed with
    ``parse_mapping_from_json``.

    Returns:
        The parsed NameMapping, or None when the property is missing or
        holds an empty value.
    """
    serialized_mapping = self.properties.get("schema.name-mapping.default")
    # A missing key and an empty string are both treated as "no mapping".
    if not serialized_mapping:
        return None
    return parse_mapping_from_json(serialized_mapping)

def spec(self) -> PartitionSpec:
    """Return the partition spec whose ``spec_id`` equals ``default_spec_id``.

    The first matching entry of ``partition_specs`` is returned.

    Raises:
        StopIteration: If no entry in ``partition_specs`` has the default id.
    """
    matching_specs = (
        candidate for candidate in self.partition_specs if candidate.spec_id == self.default_spec_id
    )
    return next(matching_specs)
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/test_add_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog:
for col in df.columns:
assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"

# check that the table can be read by pyiceberg
assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
Expand Down Expand Up @@ -255,6 +258,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(
value_count = 1 if col == "quux" else 6
assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"

# check that the table can be read by pyiceberg
assert len(tbl.scan().to_arrow()) == 6, "Expected 6 rows"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
Expand Down Expand Up @@ -324,6 +330,9 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca
assert [row.file_count for row in partition_rows] == [5]
assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]

# check that the table can be read by pyiceberg
assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
Expand Down

0 comments on commit 7e59342

Please sign in to comment.