Core: Use 'delete' if RowDelta only has delete files (apache#10123)

nastra authored Apr 16, 2024
1 parent 97c5700 commit fc5b2b3
Showing 12 changed files with 160 additions and 17 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -43,6 +43,10 @@ protected BaseRowDelta self() {
 
   @Override
   protected String operation() {
+    if (addsDeleteFiles() && !addsDataFiles()) {
+      return DataOperations.DELETE;
+    }
+
     return DataOperations.OVERWRITE;
   }
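In effect, a RowDelta that adds only delete files (the merge-on-read DELETE path) now commits a snapshot whose operation is "delete"; as soon as a data file is also added, the operation stays "overwrite". A minimal sketch of the new behavior against the public API; the table and the delete/data files are assumed to exist already, and the snippet is illustrative rather than part of this commit:

// Illustrative sketch, not part of the commit: 'table' is an existing
// Iceberg Table, 'posDeletes' a previously written DeleteFile, and
// 'dataFile' a previously written DataFile.
table.newRowDelta()
    .addDeletes(posDeletes) // only delete files, no data files
    .commit();
// the resulting snapshot is now a "delete" rather than an "overwrite"
assert DataOperations.DELETE.equals(table.currentSnapshot().operation());

table.newRowDelta()
    .addRows(dataFile) // at least one data file
    .addDeletes(posDeletes)
    .commit();
// mixing data and delete files still yields an "overwrite"
assert DataOperations.OVERWRITE.equals(table.currentSnapshot().operation());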
@@ -102,7 +102,7 @@ public void addAndDeleteDeleteFiles() {
 
     CommitReport report = reporter.lastCommitReport();
     assertThat(report).isNotNull();
-    assertThat(report.operation()).isEqualTo("overwrite");
+    assertThat(report.operation()).isEqualTo("delete");
     assertThat(report.snapshotId()).isEqualTo(1L);
     assertThat(report.sequenceNumber()).isEqualTo(1L);
     assertThat(report.tableName()).isEqualTo(tableName);
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -55,6 +55,18 @@ protected static List<Object> parameters() {
     return Arrays.asList(new Object[] {2, "main"}, new Object[] {2, "testBranch"});
   }
 
+  @TestTemplate
+  public void addOnlyDeleteFilesProducesDeleteOperation() {
+    SnapshotUpdate<?> rowDelta =
+        table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES);
+
+    commit(table, rowDelta, branch);
+    Snapshot snap = latestSnapshot(table, branch);
+    assertThat(snap.sequenceNumber()).isEqualTo(1);
+    assertThat(snap.operation()).isEqualTo(DataOperations.DELETE);
+    assertThat(snap.deleteManifests(table.io())).hasSize(1);
+  }
+
   @TestTemplate
   public void testAddDeleteFile() {
     SnapshotUpdate<?> rowDelta =
@@ -30,6 +30,7 @@
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -255,8 +256,9 @@ protected void validateMergeOnRead(
       String changedPartitionCount,
       String addedDeleteFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != addedDeleteFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
+        snapshot, operation, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
   }
 
   protected void validateSnapshot(
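With this change the helper infers the expected operation from its arguments instead of hard-coding OVERWRITE, which the merge-on-read assertions added below rely on. Roughly, with hypothetical argument values:

// only delete files added: expects a "delete" snapshot
validateMergeOnRead(snapshot, "1", "1", null);
// data files added as well: still expects an "overwrite" snapshot
validateMergeOnRead(snapshot, "1", "1", "1");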
@@ -286,9 +288,13 @@ protected void validateProperty(Snapshot snapshot, String property, Set<String>
   }
 
   protected void validateProperty(Snapshot snapshot, String property, String expectedValue) {
-    String actual = snapshot.summary().get(property);
-    Assert.assertEquals(
-        "Snapshot property " + property + " has unexpected value.", expectedValue, actual);
+    if (null == expectedValue) {
+      assertThat(snapshot.summary()).doesNotContainKey(property);
+    } else {
+      assertThat(snapshot.summary())
+          .as("Snapshot property " + property + " has unexpected value.")
+          .containsEntry(property, expectedValue);
+    }
   }
 
   protected void sleep(long millis) {
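The reworked validateProperty treats a null expected value as "the property must be absent from the snapshot summary", which the deletes-only path needs since such snapshots report no added data files. A usage sketch (the property names are real SnapshotSummary constants; the snapshot and values are hypothetical):

// the summary must not contain an added-data-files entry
validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, (String) null);
// exactly one position delete file was added
validateProperty(snapshot, SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "1");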
@@ -19,12 +19,14 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
@@ -334,6 +336,31 @@ public void testDeleteNonExistingRecords() {
         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
   }
 
+  @Test
+  public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableException {
+    createAndInitPartitionedTable();
+    append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new Employee(3, "eng"));
+
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // this is an OverwriteFiles and produces "overwrite"
+      validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+    } else {
+      // this is a RowDelta that produces a "delete" instead of "overwrite"
+      validateMergeOnRead(currentSnapshot, "1", "1", null);
+      validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+    }
+
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+  }
+
   @Test
   public void testDeleteWithoutCondition() {
     createAndInitPartitionedTable();
@@ -31,8 +31,11 @@
 import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -476,7 +479,15 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
             dataDeletes,
             deleteRowSchema);
 
-    table.newRowDelta().addDeletes(eqDeletes).commit();
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(temp.newFile().toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
 
     // check pre-condition - that the above Delete file write - actually resulted in snapshot of
     // type OVERWRITE
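The dummy data file is presumably what keeps this precondition intact: after this change a deletes-only RowDelta would commit as "delete", so the test adds a row alongside the equality deletes to keep the snapshot an "overwrite".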
@@ -292,8 +292,9 @@ protected void validateMergeOnRead(
       String changedPartitionCount,
       String addedDeleteFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != addedDeleteFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
+        snapshot, operation, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
   }
 
   protected void validateSnapshot(
@@ -323,9 +324,13 @@ protected void validateProperty(Snapshot snapshot, String property, Set<String>
   }
 
   protected void validateProperty(Snapshot snapshot, String property, String expectedValue) {
-    String actual = snapshot.summary().get(property);
-    Assert.assertEquals(
-        "Snapshot property " + property + " has unexpected value.", expectedValue, actual);
+    if (null == expectedValue) {
+      assertThat(snapshot.summary()).doesNotContainKey(property);
+    } else {
+      assertThat(snapshot.summary())
+          .as("Snapshot property " + property + " has unexpected value.")
+          .containsEntry(property, expectedValue);
+    }
   }
 
   protected void sleep(long millis) {
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
@@ -27,6 +28,7 @@
 import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
@@ -502,6 +504,31 @@ public void testDeleteNonExistingRecords() {
         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
   }
 
+  @Test
+  public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableException {
+    createAndInitPartitionedTable();
+    append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new Employee(3, "eng"));
+
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // this is an OverwriteFiles and produces "overwrite"
+      validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+    } else {
+      // this is a RowDelta that produces a "delete" instead of "overwrite"
+      validateMergeOnRead(currentSnapshot, "1", "1", null);
+      validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+    }
+
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+  }
+
   @Test
   public void testDeleteWithoutCondition() {
     createAndInitPartitionedTable();
@@ -30,8 +30,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -483,7 +486,15 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
             dataDeletes,
             deleteRowSchema);
 
-    table.newRowDelta().addDeletes(eqDeletes).commit();
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(temp.newFile().toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
 
     // check pre-condition - that the above Delete file write - actually resulted in snapshot of
     // type OVERWRITE
@@ -285,8 +285,9 @@ protected void validateMergeOnRead(
       String changedPartitionCount,
       String addedDeleteFiles,
       String addedDataFiles) {
+    String operation = null == addedDataFiles && null != addedDeleteFiles ? DELETE : OVERWRITE;
     validateSnapshot(
-        snapshot, OVERWRITE, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
+        snapshot, operation, changedPartitionCount, null, addedDeleteFiles, addedDataFiles);
   }
 
   protected void validateSnapshot(
@@ -317,10 +318,13 @@ protected void validateProperty(Snapshot snapshot, String property, Set<String>
   }
 
   protected void validateProperty(Snapshot snapshot, String property, String expectedValue) {
-    String actual = snapshot.summary().get(property);
-    assertThat(actual)
-        .as("Snapshot property " + property + " has unexpected value.")
-        .isEqualTo(expectedValue);
+    if (null == expectedValue) {
+      assertThat(snapshot.summary()).doesNotContainKey(property);
+    } else {
+      assertThat(snapshot.summary())
+          .as("Snapshot property " + property + " has unexpected value.")
+          .containsEntry(property, expectedValue);
+    }
   }
 
   protected void sleep(long millis) {
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
@@ -501,6 +502,31 @@ public void testDeleteNonExistingRecords() {
         sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget()));
   }
 
+  @TestTemplate
+  public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableException {
+    createAndInitPartitionedTable();
+    append(tableName, new Employee(1, "eng"), new Employee(2, "eng"), new Employee(3, "eng"));
+
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // this is an OverwriteFiles and produces "overwrite"
+      validateCopyOnWrite(currentSnapshot, "1", "1", "1");
+    } else {
+      // this is a RowDelta that produces a "delete" instead of "overwrite"
+      validateMergeOnRead(currentSnapshot, "1", "1", null);
+      validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+    }
+
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactlyInAnyOrder(row(1, "eng"), row(3, "eng"));
+  }
+
   @TestTemplate
   public void testDeleteWithoutCondition() {
     createAndInitPartitionedTable();
@@ -32,8 +32,10 @@
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.RewriteFiles;
@@ -484,7 +486,15 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
             dataDeletes,
             deleteRowSchema);
 
-    table.newRowDelta().addDeletes(eqDeletes).commit();
+    DataFile dataFile =
+        DataFiles.builder(table.spec())
+            .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withFormat(FileFormat.PARQUET)
+            .build();
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();
 
     // check pre-condition - that the above Delete file write - actually resulted in snapshot of
     // type OVERWRITE
