Skip to content

Commit

Permalink
Address review comments, validate deletion records
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvina committed Dec 8, 2024
1 parent 0c103ff commit 3b67fae
Showing 1 changed file with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.actions.AddFile;

import scala.Option;

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.model.TableChange;
Expand Down Expand Up @@ -85,12 +90,6 @@ public void testInsertsUpsertsAndDeletes() {
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);

List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());

// enable deletion vectors for the test table
testSparkDeltaTable
.getSparkSession()
Expand All @@ -99,14 +98,22 @@ public void testInsertsUpsertsAndDeletes() {
+ tableName
+ " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");

List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());

List<Row> rows1 = testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(100L, testSparkDeltaTable.getNumRows());
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);

// upsert does not create deletion vectors
testSparkDeltaTable.upsertRows(rows.subList(0, 20));
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(100L, testSparkDeltaTable.getNumRows());
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);

testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
Expand All @@ -121,6 +128,7 @@ public void testInsertsUpsertsAndDeletes() {
testSparkDeltaTable.deleteRows(rowsToDelete);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(135L, testSparkDeltaTable.getNumRows());
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15);

testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
Expand All @@ -133,6 +141,7 @@ public void testInsertsUpsertsAndDeletes() {
testSparkDeltaTable.deleteRows(rowsToDelete);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(178L, testSparkDeltaTable.getNumRows());
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22);

testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
Expand All @@ -150,6 +159,20 @@ public void testInsertsUpsertsAndDeletes() {
// DeltaConversionSource conversionSource =
// conversionSourceProvider.getConversionSourceInstance(tableConfig);
// InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
//
}

/**
 * Asserts how many data files in a given table snapshot carry a deletion vector and how many
 * records those deletion vectors mark as deleted in total.
 *
 * @param deltaLog the Delta log from which the snapshot is loaded
 * @param version the table version whose snapshot is inspected
 * @param deleteVectorFileCount expected number of files with a non-null deletion vector
 * @param deletionRecordCount expected sum of {@code numDeletedRecords} over those files
 */
private void validateDeletedRecordCount(
    DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) {
  List<AddFile> snapshotFiles =
      deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList();

  // Single pass: tally the files that have a deletion vector and the records they delete.
  int vectorFileTally = 0;
  long deletedRecordTally = 0L;
  for (AddFile candidate : snapshotFiles) {
    if (candidate.deletionVector() == null) {
      continue;
    }
    vectorFileTally++;
    deletedRecordTally += candidate.numDeletedRecords();
  }

  assertEquals(deleteVectorFileCount, vectorFileTally);
  assertEquals(deletionRecordCount, deletedRecordTally);
}
}

0 comments on commit 3b67fae

Please sign in to comment.