From 882c070ba52b1d93b0faf9299d07bbfb242eb059 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Mon, 22 Jul 2024 14:28:51 +0800 Subject: [PATCH] branch support consumer --- .../flink/action/ConsumerActionITCase.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index 89ae7767276a..6a17a4512de7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -151,16 +151,17 @@ public void testResetBranchConsumer() throws Exception { // use consumer streaming read table testStreamingRead( - "SELECT * FROM `" - + branchTableName - + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", - Arrays.asList( - changelogRow("+I", 1L, "Hi"), - changelogRow("+I", 2L, "Hello"), - changelogRow("+I", 3L, "Paimon"))) + "SELECT * FROM `" + + branchTableName + + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", + Arrays.asList( + changelogRow("+I", 1L, "Hi"), + changelogRow("+I", 2L, "Hello"), + changelogRow("+I", 3L, "Paimon"))) .close(); - ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location(), branchName); + ConsumerManager consumerManager = + new ConsumerManager(table.fileIO(), table.location(), branchName); Optional consumer1 = consumerManager.consumer("myid"); assertThat(consumer1).isPresent(); assertThat(consumer1.get().nextSnapshot()).isEqualTo(4); @@ -184,7 +185,8 @@ public void testResetBranchConsumer() throws Exception { } else { callProcedure( String.format( - "CALL sys.reset_consumer('%s.%s', 'myid', 1)", database, branchTableName)); + "CALL sys.reset_consumer('%s.%s', 'myid', 1)", + database, branchTableName)); } Optional consumer2 = consumerManager.consumer("myid"); assertThat(consumer2).isPresent(); @@ -195,10 +197,10 @@ public void testResetBranchConsumer() throws Exception { createAction(ResetConsumerAction.class, args.subList(0, 9)).run(); } else { callProcedure( - String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName)); + String.format( + "CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName)); } Optional consumer3 = consumerManager.consumer("myid"); assertThat(consumer3).isNotPresent(); - } }