From a3d1e1fc14f68b3b6c949249ed9770af92b5c711 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 15 Jan 2024 15:21:23 +0800 Subject: [PATCH] [flink] Introduce RemoteTableQuery for Flink Lookup join (#2681) --- .../table/AppendOnlyFileStoreTable.java | 4 +- .../apache/paimon/table/FileStoreTable.java | 4 +- .../table/PrimaryKeyFileStoreTable.java | 7 +- ...bleQueryImpl.java => LocalTableQuery.java} | 12 +- .../apache/paimon/table/query/TableQuery.java | 11 -- .../table/PrimaryKeyFileStoreTableTest.java | 6 +- paimon-flink/paimon-flink-common/pom.xml | 2 + .../flink/lookup/FileStoreLookupFunction.java | 20 ++- .../lookup/PrimaryKeyPartialLookupTable.java | 141 +++++++++++++---- .../paimon/flink/query/RemoteTableQuery.java | 109 +++++++++++++ .../paimon/flink/CatalogITCaseBase.java | 11 ++ .../paimon/flink/RemoteLookupJoinITCase.java | 149 ++++++++++++++++++ .../lookup/FileStoreLookupFunctionTest.java | 80 +++++++--- .../paimon/flink/lookup/LookupTableTest.java | 6 +- paimon-flink/pom.xml | 12 ++ .../paimon/service/KvQueryTableTest.java | 11 +- 16 files changed, 493 insertions(+), 92 deletions(-) rename paimon-core/src/main/java/org/apache/paimon/table/query/{TableQueryImpl.java => LocalTableQuery.java} (95%) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 2075680d2b82..0ebcd44aeb11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -32,7 +32,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.query.TableQuery; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.AbstractDataTableRead; import org.apache.paimon.table.source.AppendOnlySplitGenerator; @@ -151,7 +151,7 @@ record -> { } @Override - public TableQuery newTableQuery() { + public LocalTableQuery newLocalTableQuery() { throw new UnsupportedOperationException(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 5e169b70f9f7..b457f562abc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -24,7 +24,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.BinaryTableStats; -import org.apache.paimon.table.query.TableQuery; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.RowType; @@ -97,7 +97,7 @@ default Optional comment() { @Override TableCommitImpl newCommit(String commitUser); - TableQuery newTableQuery(); + LocalTableQuery newLocalTableQuery(); default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) { return dataFileMeta.valueStats(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index d3d0f0857b6f..ef91edf31244 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -36,8 +36,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.query.TableQuery; -import org.apache.paimon.table.query.TableQueryImpl; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.table.sink.TableWriteImpl; @@ -209,7 +208,7 @@ record -> { } @Override - public TableQuery newTableQuery() { - return new TableQueryImpl(this); + public LocalTableQuery newLocalTableQuery() { + return new LocalTableQuery(this); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/TableQueryImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java similarity index 95% rename from paimon-core/src/main/java/org/apache/paimon/table/query/TableQueryImpl.java rename to paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index e4b90ff3e5e8..2ce70332a66f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/TableQueryImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -51,8 +51,8 @@ import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; -/** Implementation for {@link TableQuery}. */ -public class TableQueryImpl implements TableQuery { +/** Implementation for {@link TableQuery} for caching data and file in local. */ +public class LocalTableQuery implements TableQuery { private final Map> tableView; @@ -68,7 +68,7 @@ public class TableQueryImpl implements TableQuery { private IOManager ioManager; - public TableQueryImpl(FileStoreTable table) { + public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); this.tableView = new HashMap<>(); FileStore tableStore = table.store(); @@ -105,7 +105,6 @@ public TableQueryImpl(FileStoreTable table) { } } - @Override public void refreshFiles( BinaryRow partition, int bucket, @@ -169,13 +168,12 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) thro } @Override - public TableQuery withValueProjection(int[][] projection) { + public LocalTableQuery withValueProjection(int[][] projection) { this.readerFactoryBuilder.withValueProjection(projection); return this; } - @Override - public TableQuery withIOManager(IOManager ioManager) { + public LocalTableQuery withIOManager(IOManager ioManager) { this.ioManager = ioManager; return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/TableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/TableQuery.java index 5c5401f2c80c..791e284e21fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/TableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/TableQuery.java @@ -23,15 +23,12 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.disk.IOManager; -import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.utils.Projection; import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; -import java.util.List; /** A query of Table to perform lookup. */ public interface TableQuery extends Closeable { @@ -42,16 +39,8 @@ default TableQuery withValueProjection(int[] projection) { TableQuery withValueProjection(int[][] projection); - TableQuery withIOManager(IOManager ioManager); - InternalRowSerializer createValueSerializer(); - void refreshFiles( - BinaryRow partition, - int bucket, - List beforeFiles, - List dataFiles); - @Nullable InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 517e6a6b7d3b..c3dce3a8ac14 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -35,7 +35,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.query.TableQuery; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; @@ -1176,7 +1176,7 @@ private void innerTestTableQuery(FileStoreTable table) throws Exception { List commitMessages1 = write.prepareCommit(true, 0); commit.commit(0, commitMessages1); - TableQuery query = table.newTableQuery(); + LocalTableQuery query = table.newLocalTableQuery(); query.withIOManager(ioManager); refreshTableService(query, commitMessages1); @@ -1226,7 +1226,7 @@ private void innerTestTableQuery(FileStoreTable table) throws Exception { commit.close(); } - private void refreshTableService(TableQuery query, List commitMessages) { + private void refreshTableService(LocalTableQuery query, List commitMessages) { for (CommitMessage m : commitMessages) { CommitMessageImpl msg = (CommitMessageImpl) m; query.refreshFiles( diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 3c49ecce510a..3f0ebe026758 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -237,6 +237,8 @@ under the License. org.apache.paimon:paimon-bundle + org.apache.paimon:paimon-service-client + org.apache.paimon:paimon-service-runtime diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index bece61b57732..5685cd1cdc82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.lookup; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode; import org.apache.paimon.flink.FlinkRowData; @@ -61,6 +62,7 @@ import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS; import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; @@ -139,10 +141,17 @@ private void open() throws Exception { if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { - try { + if (isRemoteServiceAvailable(storeTable)) { this.lookupTable = - new PrimaryKeyPartialLookupTable(storeTable, projection, path, joinKeys); - } catch (UnsupportedOperationException ignore) { + PrimaryKeyPartialLookupTable.createRemoteTable( + storeTable, projection, joinKeys); + } else { + try { + this.lookupTable = + PrimaryKeyPartialLookupTable.createLocalTable( + storeTable, projection, path, joinKeys); + } catch (UnsupportedOperationException ignore2) { + } } } @@ -219,6 +228,11 @@ private void checkRefresh() throws Exception { nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis(); } + @VisibleForTesting + LookupTable lookupTable() { + return lookupTable; + } + private void refresh() throws Exception { lookupTable.refresh(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index a5f616a96f50..690cca544ae9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -18,13 +18,15 @@ package org.apache.paimon.flink.lookup; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.flink.query.RemoteTableQuery; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.query.TableQuery; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; @@ -33,6 +35,7 @@ import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Collections; @@ -47,16 +50,13 @@ /** Lookup table for primary key which supports to read the LSM tree directly. */ public class PrimaryKeyPartialLookupTable implements LookupTable { + private final QueryExecutor queryExecutor; private final FixedBucketFromPkExtractor extractor; - - private final TableQuery tableQuery; - - private final StreamTableScan scan; - @Nullable private final ProjectedRow keyRearrange; - public PrimaryKeyPartialLookupTable( - FileStoreTable table, int[] projection, File tempPath, List joinKey) { + private PrimaryKeyPartialLookupTable( + QueryExecutor queryExecutor, FileStoreTable table, List joinKey) { + this.queryExecutor = queryExecutor; if (table.partitionKeys().size() > 0) { throw new UnsupportedOperationException( "The partitioned table are not supported in partial cache mode."); @@ -67,17 +67,8 @@ public PrimaryKeyPartialLookupTable( "Unsupported mode for partial lookup: " + table.bucketMode()); } - this.tableQuery = - table.newTableQuery() - .withValueProjection(Projection.of(projection).toNestedIndexes()) - .withIOManager(new IOManagerImpl(tempPath.toString())); this.extractor = new FixedBucketFromPkExtractor(table.schema()); - Map dynamicOptions = new HashMap<>(); - dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue()); - dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); - this.scan = table.copy(dynamicOptions).newReadBuilder().newStreamScan(); - ProjectedRow keyRearrange = null; if (!table.primaryKeys().equals(joinKey)) { keyRearrange = @@ -90,6 +81,11 @@ public PrimaryKeyPartialLookupTable( this.keyRearrange = keyRearrange; } + @VisibleForTesting + QueryExecutor queryExecutor() { + return queryExecutor; + } + @Override public void open() throws Exception { refresh(); @@ -104,7 +100,7 @@ public List get(InternalRow key) throws IOException { int bucket = extractor.bucket(); BinaryRow partition = extractor.partition(); - InternalRow kv = tableQuery.lookup(partition, bucket, key); + InternalRow kv = queryExecutor.lookup(partition, bucket, key); if (kv == null) { return Collections.emptyList(); } else { @@ -114,28 +110,105 @@ public List get(InternalRow key) throws IOException { @Override public void refresh() { - while (true) { - List splits = scan.plan().splits(); - if (splits.isEmpty()) { - return; - } + queryExecutor.refresh(); + } + + @Override + public void close() throws IOException { + queryExecutor.close(); + } + + public static PrimaryKeyPartialLookupTable createLocalTable( + FileStoreTable table, int[] projection, File tempPath, List joinKey) { + LocalQueryExecutor queryExecutor = new LocalQueryExecutor(table, projection, tempPath); + return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey); + } + + public static PrimaryKeyPartialLookupTable createRemoteTable( + FileStoreTable table, int[] projection, List joinKey) { + RemoveQueryExecutor queryExecutor = new RemoveQueryExecutor(table, projection); + return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey); + } + + interface QueryExecutor extends Closeable { + + InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException; + + void refresh(); + } + + static class LocalQueryExecutor implements QueryExecutor { + + private final LocalTableQuery tableQuery; + private final StreamTableScan scan; + + private LocalQueryExecutor(FileStoreTable table, int[] projection, File tempPath) { + this.tableQuery = + table.newLocalTableQuery() + .withValueProjection(Projection.of(projection).toNestedIndexes()) + .withIOManager(new IOManagerImpl(tempPath.toString())); + + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue()); + dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); + this.scan = table.copy(dynamicOptions).newReadBuilder().newStreamScan(); + } - for (Split split : splits) { - if (!(split instanceof DataSplit)) { - throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + @Override + public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) + throws IOException { + return tableQuery.lookup(partition, bucket, key); + } + + @Override + public void refresh() { + while (true) { + List splits = scan.plan().splits(); + if (splits.isEmpty()) { + return; } - BinaryRow partition = ((DataSplit) split).partition(); - int bucket = ((DataSplit) split).bucket(); - List before = ((DataSplit) split).beforeFiles(); - List after = ((DataSplit) split).dataFiles(); - tableQuery.refreshFiles(partition, bucket, before, after); + for (Split split : splits) { + if (!(split instanceof DataSplit)) { + throw new IllegalArgumentException( + "Unsupported split: " + split.getClass()); + } + BinaryRow partition = ((DataSplit) split).partition(); + int bucket = ((DataSplit) split).bucket(); + List before = ((DataSplit) split).beforeFiles(); + List after = ((DataSplit) split).dataFiles(); + + tableQuery.refreshFiles(partition, bucket, before, after); + } } } + + @Override + public void close() throws IOException { + tableQuery.close(); + } } - @Override - public void close() throws IOException { - tableQuery.close(); + static class RemoveQueryExecutor implements QueryExecutor { + + private final RemoteTableQuery tableQuery; + + private RemoveQueryExecutor(FileStoreTable table, int[] projection) { + this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection); + } + + @Override + public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) + throws IOException { + return tableQuery.lookup(partition, bucket, key); + } + + @Override + public void refresh() {} + + @Override + public void close() throws IOException { + tableQuery.close(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java new file mode 100644 index 000000000000..3a821b5e387f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.paimon.flink.query; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.query.QueryLocationImpl; +import org.apache.paimon.service.ServiceManager; +import org.apache.paimon.service.client.KvQueryClient; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.query.TableQuery; +import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.Projection; +import org.apache.paimon.utils.TypeUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; + +/** Implementation for {@link TableQuery} to lookup data from remote service. */ +public class RemoteTableQuery implements TableQuery { + + private final FileStoreTable table; + private final KvQueryClient client; + private final InternalRowSerializer keySerializer; + + private int[] projection; + + public RemoteTableQuery(FileStoreTable table) { + this.table = table; + ServiceManager manager = table.store().newServiceManager(); + this.client = new KvQueryClient(new QueryLocationImpl(manager), 1); + this.keySerializer = + InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys())); + } + + public static boolean isRemoteServiceAvailable(FileStoreTable table) { + return table.store().newServiceManager().service(PRIMARY_KEY_LOOKUP).isPresent(); + } + + @Nullable + @Override + public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException { + BinaryRow row; + try { + row = + client.getValues( + partition, + bucket, + new BinaryRow[] {keySerializer.toBinaryRow(key)}) + .get()[0]; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + + if (row == null) { + return null; + } + + return ProjectedRow.from(projection).replaceRow(row); + } + + @Override + public RemoteTableQuery withValueProjection(int[] projection) { + return withValueProjection(Projection.of(projection).toNestedIndexes()); + } + + @Override + public RemoteTableQuery withValueProjection(int[][] projection) { + this.projection = Projection.of(projection).toTopLevelIndexes(); + return this; + } + + @Override + public InternalRowSerializer createValueSerializer() { + return InternalSerializers.create(TypeUtils.project(table.rowType(), projection)); + } + + @Override + public void close() throws IOException { + client.shutdown(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index f35ef0960ecc..fd9a72f417dd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -95,6 +95,17 @@ public void before() throws IOException { prepareEnv(); } + protected Table getPaimonTable(String tableName) { + FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + try { + return flinkCatalog + .catalog() + .getTable(new Identifier(tEnv.getCurrentDatabase(), tableName)); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + protected Map catalogOptions() { return Collections.emptyMap(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java new file mode 100644 index 000000000000..4903ec9a045d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.service.ServiceManager; +import org.apache.paimon.service.network.stats.DisabledServiceRequestStats; +import org.apache.paimon.service.server.KvQueryServer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.BlockingIterator; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.io.DataFileTestUtils.row; +import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for remote lookup join. */ +public class RemoteLookupJoinITCase extends CatalogITCaseBase { + + @Override + public List ddl() { + return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())"); + } + + @Override + protected int defaultParallelism() { + return 1; + } + + @Test + public void testLookupRemoteTable() throws Throwable { + sql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT)"); + ServiceProxy proxy = launchQueryServer("DIM"); + + proxy.write(GenericRow.of(1, 11, 111, 1111)); + proxy.write(GenericRow.of(2, 22, 222, 2222)); + + String query = + "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2), (3)"); + List result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null)); + + proxy.write(GenericRow.of(2, 44, 444, 4444)); + proxy.write(GenericRow.of(3, 33, 333, 3333)); + + Thread.sleep(2000); // wait refresh + sql("INSERT INTO T VALUES (1), (2), (3), (4)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 44, 444), + Row.of(3, 33, 333), + Row.of(4, null, null)); + + iterator.close(); + + proxy.close(); + } + + private ServiceProxy launchQueryServer(String tableName) throws Throwable { + FileStoreTable table = (FileStoreTable) getPaimonTable(tableName); + LocalTableQuery query = table.newLocalTableQuery().withIOManager(IOManager.create(path)); + KvQueryServer server = + new KvQueryServer( + 0, + 1, + InetAddress.getLocalHost().getHostName(), + Collections.singletonList(0).iterator(), + 1, + 1, + query, + new DisabledServiceRequestStats()); + server.start(); + + InetSocketAddress[] addresses = new InetSocketAddress[] {server.getServerAddress()}; + ServiceManager serviceManager = table.store().newServiceManager(); + serviceManager.resetService(PRIMARY_KEY_LOOKUP, addresses); + + return new ServiceProxy() { + + @Override + public void write(InternalRow row) throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit(); + write.write(row); + List commitMessages = write.prepareCommit(); + commit.commit(commitMessages); + + CommitMessageImpl message = (CommitMessageImpl) commitMessages.get(0); + query.refreshFiles( + message.partition(), + message.bucket(), + Collections.emptyList(), + message.newFilesIncrement().newFiles()); + } + + @Override + public void close() throws IOException { + server.shutdown(); + query.close(); + } + }; + } + + private interface ServiceProxy extends Closeable { + + void write(InternalRow row) throws Exception; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java index 87590181b0e7..61dc4e230a2f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java @@ -22,11 +22,15 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor; +import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor; +import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoveQueryExecutor; import org.apache.paimon.lookup.RocksDBOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.service.ServiceManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.CommitMessage; @@ -42,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.net.InetSocketAddress; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -51,6 +56,7 @@ import java.util.Random; import java.util.UUID; +import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FileStoreLookupFunction}. */ @@ -58,16 +64,26 @@ public class FileStoreLookupFunctionTest { private static final Random RANDOM = new Random(); + @TempDir private Path tempDir; + private final String commitUser = UUID.randomUUID().toString(); private final TraceableFileIO fileIO = new TraceableFileIO(); - private FileStoreLookupFunction fileStoreLookupFunction; - private FileStoreTable fileStoreTable; - @TempDir private Path tempDir; + + private org.apache.paimon.fs.Path tablePath; + private FileStoreLookupFunction lookupFunction; + private FileStoreTable table; @BeforeEach public void before() throws Exception { - org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(tempDir.toString()); - SchemaManager schemaManager = new SchemaManager(fileIO, path); + tablePath = new org.apache.paimon.fs.Path(tempDir.toString()); + } + + private void createLookupFunction() throws Exception { + createLookupFunction(true, false); + } + + private void createLookupFunction(boolean isPartition, boolean joinEqualPk) throws Exception { + SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); Options conf = new Options(); conf.set(CoreOptions.BUCKET, 2); conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3); @@ -82,31 +98,58 @@ public void before() throws Exception { Schema schema = new Schema( rowType.getFields(), - Collections.singletonList("pt"), + isPartition ? Collections.singletonList("pt") : Collections.emptyList(), Arrays.asList("pt", "k"), conf.toMap(), ""); TableSchema tableSchema = schemaManager.createTable(schema); - fileStoreTable = + table = FileStoreTableFactory.create( fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema); - fileStoreLookupFunction = - new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, new int[] {1}, null); - fileStoreLookupFunction.open(tempDir.toString()); + lookupFunction = + new FileStoreLookupFunction( + table, + new int[] {0, 1}, + joinEqualPk ? new int[] {0, 1} : new int[] {1}, + null); + lookupFunction.open(tempDir.toString()); } @AfterEach public void close() throws Exception { - if (fileStoreLookupFunction != null) { - fileStoreLookupFunction.close(); + if (lookupFunction != null) { + lookupFunction.close(); } } + @Test + public void testDefaultLocalPartial() throws Exception { + createLookupFunction(false, true); + assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class); + QueryExecutor queryExecutor = + ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor(); + assertThat(queryExecutor).isInstanceOf(LocalQueryExecutor.class); + } + + @Test + public void testDefaultRemotePartial() throws Exception { + createLookupFunction(false, true); + ServiceManager serviceManager = new ServiceManager(fileIO, tablePath); + serviceManager.resetService( + PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)}); + lookupFunction.open(tempDir.toString()); + assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class); + QueryExecutor queryExecutor = + ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor(); + assertThat(queryExecutor).isInstanceOf(RemoveQueryExecutor.class); + } + @Test public void testLookupScanLeak() throws Exception { + createLookupFunction(); commit(writeCommit(1)); - fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); + lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); assertThat( TraceableFileIO.openInputStreams( s -> s.toString().contains(tempDir.toString())) @@ -114,7 +157,7 @@ public void testLookupScanLeak() throws Exception { .isEqualTo(0); commit(writeCommit(10)); - fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); + lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); assertThat( TraceableFileIO.openInputStreams( s -> s.toString().contains(tempDir.toString())) @@ -124,25 +167,26 @@ public void testLookupScanLeak() throws Exception { @Test public void testLookupExpiredSnapshot() throws Exception { + createLookupFunction(); commit(writeCommit(1)); - fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); + lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); commit(writeCommit(2)); commit(writeCommit(3)); commit(writeCommit(4)); commit(writeCommit(5)); - fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); + lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); } private void commit(List messages) throws Exception { - TableCommitImpl commit = fileStoreTable.newCommit(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); commit.commit(messages); commit.close(); } private List writeCommit(int number) throws Exception { List messages = new ArrayList<>(); - StreamTableWrite writer = fileStoreTable.newStreamWriteBuilder().newWrite(); + StreamTableWrite writer = table.newStreamWriteBuilder().newWrite(); for (int i = 0; i < number; i++) { writer.write(randomRow()); messages.addAll(writer.prepareCommit(true, i)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 1be2e6ae9986..0e5d0792b954 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -472,7 +472,7 @@ public void testNoPrimaryKeyTableFilter() throws Exception { public void testPartialLookupTable() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = - new PrimaryKeyPartialLookupTable( + PrimaryKeyPartialLookupTable.createLocalTable( dimTable, new int[] {0, 1, 2}, tempDir.toFile(), @@ -502,7 +502,7 @@ public void testPartialLookupTable() throws Exception { public void testPartialLookupTableWithProjection() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = - new PrimaryKeyPartialLookupTable( + PrimaryKeyPartialLookupTable.createLocalTable( dimTable, new int[] {2, 1}, tempDir.toFile(), @@ -527,7 +527,7 @@ public void testPartialLookupTableWithProjection() throws Exception { public void testPartialLookupTableJoinKeyOrder() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = - new PrimaryKeyPartialLookupTable( + PrimaryKeyPartialLookupTable.createLocalTable( dimTable, new int[] {2, 1}, tempDir.toFile(), diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml index f6adaeb21349..325d5e8be239 100644 --- a/paimon-flink/pom.xml +++ b/paimon-flink/pom.xml @@ -51,6 +51,18 @@ under the License. ${project.version} + + org.apache.paimon + paimon-service-client + ${project.version} + + + + org.apache.paimon + paimon-service-runtime + ${project.version} + + org.apache.paimon paimon-core diff --git a/paimon-service/paimon-service-runtime/src/test/java/org/apache/paimon/service/KvQueryTableTest.java b/paimon-service/paimon-service-runtime/src/test/java/org/apache/paimon/service/KvQueryTableTest.java index 078e2e9a3526..6e7c5effae7c 100644 --- a/paimon-service/paimon-service-runtime/src/test/java/org/apache/paimon/service/KvQueryTableTest.java +++ b/paimon-service/paimon-service-runtime/src/test/java/org/apache/paimon/service/KvQueryTableTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.service.client.KvQueryClient; import org.apache.paimon.service.network.stats.DisabledServiceRequestStats; import org.apache.paimon.service.server.KvQueryServer; +import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.query.TableQuery; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.CommitMessage; @@ -48,8 +49,8 @@ /** Test for remote lookup. */ public class KvQueryTableTest extends PrimaryKeyTableTestBase { - private TableQuery query0; - private TableQuery query1; + private LocalTableQuery query0; + private LocalTableQuery query1; private KvQueryServer server0; private KvQueryServer server1; @@ -59,8 +60,8 @@ public class KvQueryTableTest extends PrimaryKeyTableTestBase { @BeforeEach public void beforeEach() { IOManager ioManager = IOManager.create(tempPath.toString()); - this.query0 = table.newTableQuery().withIOManager(ioManager); - this.query1 = table.newTableQuery().withIOManager(ioManager); + this.query0 = table.newLocalTableQuery().withIOManager(ioManager); + this.query1 = table.newLocalTableQuery().withIOManager(ioManager); this.server0 = createServer(0, query0, 7777); this.server1 = createServer(1, query1, 7900); @@ -190,7 +191,7 @@ private void innerTestServerRestart(Runnable restart) throws Throwable { private void write(int partition, int key, int value) throws Exception { int bucket = computeBucket(partition, key, value); - TableQuery query = select(row(partition), bucket, 2) == 0 ? query0 : query1; + LocalTableQuery query = select(row(partition), bucket, 2) == 0 ? query0 : query1; BatchTableWrite write = table.newBatchWriteBuilder().newWrite(); write.write(row(partition, key, value), bucket); CommitMessageImpl message = (CommitMessageImpl) write.prepareCommit().get(0);