Skip to content

Commit

Permalink
[flink][hotfix] Wait for consumer reset before job close (#4578)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored Nov 25, 2024
1 parent b20907e commit 63551bb
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -440,6 +441,7 @@ public void testBranchTagsTable() throws Exception {
}

@Test
@Timeout(60)
public void testBranchConsumersTable() throws Exception {
sql("CREATE TABLE t (a INT, b INT)");
sql("INSERT INTO t VALUES (1, 2), (3,4)");
Expand All @@ -451,11 +453,18 @@ public void testBranchConsumersTable() throws Exception {
"SELECT * FROM t$branch_b1 /*+ OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */"));
sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5, 6), Row.of(7, 8));
List<String> branchResult;
do {
branchResult = collectResult("SELECT * FROM t$branch_b1$consumers");
if (!branchResult.isEmpty()) {
break;
}
Thread.sleep(1000);
} while (true);
iterator.close();

assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty();
assertThat(collectResult("SELECT * FROM t$branch_b1$consumers"))
.containsExactlyInAnyOrder("+I[id1, 2]");
assertThat(branchResult).containsExactlyInAnyOrder("+I[id1, 2]");
assertThat(collectResult("SELECT * FROM t$consumers /*+ OPTIONS('branch'='b1') */"))
.containsExactlyInAnyOrder("+I[id1, 2]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -940,6 +941,7 @@ public void testTagsTable() throws Exception {
}

@Test
@Timeout(60)
public void testConsumersTable() throws Exception {
batchSql("CREATE TABLE T (a INT, b INT)");
batchSql("INSERT INTO T VALUES (1, 2)");
Expand All @@ -952,9 +954,17 @@ public void testConsumersTable() throws Exception {

batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 4));

List<Row> result;
do {
result = sql("SELECT * FROM T$consumers");
if (!result.isEmpty()) {
break;
}
Thread.sleep(1000);
} while (true);
iterator.close();

List<Row> result = sql("SELECT * FROM T$consumers");
assertThat(result).hasSize(1);
assertThat(result.get(0).getField(0)).isEqualTo("my1");
assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ public void testConsumerId() throws Exception {
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

Thread.sleep(1000);
List<Row> result;
do {
result = sql("SELECT * FROM %s$consumers", table);
if (!result.isEmpty()) {
break;
}
Thread.sleep(1000);
} while (true);
iterator.close();

iterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.table.api.TableException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -46,6 +49,7 @@
public class ConsumerActionITCase extends ActionITCaseBase {

@ParameterizedTest
@Timeout(60)
@ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
public void testResetConsumer(String invoker) throws Exception {
init(warehouse);
Expand All @@ -72,18 +76,22 @@ public void testResetConsumer(String invoker) throws Exception {
writeData(rowData(3L, BinaryString.fromString("Paimon")));

// use consumer streaming read table
testStreamingRead(
BlockingIterator<Row, Row> iterator =
testStreamingRead(
"SELECT * FROM `"
+ tableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
changelogRow("+I", 3L, "Paimon")))
.close();
changelogRow("+I", 3L, "Paimon")));

Thread.sleep(1000);
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
while (!consumerManager.consumer("myid").isPresent()) {
Thread.sleep(1000);
}
iterator.close();

Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
Expand Down Expand Up @@ -191,6 +199,7 @@ public void testResetConsumer(String invoker) throws Exception {
}

@ParameterizedTest
@Timeout(60)
@ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
public void testResetBranchConsumer(String invoker) throws Exception {
init(warehouse);
Expand Down Expand Up @@ -222,18 +231,23 @@ public void testResetBranchConsumer(String invoker) throws Exception {
String branchTableName = tableName + "$branch_b1";

// use consumer streaming read table
testStreamingRead(
BlockingIterator<Row, Row> iterator =
testStreamingRead(
"SELECT * FROM `"
+ branchTableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
changelogRow("+I", 3L, "Paimon")))
.close();
changelogRow("+I", 3L, "Paimon")));

ConsumerManager consumerManager =
new ConsumerManager(table.fileIO(), table.location(), branchName);
while (!consumerManager.consumer("myid").isPresent()) {
Thread.sleep(1000);
}
iterator.close();

Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
Expand Down

0 comments on commit 63551bb

Please sign in to comment.