From b57dc358d6be1704d6c005d46793c8c44fe569d9 Mon Sep 17 00:00:00 2001 From: Aitozi <1059789585@qq.com> Date: Tue, 9 Jan 2024 17:54:16 +0800 Subject: [PATCH] [flink] Add test to cover bucket extractor in PrimaryKeyPartialLookupTable --- .../flink/lookup/FileStoreLookupFunction.java | 3 +- .../lookup/FixedBucketFromPkExtractor.java | 4 +- .../lookup/PrimaryKeyPartialLookupTable.java | 16 ++++++- .../apache/paimon/flink/LookupJoinITCase.java | 36 ++++++++++++++ .../paimon/flink/lookup/LookupTableTest.java | 48 ++++++++++++++++--- 5 files changed, 95 insertions(+), 12 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 96b7b0b476eb..73805b5feac5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -141,7 +141,8 @@ private void open() throws Exception { && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { try { this.lookupTable = - new PrimaryKeyPartialLookupTable(storeTable, predicate, projection, path); + new PrimaryKeyPartialLookupTable( + storeTable, predicate, projection, path, joinKeys); } catch (UnsupportedOperationException ignore) { } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java index 2c296992ae01..fa5acf427219 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java @@ -90,9 +90,7 @@ private BinaryRow bucketKey() { return trimmedPrimaryKey(); } - if (bucketKey == null) { - bucketKey = bucketKeyProjection.apply(primaryKey); - } + bucketKey = bucketKeyProjection.apply(primaryKey); return bucketKey; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 57a274e114f4..fc950948faaf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -28,6 +28,7 @@ import org.apache.paimon.table.query.TableQuery; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Projection; import javax.annotation.Nullable; @@ -46,8 +47,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { private final TableFileMonitor fileMonitor; + private final ProjectedRow projectedKey; + public PrimaryKeyPartialLookupTable( - FileStoreTable table, @Nullable Predicate predicate, int[] projection, File tempPath) { + FileStoreTable table, + @Nullable Predicate predicate, + int[] projection, + File tempPath, + List joinKey) { if (table.partitionKeys().size() > 0) { throw new UnsupportedOperationException( "The partitioned table are not supported in partial cache mode."); @@ -64,6 +71,12 @@ public PrimaryKeyPartialLookupTable( .withIOManager(new IOManagerImpl(tempPath.toString())); this.extractor = new FixedBucketFromPkExtractor(table.schema()); this.fileMonitor = new TableFileMonitor(table, predicate); + this.projectedKey = + ProjectedRow.from( + table.primaryKeys().stream() + .map(joinKey::indexOf) + .mapToInt(value -> value) + .toArray()); } @Override @@ -73,6 +86,7 @@ public void open() throws Exception { @Override public List get(InternalRow key) throws IOException { + key = projectedKey.replaceRow(key); extractor.setRecord(key); int bucket = extractor.bucket(); BinaryRow partition = extractor.partition(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 76b4fd1c5c1e..30e34dcc353e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -745,4 +745,40 @@ public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception { iterator.close(); } + + @Test + public void testPartialCacheBucketKeyOrder() throws Exception { + sql( + "CREATE TABLE DIM (k2 INT, k1 INT, j INT , i INT, PRIMARY KEY(i, j) NOT ENFORCED) WITH" + + " ('continuous.discovery-interval'='1 ms', 'lookup.cache'='auto', 'bucket' = '2', 'bucket-key' = 'j')"); + + sql("CREATE TABLE T2 (j INT, i INT, `proctime` AS PROCTIME())"); + + sql("INSERT INTO DIM VALUES (1111, 111, 11, 1), (2222, 222, 22, 2)"); + + String query = + "SELECT T2.i, D.j, D.k1, D.k2 FROM T2 LEFT JOIN DIM for system_time as of T2.proctime AS D ON T2.i = D.i and T2.j = D.j"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3)"); + List result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 22, 222, 2222), + Row.of(3, null, null, null)); + + sql("INSERT INTO DIM VALUES (2222, 222, 11, 1), (3333, 333, 33, 3)"); + Thread.sleep(2000); // wait refresh + sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3), (44, 4)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 222, 2222), + Row.of(2, 22, 222, 2222), + Row.of(3, 33, 333, 3333), + Row.of(4, null, null, null)); + + iterator.close(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index e67e5eb760b1..9ce27a32162b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import java.io.IOException; import java.nio.file.Path; @@ -472,7 +473,11 @@ public void testPartialLookupTable() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = new PrimaryKeyPartialLookupTable( - dimTable, null, new int[] {0, 1, 2}, tempDir.toFile()); + dimTable, + null, + new int[] {0, 1, 2}, + tempDir.toFile(), + ImmutableList.of("pk1", "pk2")); List result = table.get(row(1, -1)); assertThat(result).hasSize(0); @@ -499,7 +504,11 @@ public void testPartialLookupTableWithProjection() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = new PrimaryKeyPartialLookupTable( - dimTable, null, new int[] {2, 1}, tempDir.toFile()); + dimTable, + null, + new int[] {2, 1}, + tempDir.toFile(), + ImmutableList.of("pk1", "pk2")); List result = table.get(row(1, -1)); assertThat(result).hasSize(0); @@ -516,6 +525,32 @@ public void testPartialLookupTableWithProjection() throws Exception { assertRow(result.get(0), 22, -2); } + @Test + public void testPartialLookupTableJoinKeyOrder() throws Exception { + FileStoreTable dimTable = createDimTable(); + PrimaryKeyPartialLookupTable table = + new PrimaryKeyPartialLookupTable( + dimTable, + null, + new int[] {2, 1}, + tempDir.toFile(), + ImmutableList.of("pk2", "pk1")); + List result = table.get(row(-1, 1)); + assertThat(result).hasSize(0); + + write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22)); + result = table.get(row(-1, 1)); + assertThat(result).hasSize(0); + + table.refresh(); + result = table.get(row(-1, 1)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 11, -1); + result = table.get(row(-2, 2)); + assertThat(result).hasSize(1); + assertRow(result.get(0), 22, -2); + } + private FileStoreTable createDimTable() throws Exception { FileIO fileIO = LocalFileIO.create(); org.apache.paimon.fs.Path tablePath = @@ -523,13 +558,12 @@ private FileStoreTable createDimTable() throws Exception { String.format("%s/%s.db/%s", warehouse, database, "T")); Schema schema = Schema.newBuilder() - .column("pk", DataTypes.INT()) - .column("col1", DataTypes.INT()) + .column("pk1", DataTypes.INT()) + .column("pk2", DataTypes.INT()) .column("col2", DataTypes.INT()) - .primaryKey("pk", "col1") - .option(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup") + .primaryKey("pk1", "pk2") .option(CoreOptions.BUCKET.key(), "2") - .option(CoreOptions.BUCKET_KEY.key(), "col1") + .option(CoreOptions.BUCKET_KEY.key(), "pk2") .build(); TableSchema tableSchema = SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);