From 265d6131e991ea4262018e2ad80be13f75f84550 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Mon, 19 Feb 2024 15:50:46 +0800
Subject: [PATCH] update for comment

---
Notes:
    Replaces the Spark SQL "drop delete" case in LookupCompactionTest with
    testLookupWithDropDelete in PrimaryKeyFileStoreTableTest. Like the
    removed case, the new test sets 'num-levels' = '2' with the lookup
    changelog producer and checks that one record remains after the delete;
    it additionally asserts that the latest snapshot is a COMPACT commit and
    that reading the table back returns only the remaining row.

 .../table/PrimaryKeyFileStoreTableTest.java   | 39 +++++++++++++++++++
 .../spark/sql/LookupCompactionTest.scala      | 16 --------
 2 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 92172ae4f5fd..4fa7a1de6848 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -80,6 +81,7 @@
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1165,6 +1167,43 @@ public void testTableQueryForNormal() throws Exception {
         innerTestTableQuery(table);
     }
 
+    @Test
+    public void testLookupWithDropDelete() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CHANGELOG_PRODUCER, LOOKUP);
+                            conf.set("num-levels", "2");
+                        });
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        StreamTableWrite write = table.newWrite(commitUser).withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        write.write(rowData(1, 1, 100L));
+        write.write(rowData(1, 2, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // set num-levels = 2 to make sure that this delete can trigger compaction with drop delete
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 1, 100L));
+        commit.commit(1, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        Snapshot latestSnapshot = table.newSnapshotReader().snapshotManager().latestSnapshot();
+        assertThat(latestSnapshot.commitKind()).isEqualTo(COMPACT);
+        assertThat(latestSnapshot.totalRecordCount()).isEqualTo(1);
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                binaryRow(1),
+                                0,
+                                BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                "1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     private void innerTestTableQuery(FileStoreTable table) throws Exception {
         IOManager ioManager = IOManager.create(tablePath.toString());
         StreamTableWrite write = table.newWrite(commitUser).withIOManager(ioManager);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala
index c35cb2c461c0..4727df2ccd21 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala
@@ -78,22 +78,6 @@ class LookupCompactionTest extends PaimonSparkTestBase {
     }
   }
 
-  test("Paimon lookup compaction: drop delete") {
-
-    spark.sql(
-      s"""
-         |CREATE TABLE T (id INT, name STRING, count INT)
-         |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '1', 'changelog-producer' = 'lookup', 'num-levels' = '2')
-         |""".stripMargin)
-    val table = loadTable("T")
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 2)")
-    // set num-levels = 2 to make sure that this delete operation can trigger drop delete
-    spark.sql("DELETE FROM T WHERE id = 1")
-
-    Assertions.assertEquals(1, table.snapshotManager().latestSnapshot().totalRecordCount())
-  }
-
   private def dataFileCount(files: Array[FileStatus]): Int = {
     files.count(f => f.getPath.getName.startsWith("data"))
   }