Skip to content

Commit

Permalink
Ignore partition fields that reference a dropped source-id
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Dec 3, 2024
1 parent 5bef1bf commit 9cb74a4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ def partition_type(self, schema: Schema) -> StructType:
"""
nested_fields = []
for field in self.fields:
source_type = schema.find_type(field.source_id)
try:
source_type = schema.find_type(field.source_id)
except ValueError:
continue
result_type = field.transform.result_type(source_type)
required = schema.find_field(field.source_id).required
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=required))
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1570,3 +1570,43 @@ def test_abort_table_transaction_on_exception(

# Validate the transaction is aborted and no partial update is applied
assert len(tbl.scan().to_pandas()) == table_size # type: ignore


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_drop_field_from_partition_spec(session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.drop_partition_field{format_version}"

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

pa_table_with_column = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
"bar": [19, None, 25],
},
schema=pa.schema([
pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]),
)

tbl = session_catalog.create_table(
identifier=identifier, schema=pa_table_with_column.schema, properties={"format-version": str(format_version)}
)

with tbl.update_spec() as spec:
spec.add_field("foo", IdentityTransform(), "foo_identity")

with tbl.transaction() as txn:
txn.append(pa_table_with_column)

with tbl.update_spec() as spec:
spec.remove_field("foo_identity")

with tbl.update_schema() as schema:
schema.delete_column("foo")

assert tbl.scan().to_arrow()["bar"].to_pylist() == [19, 25, None]

0 comments on commit 9cb74a4

Please sign in to comment.