Skip to content

Commit

Permalink
[flink] Consumer expiration time must be specified when using consume…
Browse files Browse the repository at this point in the history
…r-id (#3736)

This closes #3736.
  • Loading branch information
discivigour authored Jul 15, 2024
1 parent 99be282 commit ebb05fc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,15 @@ public DataStream<RowData> build() {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
}
if (conf.contains(CoreOptions.CONSUMER_ID)
&& !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
throw new IllegalArgumentException(
"consumer.expiration-time should be specified when using consumer-id.");
}

if (sourceBounded) {
return buildStaticFileSource();
}

TableScanUtils.streamingReadingValidate(table);

// TODO visit all options through CoreOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,8 @@ public void testConsumersTable() throws Exception {

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter("SELECT * FROM T /*+ OPTIONS('consumer-id'='my1') */"));
streamSqlIter(
"SELECT * FROM T /*+ OPTIONS('consumer-id'='my1','consumer.expiration-time'='3h') */"));

batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 4));
Expand All @@ -754,6 +755,34 @@ public void testConsumersTable() throws Exception {
assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
}

@Test
public void testConsumerIdExpInBatchMode() {
batchSql("CREATE TABLE T (a INT, b INT)");
batchSql("INSERT INTO T VALUES (1, 2)");
batchSql("INSERT INTO T VALUES (3, 4)");
batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
assertThatThrownBy(
() ->
sql(
"SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("consumer.expiration-time should be specified.");
}

@Test
public void testConsumerIdExpInStreamingMode() {
batchSql("CREATE TABLE T (a INT, b INT)");
batchSql("INSERT INTO T VALUES (1, 2)");
batchSql("INSERT INTO T VALUES (3, 4)");
assertThatThrownBy(
() ->
streamSqlIter(
"SELECT * FROM T /*+ OPTIONS('consumer-id'='test-id') */"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("consumer.expiration-time should be specified.");
}

@Test
public void testPartitionsTable() {
String table = "PARTITIONS_TABLE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public void testConsumerId() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
table));

batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
assertThat(iterator.collect(2))
Expand All @@ -150,7 +151,8 @@ public void testConsumerId() throws Exception {
iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
table));
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
iterator.close();
Expand All @@ -164,7 +166,8 @@ public void testConsumerIdInBatch() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
table));

assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
Expand All @@ -174,7 +177,10 @@ public void testConsumerIdInBatch() throws Exception {

batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
// ignore the consumer id in batch mode
assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table))
assertThat(
sql(
"SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
table))
.containsExactlyInAnyOrder(
Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
}
Expand All @@ -188,7 +194,8 @@ public void testSnapshotWatermark() throws Exception {
CloseableIterator<Row> insert1 = streamSqlIter("INSERT INTO T2 SELECT a, b, c FROM gen");
sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");
CloseableIterator<Row> insert2 =
streamSqlIter("INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me') */");
streamSqlIter(
"INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */");
while (true) {
Set<Long> watermarks =
sql("SELECT `watermark` FROM WT$snapshots").stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void testResetConsumer() throws Exception {

// use consumer streaming read table
testStreamingRead(
"SELECT * FROM `" + tableName + "` /*+ OPTIONS('consumer-id'='myid') */",
"SELECT * FROM `"
+ tableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testNumRecordsInWithConsumerId() throws Exception {
"CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )");
TableResult tableResult =
tEnv.executeSql(
"INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test') */");
"INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test','consumer.expiration-time'='3h') */");
JobClient client = tableResult.getJobClient().get();
JobID jobId = client.getJobID();

Expand Down

0 comments on commit ebb05fc

Please sign in to comment.