diff --git a/docs/content/how-to/lookup-joins.md b/docs/content/how-to/lookup-joins.md index a0c8072fba95..34973bdc167f 100644 --- a/docs/content/how-to/lookup-joins.md +++ b/docs/content/how-to/lookup-joins.md @@ -30,7 +30,7 @@ under the License. Paimon supports lookup joins on tables with primary keys and append tables in Flink. The following example illustrates this feature. -### Prepare +## Prepare First, let's create a Paimon table and update it in real-time. @@ -69,7 +69,7 @@ CREATE TEMPORARY TABLE Orders ( ); ``` -### Normal Lookup +## Normal Lookup You can now use `customers` in a lookup join query. @@ -82,7 +82,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; ``` -### Retry Lookup +## Retry Lookup If the records of `Orders` (main table) join missing because the corresponding data of `customers` (lookup table) is not ready. You can consider using Flink's [Delayed Retry Strategy For Lookup](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup). @@ -98,7 +98,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; ``` -### Async Retry Lookup +## Async Retry Lookup The problem with synchronous retry is that one record will block subsequent records, causing the entire job to be blocked. You can consider using async + allow_unordered to avoid blocking, the records that join missing will no longer block @@ -120,22 +120,34 @@ your streaming job may be blocked. You can try to use `audit_log` system table f (convert CDC stream to append stream). {{< /hint >}} -### Performance +## Query Service -The lookup join operator will maintain a RocksDB cache locally and pull the latest updates of the table in real time. Lookup join operator will only pull the necessary data, so your filter conditions are very important for performance. +You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join +will prioritize obtaining data from it, which will effectively improve query performance. -This feature is only suitable for tables containing at most tens of millions of records to avoid excessive use of local disks. +{{< tabs "query-service" >}} -## RocksDB Cache Options - -The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints. +{{< tab "Flink SQL" >}} ```sql --- dynamic table hints example -SELECT o.order_id, o.total, c.country, c.zip -FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */ -FOR SYSTEM_TIME AS OF o.proc_time AS c -ON o.customer_id = c.id; +CALL sys.query_service('database_name.table_name', parallelism); ``` -{{< generated/rocksdb_configuration >}} +{{< /tab >}} + +{{< tab "Flink Action" >}} + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + query_service \ + --warehouse \ + --database \ + --table \ + [--parallelism ] \ + [--catalog_conf [--catalog_conf ...]] +``` + +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md index c47f9d97a635..d42c1ce833ac 100644 --- a/docs/content/how-to/writing-tables.md +++ b/docs/content/how-to/writing-tables.md @@ -128,9 +128,11 @@ INSERT INTO MyTable SELECT ... {{< /tabs >}} ## Overwriting + Note :If `spark.sql.sources.partitionOverwriteMode` is set to `dynamic` by default in Spark, in order to ensure that the insert overwrite function of the Paimon table can be used normally, `spark.sql.extensions` should be set to `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`. + ### Overwriting the Whole Table For unpartitioned tables, Paimon supports overwriting the whole table. diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md index a6b967a65f10..f48956462b6a 100644 --- a/docs/content/maintenance/configurations.md +++ b/docs/content/maintenance/configurations.md @@ -66,4 +66,10 @@ Flink connector options for paimon. Spark connector options for paimon. -{{< generated/spark_connector_configuration >}} \ No newline at end of file +{{< generated/spark_connector_configuration >}} + +## RocksDB Options + +The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints. + +{{< generated/rocksdb_configuration >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 2ce70332a66f..1ba0c698aff6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -147,9 +147,11 @@ private void newLookupLevels(BinaryRow partition, int bucket, List tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels); } + /** TODO remove synchronized and supports multiple thread to lookup. */ @Nullable @Override - public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException { + public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) + throws IOException { Map buckets = tableView.get(partition); if (buckets == null || buckets.isEmpty()) { return null; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java new file mode 100644 index 000000000000..fecbc91e6270 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.service.QueryService; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create QueryService Action. */ +public class QueryServiceActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "query_service"; + + public static final String PARALLELISM = "parallelism"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + Tuple3 tablePath = getTablePath(params); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + Map tableConfig = optionalConfigMap(params, TABLE_CONF); + String parallStr = params.get(PARALLELISM); + int parallelism = parallStr == null ? 1 : Integer.parseInt(parallStr); + Action action = + new TableActionBase(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) { + @Override + public void run() throws Exception { + QueryService.build(env, table.copy(tableConfig), parallelism); + execute("Query Service job"); + } + }; + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"query-service\" runs a dedicated job starting query service for a table."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " query-service --warehouse --database --table --parallelism " + + "[--catalog_conf = [--catalog_conf = ...]] " + + "[--table_conf = [--table_conf = ...]] "); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index 5f71d53c27b2..fed5c43ac7c8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -58,10 +58,7 @@ protected String[] execute( action.withStreamExecutionEnvironment(env); action.build(); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); - String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName); - JobClient jobClient = env.executeAsync(name); - return execute(jobClient, conf.get(TABLE_DML_SYNC)); + return execute(env, defaultJobName); } protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) { @@ -70,6 +67,13 @@ protected String[] execute(ProcedureContext procedureContext, JobClient jobClien return execute(jobClient, conf.get(TABLE_DML_SYNC)); } + protected String[] execute(StreamExecutionEnvironment env, String defaultJobName) + throws Exception { + ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName); + return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC)); + } + private String[] execute(JobClient jobClient, boolean dmlSync) { String jobId = jobClient.getJobID().toString(); if (dmlSync) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java new file mode 100644 index 000000000000..2f236bb4fde8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.service.QueryService; +import org.apache.paimon.table.Table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Query Service procedure. Usage: + * + *

