Skip to content

Commit

Permalink
Update per review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 19, 2024
1 parent 35c5fdd commit 265d613
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -80,6 +81,7 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -1165,6 +1167,43 @@ public void testTableQueryForNormal() throws Exception {
innerTestTableQuery(table);
}

@Test
public void testLookupWithDropDelete() throws Exception {
    // Table with lookup changelog producer and only two levels, so the DELETE
    // below is guaranteed to reach the last level and trigger a compaction
    // that physically drops the delete record.
    FileStoreTable table =
            createFileStoreTable(
                    conf -> {
                        conf.set(CHANGELOG_PRODUCER, LOOKUP);
                        conf.set("num-levels", "2");
                    });
    IOManager ioManager = IOManager.create(tablePath.toString());
    StreamTableWrite write = table.newWrite(commitUser).withIOManager(ioManager);
    StreamTableCommit commit = table.newCommit(commitUser);

    // First commit: seed two rows in the same partition/bucket.
    write.write(rowData(1, 1, 100L));
    write.write(rowData(1, 2, 200L));
    commit.commit(0, write.prepareCommit(true, 0));

    // Second commit: delete one row. With num-levels = 2 this delete triggers
    // a compaction that applies drop-delete.
    write.write(rowDataWithKind(RowKind.DELETE, 1, 1, 100L));
    commit.commit(1, write.prepareCommit(true, 0));
    write.close();
    commit.close();

    // The latest snapshot must be the compaction snapshot, and the deleted
    // record must be gone from the total count.
    Snapshot snapshot = table.newSnapshotReader().snapshotManager().latestSnapshot();
    assertThat(snapshot.commitKind()).isEqualTo(COMPACT);
    assertThat(snapshot.totalRecordCount()).isEqualTo(1);

    // Only the surviving row (pk = 2) is readable.
    assertThat(
                    getResult(
                            table.newRead(),
                            toSplits(table.newSnapshotReader().read().dataSplits()),
                            binaryRow(1),
                            0,
                            BATCH_ROW_TO_STRING))
            .isEqualTo(
                    Collections.singletonList(
                            "1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
}

private void innerTestTableQuery(FileStoreTable table) throws Exception {
IOManager ioManager = IOManager.create(tablePath.toString());
StreamTableWrite write = table.newWrite(commitUser).withIOManager(ioManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,6 @@ class LookupCompactionTest extends PaimonSparkTestBase {
}
}

test("Paimon lookup compaction: drop delete") {
  // num-levels = 2 so that the DELETE below can trigger a compaction that
  // applies drop-delete.
  spark.sql(
    """
      |CREATE TABLE T (id INT, name STRING, count INT)
      |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '1', 'changelog-producer' = 'lookup', 'num-levels' = '2')
      |""".stripMargin)
  val t = loadTable("T")

  spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 2)")
  // This delete reaches the last level (num-levels = 2) and triggers drop delete.
  spark.sql("DELETE FROM T WHERE id = 1")

  // After drop-delete compaction only the surviving row remains.
  Assertions.assertEquals(1, t.snapshotManager().latestSnapshot().totalRecordCount())
}

/** Number of files in `files` whose file name starts with "data". */
private def dataFileCount(files: Array[FileStatus]): Int =
  files.map(_.getPath.getName).count(_.startsWith("data"))
Expand Down

0 comments on commit 265d613

Please sign in to comment.