From 3b67fae52f0605daebf9c40d6dc688388ecd4e9d Mon Sep 17 00:00:00 2001
From: Ashvin Agrawal
Date: Sun, 8 Dec 2024 10:21:48 -0800
Subject: [PATCH] Address review comments, validate deletion records

---
Reviewer notes (ignored by git am; not part of the commit message):
- The block that creates allActiveFiles/allTableChanges and performs the first
  insertRows(50) now runs after the ALTER TABLE ... SET TBLPROPERTIES
  ('delta.enableDeletionVectors' = true) statement instead of before it.
- Adds validateDeletedRecordCount(deltaLog, version, deleteVectorFileCount,
  deletionRecordCount): it loads the snapshot at `version`, keeps the files
  whose deletionVector() is non-null, and asserts both the number of such
  files and the sum of their numDeletedRecords.
- The helper is called after the second insert, after the upsert, and after
  each of the two deleteRows steps.

 .../delta/ITDeltaDeleteVectorConvert.java     | 37 +++++++++++++++----
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d0564ddaf..d1d33bf82 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -35,6 +35,11 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
+
+import scala.Option;
+
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestSparkDeltaTable;
 import org.apache.xtable.model.TableChange;
@@ -85,12 +90,6 @@ public void testInsertsUpsertsAndDeletes() {
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
 
-    List<List<String>> allActiveFiles = new ArrayList<>();
-    List<TableChange> allTableChanges = new ArrayList<>();
-    List<Row> rows = testSparkDeltaTable.insertRows(50);
-    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
-    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
-
     // enable deletion vectors for the test table
     testSparkDeltaTable
         .getSparkSession()
@@ -99,14 +98,22 @@ public void testInsertsUpsertsAndDeletes() {
                 + tableName
                 + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
 
+    List<List<String>> allActiveFiles = new ArrayList<>();
+    List<TableChange> allTableChanges = new ArrayList<>();
+    List<Row> rows = testSparkDeltaTable.insertRows(50);
+    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+    allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
     List<Row> rows1 = testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(100L, testSparkDeltaTable.getNumRows());
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
 
     // upsert does not create delete vectors
     testSparkDeltaTable.upsertRows(rows.subList(0, 20));
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(100L, testSparkDeltaTable.getNumRows());
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
 
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -121,6 +128,7 @@ public void testInsertsUpsertsAndDeletes() {
     testSparkDeltaTable.deleteRows(rowsToDelete);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(135L, testSparkDeltaTable.getNumRows());
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15);
 
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -133,6 +141,7 @@ public void testInsertsUpsertsAndDeletes() {
     testSparkDeltaTable.deleteRows(rowsToDelete);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(178L, testSparkDeltaTable.getNumRows());
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22);
 
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
@@ -150,6 +159,20 @@ public void testInsertsUpsertsAndDeletes() {
     // DeltaConversionSource conversionSource =
     // conversionSourceProvider.getConversionSourceInstance(tableConfig);
     // InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
-    //
+  }
+
+  private void validateDeletedRecordCount(
+      DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) {
+    List<AddFile> allFiles =
+        deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList();
+    List<AddFile> filesWithDeletionVectors =
+        allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList());
+
+    assertEquals(deleteVectorFileCount, filesWithDeletionVectors.size());
+    assertEquals(
+        deletionRecordCount,
+        filesWithDeletionVectors.stream()
+            .collect(Collectors.summarizingLong(AddFile::numDeletedRecords))
+            .getSum());
   }
 }