From 61d1945121ca472133bff18d6a67b88201800c56 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 17 Jun 2024 15:22:57 +0800 Subject: [PATCH] [flink] Fix lookup IndexOutOfBounds for sequence defined with projection (#3527) --- .../java/org/apache/paimon/types/RowType.java | 9 +++++ .../flink/lookup/FullCacheLookupTable.java | 15 ++++++- .../paimon/flink/lookup/LookupTableTest.java | 40 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java index fe11976de698..a691dbc0b57d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java @@ -99,6 +99,15 @@ public int getFieldIndex(String fieldName) { return -1; } + public int[] getFieldIndices(List projectFields) { + List fieldNames = getFieldNames(); + int[] projection = new int[projectFields.size()]; + for (int i = 0; i < projection.length; i++) { + projection[i] = fieldNames.indexOf(projectFields.get(i)); + } + return projection; + } + public boolean containsField(String fieldName) { for (DataField field : fields) { if (field.name().equals(fieldName)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index bd0848e7ade8..ed5c4c13f61e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -85,7 +85,6 @@ public abstract class FullCacheLookupTable implements LookupTable { private Predicate specificPartition; public FullCacheLookupTable(Context context) { - this.context = context; this.table = context.table; List sequenceFields = new ArrayList<>(); if (table.primaryKeys().size() > 0) { @@ -106,6 +105,7 @@ public FullCacheLookupTable(Context context) { builder.field(f.name(), f.type()); }); projectedType = builder.build(); + context = context.copy(table.rowType().getFieldIndices(projectedType.getFieldNames())); this.userDefinedSeqComparator = UserDefinedSeqComparator.create(projectedType, sequenceFields); this.appendUdsFieldNumber = appendUdsFieldNumber.get(); @@ -114,6 +114,8 @@ public FullCacheLookupTable(Context context) { this.appendUdsFieldNumber = 0; } + this.context = context; + Options options = Options.fromMap(context.table.options()); this.projectedType = projectedType; this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); @@ -354,5 +356,16 @@ public Context( this.joinKey = joinKey; this.requiredCachedBucketIds = requiredCachedBucketIds; } + + public Context copy(int[] newProjection) { + return new Context( + table, + newProjection, + tablePredicate, + projectedPredicate, + tempPath, + joinKey, + requiredCachedBucketIds); + } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 7cca8e25d0b5..14643542e73d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -222,6 +222,46 @@ public void testPkTableWithSequenceField() throws Exception { assertRow(result.get(0), 1, 22, 222); } + @Test + public void testPkTableWithSequenceFieldProjection() throws Exception { + Options options = new Options(); + options.set(CoreOptions.SEQUENCE_FIELD, "f2"); + options.set(CoreOptions.BUCKET, 1); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + table.open(); + + // first write + write(storeTable, GenericRow.of(1, 11, 111)); + table.refresh(); + List result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 11); + + // second write + write(storeTable, GenericRow.of(1, 22, 222)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 22); + + // not update + write(storeTable, GenericRow.of(1, 33, 111)); + table.refresh(); + result = table.get(row(1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 1, 22); + } + @Test public void testPkTablePkFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options());