From 645272177d9d7c01a895da969a34a993f52772ec Mon Sep 17 00:00:00 2001 From: zhangdong Date: Wed, 13 Nov 2024 17:05:24 +0800 Subject: [PATCH] [core] Partitions system table support time travel (#4511) --- .../paimon/table/system/PartitionsTable.java | 2 +- .../table/system/PartitionsTableTest.java | 24 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 76c0768eeeac..4aaeee425479 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -176,7 +176,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - List partitions = fileStoreTable.newSnapshotReader().partitionEntries(); + List partitions = fileStoreTable.newScan().listPartitionEntries(); RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index a17dc75466a6..8d12dc707bf5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -76,7 +77,7 @@ public void before() throws Exception { partitionsTable = (PartitionsTable) catalog.getTable(filesTableId); // snapshot 1: append - write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5)); + write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 3, 5)); write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4)); } @@ -85,19 +86,36 @@ public void before() throws Exception { public void testPartitionRecordCount() throws Exception { List expectedRow = new ArrayList<>(); expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 2L)); - expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L)); + expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L)); + expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L)); // Only read partition and record count, record size may not stable. List result = read(partitionsTable, new int[][] {{0}, {1}}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } + @Test + public void testPartitionTimeTravel() throws Exception { + List expectedRow = new ArrayList<>(); + expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 1L)); + expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L)); + + // Only read partition and record count, record size may not stable. + List result = + read( + partitionsTable.copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")), + new int[][] {{0}, {1}}); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); + } + @Test public void testPartitionValue() throws Exception { write(table, GenericRow.of(2, 1, 3), GenericRow.of(3, 1, 4)); List expectedRow = new ArrayList<>(); expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 4L, 3L)); - expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L, 2L)); + expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L)); + expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L)); List result = read(partitionsTable, new int[][] {{0}, {1}, {3}}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);