From c4f1273a5ff0a42eca9f11397ba5d001a9acc4f7 Mon Sep 17 00:00:00 2001 From: LsomeYeah <94825748+LsomeYeah@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:15:22 +0800 Subject: [PATCH] [core] throw exception if table is recreated when it is still being read (#4445) This closes #4445. --- .../paimon/utils/NextSnapshotFetcher.java | 10 +++ .../flink/PrimaryKeyFileStoreTableITCase.java | 78 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java index 021673950d9c..d0a317df5379 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java @@ -45,8 +45,18 @@ public Snapshot getNextSnapshot(long nextSnapshotId) { } Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); + Long latestSnapshotId = snapshotManager.latestSnapshotId(); // No snapshot now if (earliestSnapshotId == null || earliestSnapshotId <= nextSnapshotId) { + if ((earliestSnapshotId == null && nextSnapshotId > 1) + || (latestSnapshotId != null && nextSnapshotId > latestSnapshotId + 1)) { + throw new OutOfRangeException( + String.format( + "The next expected snapshot is too big! Most possible cause might be the table had been recreated." 
+ + " The next snapshot id is %d, while the latest snapshot id is %s", + nextSnapshotId, latestSnapshotId)); + + LOG.debug( "Next snapshot id {} does not exist, wait for the snapshot generation.", nextSnapshotId); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index b046ebeda0f8..027eada9224c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -37,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.file.Files; @@ -52,6 +54,7 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** Tests for changelog table with primary keys. 
*/ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase { @@ -335,6 +338,81 @@ public void testBatchJobWithConflictAndRestart() throws Exception { } } + @Timeout(60) + @ParameterizedTest() + @ValueSource(booleans = {false, true}) + public void testRecreateTableWithException(boolean isReloadData) throws Exception { + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); + bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + bEnv.executeSql("USE CATALOG testCatalog"); + bEnv.executeSql( + "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + + "PARTITIONED BY (pt) " + + "WITH (" + + " 'bucket' = '2'\n" + + " ,'continuous.discovery-interval' = '1s'\n" + + ")"); + + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .parallelism(4) + .checkpointIntervalMs(1000) + .build(); + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect(); + + // first write + List values = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + values.add(String.format("(0, %d, %d)", i, i)); + values.add(String.format("(1, %d, %d)", i, i)); + } + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + List expected = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + expected.add(Row.ofKind(RowKind.INSERT, 0, i, i)); + expected.add(Row.ofKind(RowKind.INSERT, 1, i, i)); + } + assertStreamingResult(it, expected); + + // second write + values.clear(); + for (int i = 0; i < 10; i++) { + values.add(String.format("(0, %d, %d)", i, i + 1)); + values.add(String.format("(1, %d, %d)", i, i + 1)); + } + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + + // start a read job + for (int i = 0; i < 10; i++) { + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i)); + expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i)); 
+ expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1)); + expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1)); + } + assertStreamingResult(it, expected.subList(20, 60)); + + // delete table and recreate a same table + bEnv.executeSql("DROP TABLE t"); + bEnv.executeSql( + "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) " + + "PARTITIONED BY (pt) " + + "WITH (" + + " 'bucket' = '2'\n" + + ")"); + + // if reload data, it will generate a new snapshot for recreated table + if (isReloadData) { + bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await(); + } + assertThatCode(it::next) + .rootCause() + .hasMessageContaining( + "The next expected snapshot is too big! Most possible cause might be the table had been recreated."); + } + @Test @Timeout(120) public void testChangelogCompactInBatchWrite() throws Exception {