Bug Fix: Position Deletes + row_filter yields less data when the DataFile is large (#1141)

* add more tests for position deletes

* multiple append and deletes

* test

* fix

* adopt nit
sungwy authored Sep 27, 2024
1 parent e5a58b3 commit c30e43a
Showing 2 changed files with 65 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pyiceberg/io/pyarrow.py
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
         for batch in batches:
             next_index = next_index + len(batch)
             current_index = next_index - len(batch)
+            output_batches = iter([batch])
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 batch = batch.take(indices)
+                output_batches = iter([batch])
+
             # Apply the user filter
             if pyarrow_filter is not None:
                 # we need to switch back and forth between RecordBatch and Table
@@ -1251,10 +1254,15 @@
                 arrow_table = arrow_table.filter(pyarrow_filter)
                 if len(arrow_table) == 0:
                     continue
-                batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(
-                projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
-            )
+                output_batches = arrow_table.to_batches()
+            for output_batch in output_batches:
+                yield _to_requested_schema(
+                    projected_schema,
+                    file_project_schema,
+                    output_batch,
+                    downcast_ns_timestamp_to_us=True,
+                    use_large_types=use_large_types,
+                )


 def _task_to_table(
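
For context on the change above: after arrow_table.filter(pyarrow_filter), the filtered pyarrow Table can consist of more than one record batch, so yielding only to_batches()[0] silently drops the remaining rows, which is the symptom described in the commit title. The sketch below illustrates the pitfall on its own; it assumes a recent pyarrow and is not the PyIceberg code path itself.

import pyarrow as pa
import pyarrow.compute as pc

# One large single batch, shaped like the data in the new test below
# (values 1..1000 repeated 100 times, i.e. 100,000 rows).
batch = pa.RecordBatch.from_arrays(
    [pa.array(list(range(1, 1001)) * 100, type=pa.int32())], names=["number"]
)
arrow_table = pa.Table.from_batches([batch]).filter(pc.field("number") <= 50)

# The filtered table may be split across several record batches; the old code
# kept only the first one, while the fix iterates over all of them.
first_batch_only = len(arrow_table.to_batches()[0])
all_batches = sum(len(b) for b in arrow_table.to_batches())

assert all_batches == len(arrow_table)  # iterating every output batch keeps all rows
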
53 changes: 53 additions & 0 deletions tests/integration/test_deletes.py
@@ -292,6 +292,59 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0


+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+        """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
 def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
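
For reference, the expected count asserted by the new test works out as follows: the append writes the values 1 through 1000, each repeated 100 times, and the Spark DELETE removes 1 through 4, so of the rows matching number <= 50 only the values 5 through 50 survive, that is 46 distinct values with 100 copies each, or 4600 rows. A quick sanity check of that arithmetic, not part of the commit:

# Values <= 50 that remain after DELETE ... WHERE number in (1, 2, 3, 4)
remaining = [n for n in range(1, 51) if n not in (1, 2, 3, 4)]
assert len(remaining) == 46
assert len(remaining) * 100 == 4600  # matches expected_count = 46 * 100 in the test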
