diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 32e61903cb09..4fe6b3ec14e9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -250,11 +250,15 @@ public DataStream build() { if (env == null) { throw new IllegalArgumentException("StreamExecutionEnvironment should not be null."); } + if (conf.contains(CoreOptions.CONSUMER_ID) + && !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) { + throw new IllegalArgumentException( + "consumer.expiration-time should be specified when using consumer-id."); + } if (sourceBounded) { return buildStaticFileSource(); } - TableScanUtils.streamingReadingValidate(table); // TODO visit all options through CoreOptions diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 83a89a6bbf8e..a9ce8135b848 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -742,7 +742,8 @@ public void testConsumersTable() throws Exception { BlockingIterator iterator = BlockingIterator.of( - streamSqlIter("SELECT * FROM T /*+ OPTIONS('consumer-id'='my1') */")); + streamSqlIter( + "SELECT * FROM T /*+ OPTIONS('consumer-id'='my1','consumer.expiration-time'='3h') */")); batchSql("INSERT INTO T VALUES (5, 6), (7, 8)"); assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 4)); @@ -754,6 +755,34 @@ public void testConsumersTable() throws Exception { assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3); } + @Test + public void testConsumerIdExpInBatchMode() { + batchSql("CREATE TABLE T (a INT, b INT)"); + batchSql("INSERT INTO T VALUES (1, 2)"); + batchSql("INSERT INTO T VALUES (3, 4)"); + batchSql("INSERT INTO T VALUES (5, 6), (7, 8)"); + assertThatThrownBy( + () -> + sql( + "SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1")) + .rootCause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("consumer.expiration-time should be specified."); + } + + @Test + public void testConsumerIdExpInStreamingMode() { + batchSql("CREATE TABLE T (a INT, b INT)"); + batchSql("INSERT INTO T VALUES (1, 2)"); + batchSql("INSERT INTO T VALUES (3, 4)"); + assertThatThrownBy( + () -> + streamSqlIter( + "SELECT * FROM T /*+ OPTIONS('consumer-id'='test-id') */")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("consumer.expiration-time should be specified."); + } + @Test public void testPartitionsTable() { String table = "PARTITIONS_TABLE"; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 54c193801fc8..a0ce8be3af7f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -138,7 +138,8 @@ public void testConsumerId() throws Exception { BlockingIterator iterator = BlockingIterator.of( streamSqlIter( - "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table)); + "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", + table)); batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table); assertThat(iterator.collect(2)) @@ -150,7 +151,8 @@ public void testConsumerId() throws Exception { iterator = BlockingIterator.of( streamSqlIter( - "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table)); + "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", + table)); batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table); assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9")); iterator.close(); @@ -164,7 +166,8 @@ public void testConsumerIdInBatch() throws Exception { BlockingIterator iterator = BlockingIterator.of( streamSqlIter( - "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table)); + "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", + table)); assertThat(iterator.collect(2)) .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); @@ -174,7 +177,10 @@ public void testConsumerIdInBatch() throws Exception { batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table); // ignore the consumer id in batch mode - assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table)) + assertThat( + sql( + "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", + table)) .containsExactlyInAnyOrder( Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9")); } @@ -188,7 +194,8 @@ public void testSnapshotWatermark() throws Exception { CloseableIterator insert1 = streamSqlIter("INSERT INTO T2 SELECT a, b, c FROM gen"); sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"); CloseableIterator insert2 = - streamSqlIter("INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me') */"); + streamSqlIter( + "INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */"); while (true) { Set watermarks = sql("SELECT `watermark` FROM WT$snapshots").stream() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index 82503e5c3831..4818c97e64d2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -70,7 +70,9 @@ public void testResetConsumer() throws Exception { // use consumer streaming read table testStreamingRead( - "SELECT * FROM `" + tableName + "` /*+ OPTIONS('consumer-id'='myid') */", + "SELECT * FROM `" + + tableName + + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java index e7d05404a1cc..5bdc27dd9bf1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java @@ -111,7 +111,7 @@ public void testNumRecordsInWithConsumerId() throws Exception { "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )"); TableResult tableResult = tEnv.executeSql( - "INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test') */"); + "INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test','consumer.expiration-time'='3h') */"); JobClient client = tableResult.getJobClient().get(); JobID jobId = client.getJobID();