From c30e43adf94a82ec1a225d3a1bf69fface592cfd Mon Sep 17 00:00:00 2001
From: Sung Yun <107272191+sungwy@users.noreply.github.com>
Date: Thu, 26 Sep 2024 22:05:25 -0400
Subject: [PATCH] Bug Fix: Position Deletes + row_filter yields less data when the DataFile is large (#1141)

* add more tests for position deletes

* multiple append and deletes

* test

* fix

* adopt nit
---
 pyiceberg/io/pyarrow.py           | 16 +++++++---
 tests/integration/test_deletes.py | 53 +++++++++++++++++++++++++++++++
 2 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 999813d0c2..aa27796081 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
         for batch in batches:
             next_index = next_index + len(batch)
             current_index = next_index - len(batch)
+            output_batches = iter([batch])
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 batch = batch.take(indices)
+                output_batches = iter([batch])
+
             # Apply the user filter
             if pyarrow_filter is not None:
                 # we need to switch back and forth between RecordBatch and Table
@@ -1251,10 +1254,15 @@ def _task_to_record_batches(
                 arrow_table = arrow_table.filter(pyarrow_filter)
                 if len(arrow_table) == 0:
                     continue
-                batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(
-                projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
-            )
+                output_batches = arrow_table.to_batches()
+            for output_batch in output_batches:
+                yield _to_requested_schema(
+                    projected_schema,
+                    file_project_schema,
+                    output_batch,
+                    downcast_ns_timestamp_to_us=True,
+                    use_large_types=use_large_types,
+                )
 
 
 def _task_to_table(
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index d95b299ae1..2cdf9916ee 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -292,6 +292,59 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0
 
 
+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+            """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
 def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
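
A minimal illustration of the root cause, not part of the commit above: pyarrow's Table.to_batches() can return more than one RecordBatch, so the pre-patch `arrow_table.to_batches()[0]` silently dropped every batch after the first, while the patched loop yields all of them. The sketch below assumes only a stock pyarrow install; the table and column names are illustrative and not taken from the PyIceberg code path.

import pyarrow as pa

# Build a table backed by two chunks, mimicking a large DataFile that the
# reader materializes as several record batches (names are illustrative).
first = pa.RecordBatch.from_pydict({"number": list(range(0, 1000))})
second = pa.RecordBatch.from_pydict({"number": list(range(1000, 2000))})
table = pa.Table.from_batches([first, second])

batches = table.to_batches()
assert len(batches) == 2             # to_batches() follows the chunk layout
assert len(batches[0]) < len(table)  # so keeping only batches[0] loses rows

# The patched loop therefore iterates over every batch instead of indexing [0]:
for output_batch in batches:
    pass  # project and yield each batch, as _task_to_record_batches now does

Yielding batch by batch also preserves the streaming behavior of to_arrow_batch_reader(), which the new integration test exercises end to end with a 100,000-row table, merge-on-read deletes, and a row filter.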