[fix](iceberg) Fix count(*) error with dangling delete problem (#44039)
### What problem does this PR solve?

Prevent the 'dangling delete' problem after a `rewrite_data_files` action.
Ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files

Because we don't know whether the user has performed a rewrite operation,
`total-records` is used directly only when both `total-equality-deletes`
and `total-position-deletes` are 0.
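
To make the failure mode concrete, here is a minimal, self-contained Java sketch of the new guard and the scenario it defends against. The summary values are illustrative assumptions (a post-rewrite snapshot left with one dangling position delete), not numbers taken from the test; the real change is in `getCountFromSnapshot()` in the diff below.

```java
import java.util.Map;

// Sketch only (not Doris source): why subtracting delete counters is unsafe
// once `rewrite_data_files` may have run, and how the fixed guard reacts.
public class DanglingDeleteSketch {
    // Mirrors the fixed logic: trust `total-records` only when there are no
    // delete files at all; otherwise return -1 ("cannot push down count(*)").
    static long countFromSummary(Map<String, String> summary) {
        if (!summary.get("total-equality-deletes").equals("0")
                || !summary.get("total-position-deletes").equals("0")) {
            return -1;
        }
        return Long.parseLong(summary.get("total-records"));
    }

    public static void main(String[] args) {
        // Hypothetical summary after a rewrite left a position delete dangling:
        // the counter still reports 1 delete although it no longer applies to
        // any live row.
        Map<String, String> afterRewrite = Map.of(
                "total-records", "2",
                "total-equality-deletes", "0",
                "total-position-deletes", "1");
        // Old behavior: 2 - 1 = 1, silently under-counting the 2 live rows.
        // New behavior: -1, so the planner falls back to a real scan.
        System.out.println(countFromSummary(afterRewrite));
    }
}
```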

Issue Number: close #42240
wuwenchi authored and Your Name committed Nov 18, 2024
1 parent ea61206 commit d4d67ff
Showing 11 changed files with 52 additions and 7 deletions.
12 changes: 10 additions & 2 deletions docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
@@ -25,9 +25,17 @@ start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby"



-ls /mnt/scripts/create_preinstalled_scripts/*.sql | xargs -n 1 -I {} bash -c '
+ls /mnt/scripts/create_preinstalled_scripts/iceberg/*.sql | xargs -n 1 -I {} bash -c '
 START_TIME=$(date +%s)
-spark-sql --master spark://doris--spark-iceberg:7077 -f {}
+spark-sql --master spark://doris--spark-iceberg:7077 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -f {}
 END_TIME=$(date +%s)
 EXECUTION_TIME=$((END_TIME - START_TIME))
 echo "Script: {} executed in $EXECUTION_TIME seconds"
 '
+
+ls /mnt/scripts/create_preinstalled_scripts/paimon/*.sql | xargs -n 1 -I {} bash -c '
+START_TIME=$(date +%s)
+spark-sql --master spark://doris--spark-iceberg:7077 --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -f {}
+END_TIME=$(date +%s)
+EXECUTION_TIME=$((END_TIME - START_TIME))
+echo "Script: {} executed in $EXECUTION_TIME seconds"
@@ -0,0 +1,21 @@
use demo.test_db;

drop table if exists dangling_delete_after_write;
create table dangling_delete_after_write (
id BIGINT NOT NULL,
val STRING)
USING iceberg
TBLPROPERTIES (
'format' = 'iceberg/parquet',
'format-version' = '2',
'identifier-fields' = '[id]',
'upsert-enabled' = 'true',
'write.delete.mode' = 'merge-on-read',
'write.parquet.compression-codec' = 'zstd',
'write.update.mode' = 'merge-on-read',
'write.upsert.enabled' = 'true');

insert into dangling_delete_after_write values(1, 'abd');
update dangling_delete_after_write set val = 'def' where id = 1;
call demo.system.rewrite_data_files(table => 'demo.test_db.dangling_delete_after_write', options => map('min-input-files', '1'));
insert into dangling_delete_after_write values(2, 'xyz');
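
After running this script, one way to see the counters the fix keys on is to read the snapshot summary through the Iceberg Java API. A minimal sketch, assuming an already-loaded `Table` handle (catalog setup omitted); `currentSnapshot()` and `summary()` are standard Iceberg API methods:

```java
import java.util.Map;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch: print the snapshot-summary counters that decide whether the
// count(*) pushdown is safe. Obtaining `table` (e.g. from a catalog) is
// out of scope here.
public class SummaryProbe {
    static void printDeleteCounters(Table table) {
        Snapshot current = table.currentSnapshot();
        Map<String, String> summary = current.summary();
        System.out.println("total-records          = " + summary.get("total-records"));
        System.out.println("total-equality-deletes = " + summary.get("total-equality-deletes"));
        System.out.println("total-position-deletes = " + summary.get("total-position-deletes"));
    }
}
```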
@@ -384,13 +384,15 @@ private long getCountFromSnapshot() {
             return 0;
         }

+        // `TOTAL_POSITION_DELETES` must also be 0, to prevent the
+        // 'dangling delete' problem after `rewrite_data_files`.
+        // ref: https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files
         Map<String, String> summary = snapshot.summary();
-        if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
-            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
-                    - Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
-        } else {
+        if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")
+                || !summary.get(IcebergUtils.TOTAL_POSITION_DELETES).equals("0")) {
             return -1;
         }
+        return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS));
     }

     @Override
@@ -23,3 +23,6 @@
 -- !q08 --
 1000

+-- !q09 --
+2
+
@@ -70,7 +70,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
         }
         explain {
             sql("""${sqlstr4}""")
-            contains """pushdown agg=COUNT (1000)"""
+            contains """pushdown agg=COUNT (-1)"""
         }

         // don't use push down count
@@ -98,6 +98,17 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
             contains """pushdown agg=NONE"""
         }

+        // There is a `dangling delete` left after the rewrite
+        sql """ set enable_count_push_down_for_external_table=true; """
+        sqlstr5 = """ select count(*) from ${catalog_name}.test_db.dangling_delete_after_write; """
+
+        qt_q09 """${sqlstr5}"""
+
+        explain {
+            sql("""${sqlstr5}""")
+            contains """pushdown agg=COUNT (-1)"""
+        }
+
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
         sql """drop catalog if exists ${catalog_name}"""
