Bug Fix: use appropriate partition spec for delete #984

Merged 1 commit on Jul 31, 2024
pyiceberg/table/__init__.py (26 changes: 16 additions & 10 deletions)
@@ -3135,16 +3135,22 @@ def _write_delete_manifest() -> List[ManifestFile]:
     # Check if we need to mark the files as deleted
     deleted_entries = self._deleted_entries()
     if len(deleted_entries) > 0:
-        with write_manifest(
-            format_version=self._transaction.table_metadata.format_version,
-            spec=self._transaction.table_metadata.spec(),
-            schema=self._transaction.table_metadata.schema(),
-            output_file=self.new_manifest_output(),
-            snapshot_id=self._snapshot_id,
-        ) as writer:
-            for delete_entry in deleted_entries:
-                writer.add_entry(delete_entry)
-        return [writer.to_manifest_file()]
+        deleted_manifests = []
+        partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
+        for deleted_entry in deleted_entries:
+            partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
+        for spec_id, entries in partition_groups.items():
+            with write_manifest(
+                format_version=self._transaction.table_metadata.format_version,
+                spec=self._transaction.table_metadata.specs()[spec_id],
+                schema=self._transaction.table_metadata.schema(),
+                output_file=self.new_manifest_output(),
+                snapshot_id=self._snapshot_id,
+            ) as writer:
+                for entry in entries:
+                    writer.add_entry(entry)
+            deleted_manifests.append(writer.to_manifest_file())
+        return deleted_manifests
     else:
         return []

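The gist of the fix: an Iceberg manifest file is bound to exactly one partition spec, but after partition evolution a table's data files can carry different spec ids. The old code wrote every delete entry into a single manifest using the table's current spec (`table_metadata.spec()`); the new code groups entries by each data file's `spec_id` and writes one manifest per group, looking each spec up via `table_metadata.specs()[spec_id]`. A minimal sketch of that grouping step, with a hypothetical `Entry` dataclass standing in for pyiceberg's `ManifestEntry`:

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Entry:
    """Hypothetical stand-in for a manifest entry pointing at a data file."""

    file_path: str
    spec_id: int  # partition spec the data file was written under


def group_by_spec_id(entries: List[Entry]) -> Dict[int, List[Entry]]:
    """Bucket entries by partition spec; each bucket becomes one manifest."""
    groups: Dict[int, List[Entry]] = defaultdict(list)
    for entry in entries:
        groups[entry.spec_id].append(entry)
    return dict(groups)


entries = [Entry("s3://b/a.parquet", 0), Entry("s3://b/b.parquet", 1), Entry("s3://b/c.parquet", 0)]
for spec_id, group in group_by_spec_id(entries).items():
    # With the fix, each group is written as its own delete manifest,
    # using the spec resolved from table_metadata.specs()[spec_id].
    print(spec_id, [e.file_path for e in group])
```

With one manifest per spec, readers can interpret each entry's partition tuple against the spec the file was actually written under, rather than against whatever spec happens to be current.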
tests/integration/test_deletes.py (91 changes: 90 additions & 1 deletion)
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from datetime import datetime
 from typing import List
 
 import pyarrow as pa
@@ -25,9 +26,11 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo
 from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
-from pyiceberg.types import FloatType, IntegerType, NestedField
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -556,3 +559,89 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
     assert 2.0 in result
     assert 3.0 in result
     assert 4.0 in result
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+        ],
+        names=["idx"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(
+            NestedField(1, "idx", LongType()),
+        ),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_partitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+            pa.array([
+                datetime(2021, 5, 19),
+                datetime(2022, 7, 25),
+                datetime(2023, 3, 22),
+                datetime(2024, 7, 17),
+                datetime(2025, 2, 22),
+            ]),
+        ],
+        names=["idx", "ts"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
+        partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id", "ts"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
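
A quick way to see the fix at the metadata level (a sketch, not part of the PR; it assumes `tbl` is the evolved table from either test above, after the delete, and that the original and evolved specs were assigned ids 0 and 1 in order):

```python
# Sketch: the overwrite snapshot produced by tbl.delete("id == 4") should
# reference manifests written under both partition specs, since data files
# from both specs contained a row with id == 4 and had to be rewritten.
snapshot = tbl.current_snapshot()
assert snapshot is not None
spec_ids = {manifest.partition_spec_id for manifest in snapshot.manifests(tbl.io)}
assert {0, 1} <= spec_ids  # one delete manifest per spec the deleted files used
```

Before this change, the delete manifest was always written with the table's current spec, so entries for files created under the older spec ended up recorded against a spec that did not match their partition layout.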