Skip to content

Commit

Permalink
[flink] Try to make LookupJoinITCase stable
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jan 10, 2024
1 parent 3e8e1c0 commit 9aaea21
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -37,7 +38,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {

@Override
public List<String> ddl() {
return Arrays.asList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
}

@Override
Expand All @@ -53,10 +54,9 @@ private void initTable(LookupCacheMode cacheMode) {
"CREATE TABLE PARTITIONED_DIM (i INT, j INT, k1 INT, k2 INT, PRIMARY KEY (i, j) NOT ENFORCED)"
+ "PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)";

String fullOption =
", 'changelog-producer'='lookup', 'lookup.cache' = 'full', 'bucket' = '2'";
String fullOption = ", 'lookup.cache' = 'full'";

String lruOption = ", 'changelog-producer'='lookup', 'bucket' = '2'";
String lruOption = ", 'changelog-producer'='lookup'";

switch (cacheMode) {
case FULL:
Expand Down Expand Up @@ -577,20 +577,19 @@ public void testAsyncRetryLookup(LookupCacheMode cacheMode) throws Exception {

String query =
"SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='1s','max-attempts'='60') */"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+ " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

sql("INSERT INTO T VALUES (3)");
sql("INSERT INTO T VALUES (2)");
sql("INSERT INTO T VALUES (1)");
Thread.sleep(2000); // wait
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111), Row.of(2, 22, 222, 2222));

sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
Thread.sleep(2000); // wait
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, 33, 333, 3333));
assertThat(iterator.collect(1, 10, TimeUnit.MINUTES))
.containsExactlyInAnyOrder(Row.of(3, 33, 333, 3333));

iterator.close();
}
Expand Down Expand Up @@ -700,14 +699,13 @@ public void testAsyncRetryLookupWithSequenceField() throws Exception {

String query =
"SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='1s','max-attempts'='60') */"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+ " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

sql("INSERT INTO T VALUES (3)");
sql("INSERT INTO T VALUES (2)");
sql("INSERT INTO T VALUES (1)");
Thread.sleep(2000); // wait
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111), Row.of(2, 22, 222, 2222));

Expand All @@ -726,18 +724,16 @@ public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception {

String query =
"SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='1s','max-attempts'='60') */"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+ " T.i, D.i, D.j, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.k1";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

sql("INSERT INTO T VALUES (111)");
sql("INSERT INTO T VALUES (333)");
Thread.sleep(2000); // wait
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of(111, 1, 1, 1111), Row.of(111, 2, 2, 2222));

sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 1, 111, 3333), (3, 3, 333, 3333)");
Thread.sleep(2000); // wait
sql("INSERT INTO T VALUES (111)");
assertThat(iterator.collect(3))
.containsExactlyInAnyOrder(
Expand Down

0 comments on commit 9aaea21

Please sign in to comment.