From a7f79e2ec5d9a88f167bda15b4c4697bb50329d2 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 1 Aug 2024 18:19:38 +0800 Subject: [PATCH] [flink] Fix lookup join with lookup.dynamic-partition and cross-partition table throws ArrayIndexOutOfBoundsException --- .../flink/lookup/DynamicPartitionLoader.java | 4 ++- .../flink/lookup/FileStoreLookupFunction.java | 8 ++--- .../apache/paimon/flink/LookupJoinITCase.java | 35 ++++++++++--------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java index aa80dc657425..a8660ee8c4d5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java @@ -66,10 +66,12 @@ public void open() { this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes()); } - public void addJoinKeys(List joinKeys) { + public void addPartitionKeysTo(List joinKeys, List projectFields) { List partitionKeys = table.partitionKeys(); checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains)); joinKeys.addAll(partitionKeys); + + partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add); } @Nullable diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 65c9c7302f0c..01ebbde20154 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -109,10 +109,6 @@ public FileStoreLookupFunction( .mapToObj(i -> table.rowType().getFieldNames().get(projection[i])) .collect(Collectors.toList()); - if (partitionLoader != null) { - partitionLoader.addJoinKeys(joinKeys); - } - this.projectFields = Arrays.stream(projection) .mapToObj(i -> table.rowType().getFieldNames().get(i)) @@ -125,6 +121,10 @@ public FileStoreLookupFunction( } } + if (partitionLoader != null) { + partitionLoader.addPartitionKeysTo(joinKeys, projectFields); + } + this.predicate = predicate; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 26982e44aa7f..5b35cb131393 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -641,25 +641,28 @@ public void testLookupPartitionedTable() throws Exception { iterator.close(); } - @Test - public void testLookupMaxPtPartitionedTablePartialCache() throws Exception { - innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO); - } - - @Test - public void testLookupMaxPtPartitionedTableFullCache() throws Exception { - innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL); - } - - private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception { - tEnv.executeSql( - "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception { + boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean(); + String primaryKeys; + String bucket; + if (testDynamicBucket) { + primaryKeys = "k"; + bucket = "-1"; + } else { + primaryKeys = "pt, k"; + bucket = "1"; + } + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (%s) NOT ENFORCED)" + "PARTITIONED BY (`pt`) WITH (" - + "'bucket' = '1', " + + "'bucket' = '%s', " + "'lookup.dynamic-partition' = 'max_pt()', " + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " - + String.format("'lookup.cache' = '%s', ", mode) - + "'continuous.discovery-interval'='1 ms')"); + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + primaryKeys, bucket, mode); String query = "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k"; BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect());