Skip to content

Commit

Permalink
branch support consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Jul 22, 2024
1 parent cd91585 commit 882c070
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,17 @@ public void testResetBranchConsumer() throws Exception {

// use consumer streaming read table
testStreamingRead(
"SELECT * FROM `"
+ branchTableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
changelogRow("+I", 3L, "Paimon")))
"SELECT * FROM `"
+ branchTableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
changelogRow("+I", 3L, "Paimon")))
.close();

ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location(), branchName);
ConsumerManager consumerManager =
new ConsumerManager(table.fileIO(), table.location(), branchName);
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
Expand All @@ -184,7 +185,8 @@ public void testResetBranchConsumer() throws Exception {
} else {
callProcedure(
String.format(
"CALL sys.reset_consumer('%s.%s', 'myid', 1)", database, branchTableName));
"CALL sys.reset_consumer('%s.%s', 'myid', 1)",
database, branchTableName));
}
Optional<Consumer> consumer2 = consumerManager.consumer("myid");
assertThat(consumer2).isPresent();
Expand All @@ -195,10 +197,10 @@ public void testResetBranchConsumer() throws Exception {
createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
} else {
callProcedure(
String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName));
String.format(
"CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName));
}
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();

}
}

0 comments on commit 882c070

Please sign in to comment.