From 63551bbda17518e28655452034cbd1676729c1b4 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 25 Nov 2024 10:58:48 +0800 Subject: [PATCH] [flink][hotfix] Wait for consumer reset before job close (#4578) --- .../apache/paimon/flink/BranchSqlITCase.java | 13 +++++++-- .../paimon/flink/CatalogTableITCase.java | 12 +++++++- .../flink/ContinuousFileStoreITCase.java | 9 +++++- .../flink/action/ConsumerActionITCase.java | 28 ++++++++++++++----- 4 files changed, 51 insertions(+), 11 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index c25d99cb4459..2566fbe92e4c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.ArrayList; @@ -440,6 +441,7 @@ public void testBranchTagsTable() throws Exception { } @Test + @Timeout(60) public void testBranchConsumersTable() throws Exception { sql("CREATE TABLE t (a INT, b INT)"); sql("INSERT INTO t VALUES (1, 2), (3,4)"); @@ -451,11 +453,18 @@ public void testBranchConsumersTable() throws Exception { "SELECT * FROM t$branch_b1 /*+ OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */")); sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)"); assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5, 6), Row.of(7, 8)); + List branchResult; + do { + branchResult = collectResult("SELECT * FROM t$branch_b1$consumers"); + if (!branchResult.isEmpty()) { + break; + } + Thread.sleep(1000); + } while (true); iterator.close(); assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty(); - assertThat(collectResult("SELECT * FROM t$branch_b1$consumers")) - .containsExactlyInAnyOrder("+I[id1, 2]"); + assertThat(branchResult).containsExactlyInAnyOrder("+I[id1, 2]"); assertThat(collectResult("SELECT * FROM t$consumers /*+ OPTIONS('branch'='b1') */")) .containsExactlyInAnyOrder("+I[id1, 2]"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 8a3e068a72a0..2a855796d8d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import javax.annotation.Nonnull; @@ -940,6 +941,7 @@ public void testTagsTable() throws Exception { } @Test + @Timeout(60) public void testConsumersTable() throws Exception { batchSql("CREATE TABLE T (a INT, b INT)"); batchSql("INSERT INTO T VALUES (1, 2)"); @@ -952,9 +954,17 @@ public void testConsumersTable() throws Exception { batchSql("INSERT INTO T VALUES (5, 6), (7, 8)"); assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 4)); + + List result; + do { + result = sql("SELECT * FROM T$consumers"); + if (!result.isEmpty()) { + break; + } + Thread.sleep(1000); + } while (true); iterator.close(); - List result = sql("SELECT * FROM T$consumers"); assertThat(result).hasSize(1); assertThat(result.get(0).getField(0)).isEqualTo("my1"); assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 2e15697511dd..b44885832804 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -120,7 +120,14 @@ public void testConsumerId() throws Exception { assertThat(iterator.collect(2)) .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); - Thread.sleep(1000); + List result; + do { + result = sql("SELECT * FROM %s$consumers", table); + if (!result.isEmpty()) { + break; + } + Thread.sleep(1000); + } while (true); iterator.close(); iterator = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index e2243ddf269a..6fb8c81eb744 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -26,8 +26,11 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.table.api.TableException; +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -46,6 +49,7 @@ public class ConsumerActionITCase extends ActionITCaseBase { @ParameterizedTest + @Timeout(60) @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) public void testResetConsumer(String invoker) throws Exception { init(warehouse); @@ -72,18 +76,22 @@ public void testResetConsumer(String invoker) throws Exception { writeData(rowData(3L, BinaryString.fromString("Paimon"))); // use consumer streaming read table - testStreamingRead( + BlockingIterator iterator = + testStreamingRead( "SELECT * FROM `" + tableName + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), - changelogRow("+I", 3L, "Paimon"))) - .close(); + changelogRow("+I", 3L, "Paimon"))); - Thread.sleep(1000); ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location()); + while (!consumerManager.consumer("myid").isPresent()) { + Thread.sleep(1000); + } + iterator.close(); + Optional consumer1 = consumerManager.consumer("myid"); assertThat(consumer1).isPresent(); assertThat(consumer1.get().nextSnapshot()).isEqualTo(4); @@ -191,6 +199,7 @@ public void testResetConsumer(String invoker) throws Exception { } @ParameterizedTest + @Timeout(60) @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) public void testResetBranchConsumer(String invoker) throws Exception { init(warehouse); @@ -222,18 +231,23 @@ public void testResetBranchConsumer(String invoker) throws Exception { String branchTableName = tableName + "$branch_b1"; // use consumer streaming read table - testStreamingRead( + BlockingIterator iterator = + testStreamingRead( "SELECT * FROM `" + branchTableName + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), - changelogRow("+I", 3L, "Paimon"))) - .close(); + changelogRow("+I", 3L, "Paimon"))); ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location(), branchName); + while (!consumerManager.consumer("myid").isPresent()) { + Thread.sleep(1000); + } + iterator.close(); + Optional consumer1 = consumerManager.consumer("myid"); assertThat(consumer1).isPresent(); assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);