Skip to content

Commit

Permalink
[flink] Fix lookup join with lookup.dynamic-partition and cross-parti…
Browse files Browse the repository at this point in the history
…tion table throws ArrayIndexOutOfBoundsException
  • Loading branch information
yuzelin committed Aug 1, 2024
1 parent 4154c2b commit a7f79e2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ public void open() {
this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
}

public void addJoinKeys(List<String> joinKeys) {
public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
List<String> partitionKeys = table.partitionKeys();
checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains));
joinKeys.addAll(partitionKeys);

partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ public FileStoreLookupFunction(
.mapToObj(i -> table.rowType().getFieldNames().get(projection[i]))
.collect(Collectors.toList());

if (partitionLoader != null) {
partitionLoader.addJoinKeys(joinKeys);
}

this.projectFields =
Arrays.stream(projection)
.mapToObj(i -> table.rowType().getFieldNames().get(i))
Expand All @@ -125,6 +121,10 @@ public FileStoreLookupFunction(
}
}

if (partitionLoader != null) {
partitionLoader.addPartitionKeysTo(joinKeys, projectFields);
}

this.predicate = predicate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,25 +641,28 @@ public void testLookupPartitionedTable() throws Exception {
iterator.close();
}

@Test
public void testLookupMaxPtPartitionedTablePartialCache() throws Exception {
innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO);
}

@Test
public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL);
}

private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
tEnv.executeSql(
"CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
String primaryKeys;
String bucket;
if (testDynamicBucket) {
primaryKeys = "k";
bucket = "-1";
} else {
primaryKeys = "pt, k";
bucket = "1";
}
sql(
"CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (%s) NOT ENFORCED)"
+ "PARTITIONED BY (`pt`) WITH ("
+ "'bucket' = '1', "
+ "'bucket' = '%s', "
+ "'lookup.dynamic-partition' = 'max_pt()', "
+ "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ String.format("'lookup.cache' = '%s', ", mode)
+ "'continuous.discovery-interval'='1 ms')");
+ "'lookup.cache' = '%s', "
+ "'continuous.discovery-interval'='1 ms')",
primaryKeys, bucket, mode);
String query =
"SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
Expand Down

0 comments on commit a7f79e2

Please sign in to comment.