+ *  CALL sys.query_service('tableId', 'parallelism')
+ * 
+ */ +public class QueryServiceProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "query_service"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, int parallelism) + throws Exception { + Table table = catalog.getTable(Identifier.fromString(tableId)); + StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment(); + QueryService.build(env, table, parallelism); + return execute(env, IDENTIFIER); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java index 3a821b5e387f..2599b4f11678 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -28,6 +28,7 @@ import org.apache.paimon.service.ServiceManager; import org.apache.paimon.service.client.KvQueryClient; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.table.query.TableQuery; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Projection; @@ -47,11 +48,11 @@ public class RemoteTableQuery implements TableQuery { private final KvQueryClient client; private final InternalRowSerializer keySerializer; - private int[] projection; + @Nullable private int[] projection; - public RemoteTableQuery(FileStoreTable table) { - this.table = table; - ServiceManager manager = table.store().newServiceManager(); + public RemoteTableQuery(Table table) { + this.table = (FileStoreTable) table; + ServiceManager manager = this.table.store().newServiceManager(); this.client = new KvQueryClient(new QueryLocationImpl(manager), 1); this.keySerializer = InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys())); @@ -79,6 +80,10 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) thro throw new IOException(e.getCause()); } + if (projection == null) { + return row; + } + if (row == null) { return null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java new file mode 100644 index 000000000000..ed883d9ebd78 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.service; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.service.ServiceManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import java.net.InetSocketAddress; +import java.util.TreeMap; + +import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; + +/** Operator for address server to register addresses to {@link ServiceManager}. */ +public class QueryAddressRegister extends RichSinkFunction { + + private final Table table; + + private transient int numberExecutors; + private transient TreeMap executors; + + public QueryAddressRegister(Table table) { + this.table = table; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.executors = new TreeMap<>(); + } + + @Override + public void invoke(InternalRow row, SinkFunction.Context context) { + int numberExecutors = row.getInt(0); + if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) { + throw new IllegalArgumentException( + String.format( + "Number Executors can not be changed! Old %s , New %s .", + this.numberExecutors, numberExecutors)); + } + this.numberExecutors = numberExecutors; + + int executorId = row.getInt(1); + String hostname = row.getString(2).toString(); + int port = row.getInt(3); + + executors.put(executorId, new InetSocketAddress(hostname, port)); + + if (executors.size() == numberExecutors) { + FileStoreTable storeTable = (FileStoreTable) table; + ServiceManager manager = storeTable.store().newServiceManager(); + manager.resetService( + PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java new file mode 100644 index 000000000000..556c30839688 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.service; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.service.network.NetworkUtils; +import org.apache.paimon.service.network.stats.DisabledServiceRequestStats; +import org.apache.paimon.service.server.KvQueryServer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; + +/** Operator for query executor. */ +public class QueryExecutorOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final long serialVersionUID = 1L; + + private final Table table; + + private transient LocalTableQuery query; + + private transient IOManager ioManager; + + public QueryExecutorOperator(Table table) { + this.table = table; + } + + public static RowType outputType() { + return RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + this.ioManager = + IOManager.create( + getContainingTask() + .getEnvironment() + .getIOManager() + .getSpillingDirectoriesPaths()); + this.query = ((FileStoreTable) table).newLocalTableQuery().withIOManager(ioManager); + KvQueryServer server = + new KvQueryServer( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks(), + NetworkUtils.findHostAddress(), + Collections.singletonList(0).iterator(), + 1, + 1, + query, + new DisabledServiceRequestStats()); + + try { + server.start(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + InetSocketAddress address = server.getServerAddress(); + this.output.collect( + new StreamRecord<>( + GenericRow.of( + getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getIndexOfThisSubtask(), + BinaryString.fromString(address.getHostName()), + address.getPort()))); + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + InternalRow row = streamRecord.getValue(); + BinaryRow partition = deserializeBinaryRow(row.getBinary(1)); + int bucket = row.getInt(2); + DataFileMetaSerializer fileMetaSerializer = new DataFileMetaSerializer(); + List beforeFiles = fileMetaSerializer.deserializeList(row.getBinary(3)); + List dataFiles = fileMetaSerializer.deserializeList(row.getBinary(4)); + query.refreshFiles(partition, bucket, beforeFiles, dataFiles); + } + + @Override + public void close() throws Exception { + super.close(); + if (query != null) { + query.close(); + } + if (ioManager != null) { + ioManager.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java new file mode 100644 index 000000000000..4cd34ab26e40 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.service; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.InternalTypeInfo; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.ChannelComputer; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.system.FileMonitorTable; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; + +/** + * This is the single (non-parallel) monitoring task, it is responsible for: + * + *
    + *
  1. Read incremental files from tables. + *
  2. Assigning them to downstream tasks for further processing. + *
+ */ +public class QueryFileMonitor extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private final Table table; + private final long monitorInterval; + + private transient SourceContext ctx; + private transient StreamTableScan scan; + private transient TableRead read; + + private volatile boolean isRunning = true; + + public QueryFileMonitor(Table table) { + this.table = table; + this.monitorInterval = + Options.fromMap(table.options()) + .get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL) + .toMillis(); + } + + @Override + public void open(Configuration parameters) throws Exception { + FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table); + ReadBuilder readBuilder = monitorTable.newReadBuilder(); + this.scan = readBuilder.newStreamScan(); + this.read = readBuilder.newRead(); + } + + @Override + public void run(SourceContext ctx) throws Exception { + this.ctx = ctx; + while (isRunning) { + boolean isEmpty; + synchronized (ctx.getCheckpointLock()) { + if (!isRunning) { + return; + } + isEmpty = doScan(); + } + + if (isEmpty) { + Thread.sleep(monitorInterval); + } + } + } + + private boolean doScan() throws Exception { + List records = new ArrayList<>(); + read.createReader(scan.plan()).forEachRemaining(records::add); + records.forEach(ctx::collect); + return records.isEmpty(); + } + + @Override + public void cancel() { + // this is to cover the case where cancel() is called before the run() + if (ctx != null) { + synchronized (ctx.getCheckpointLock()) { + isRunning = false; + } + } else { + isRunning = false; + } + } + + public static DataStream build(StreamExecutionEnvironment env, Table table) { + return env.addSource( + new QueryFileMonitor(table), + "FileMonitor-" + table.name(), + InternalTypeInfo.fromRowType(FileMonitorTable.getRowType())); + } + + public static ChannelComputer createChannelComputer() { + return new FileMonitorChannelComputer(); + } + + /** A {@link ChannelComputer} to handle rows from {@link FileMonitorTable}. */ + private static class FileMonitorChannelComputer implements ChannelComputer { + + private int numChannels; + + @Override + public void setup(int numChannels) { + this.numChannels = numChannels; + } + + @Override + public int channel(InternalRow row) { + BinaryRow partition = deserializeBinaryRow(row.getBinary(1)); + int bucket = row.getInt(2); + return ChannelComputer.select(partition, bucket, numChannels); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java new file mode 100644 index 000000000000..5b2c13c8488d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.service; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.InternalTypeInfo; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; + +/** A class to build Query Service topology. */ +public class QueryService { + + public static void build(StreamExecutionEnvironment env, Table table, int parallelism) { + ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + Preconditions.checkArgument( + conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING, + "Query Service only supports streaming mode."); + + FileStoreTable storeTable = (FileStoreTable) table; + if (storeTable.bucketMode() != BucketMode.FIXED + || storeTable.schema().primaryKeys().isEmpty()) { + throw new UnsupportedOperationException( + "The bucket mode of " + + table.name() + + " is not fixed or the table has no primary key."); + } + + DataStream stream = QueryFileMonitor.build(env, table); + stream = partition(stream, QueryFileMonitor.createChannelComputer(), parallelism); + + QueryExecutorOperator executorOperator = new QueryExecutorOperator(table); + stream.transform( + "Executor", + InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()), + executorOperator) + .setParallelism(parallelism) + .addSink(new QueryAddressRegister(table)) + .setParallelism(1) + .setMaxParallelism(1); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index fb483b73c6d4..51c0f1ad1d4e 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -25,6 +25,7 @@ org.apache.paimon.flink.action.DeleteTagActionFactory org.apache.paimon.flink.action.ResetConsumerActionFactory org.apache.paimon.flink.action.MigrateTableActionFactory org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory +org.apache.paimon.flink.action.QueryServiceActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure @@ -38,3 +39,4 @@ org.apache.paimon.flink.procedure.RollbackToProcedure org.apache.paimon.flink.procedure.MigrateTableProcedure org.apache.paimon.flink.procedure.MigrateFileProcedure org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure +org.apache.paimon.flink.procedure.QueryServiceProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index 4903ec9a045d..a487ed4c135c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.query.RemoteTableQuery; import org.apache.paimon.service.ServiceManager; import org.apache.paimon.service.network.stats.DisabledServiceRequestStats; import org.apache.paimon.service.server.KvQueryServer; @@ -34,6 +35,7 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; import java.io.Closeable; @@ -60,6 +62,38 @@ protected int defaultParallelism() { return 1; } + @Test + public void testQueryServiceLookup() throws Exception { + sql( + "CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')"); + CloseableIterator service = streamSqlIter("CALL sys.query_service('default.DIM', 2)"); + RemoteTableQuery query = new RemoteTableQuery(getPaimonTable("DIM")); + + sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)"); + Thread.sleep(2000); + + assertThat(query.lookup(row(), 0, row(1))) + .isNotNull() + .extracting(r -> r.getInt(1)) + .isEqualTo(11); + assertThat(query.lookup(row(), 0, row(2))) + .isNotNull() + .extracting(r -> r.getInt(1)) + .isEqualTo(22); + assertThat(query.lookup(row(), 1, row(3))) + .isNotNull() + .extracting(r -> r.getInt(1)) + .isEqualTo(33); + assertThat(query.lookup(row(), 0, row(4))) + .isNotNull() + .extracting(r -> r.getInt(1)) + .isEqualTo(44); + assertThat(query.lookup(row(), 0, row(5))).isNull(); + + service.close(); + query.close(); + } + @Test public void testLookupRemoteTable() throws Throwable { sql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT)"); diff --git a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java new file mode 100644 index 000000000000..66b46ba74c2d --- /dev/null +++ b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.service.network; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Enumeration; +import java.util.LinkedList; + +/** Utils for network. */ +public class NetworkUtils { + + public static String findHostAddress() throws SocketException, UnknownHostException { + return findLocalAddress().getHostAddress(); + } + + public static InetAddress findLocalAddress() throws UnknownHostException, SocketException { + InetAddress localAddress = InetAddress.getLocalHost(); + if (localAddress.isLoopbackAddress()) { + Enumeration activeNetworkInterfaces = + NetworkInterface.getNetworkInterfaces(); + LinkedList reversedNetworkInterfaces = new LinkedList<>(); + // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order + // on unix-like system. + while (activeNetworkInterfaces.hasMoreElements()) { + reversedNetworkInterfaces.addFirst(activeNetworkInterfaces.nextElement()); + } + + for (NetworkInterface networkInterface : reversedNetworkInterfaces) { + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress address = addresses.nextElement(); + if (address.isLinkLocalAddress() || address.isLoopbackAddress()) { + continue; + } + + // because of Inet6Address.toHostName may add interface at the end if it knows + // about it + if (!(address instanceof Inet4Address)) { + continue; + } + + return InetAddress.getByAddress(address.getAddress()); + } + } + } + return localAddress; + } +}