[flink] Introduce Query Service (apache#2709)
JingsongLi authored Jan 17, 2024
1 parent 996eda6 commit 37683e6
Showing 15 changed files with 698 additions and 26 deletions.
44 changes: 28 additions & 16 deletions docs/content/how-to/lookup-joins.md
@@ -30,7 +30,7 @@ under the License.

Paimon supports lookup joins on tables with primary keys and append tables in Flink. The following example illustrates this feature.

### Prepare
## Prepare

First, let's create a Paimon table and update it in real-time.

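The DDL itself sits in the collapsed part of the hunk below; as a rough sketch consistent with the columns used later on this page (`id`, `country`, `zip`), a primary-key dimension table could look like:

```sql
-- A Paimon primary-key table used as the lookup (dimension) side.
-- Column set is inferred from the queries below; other details are assumptions.
CREATE TABLE customers (
    id INT,
    country STRING,
    zip STRING,
    PRIMARY KEY (id) NOT ENFORCED
);
```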
@@ -69,7 +69,7 @@ CREATE TEMPORARY TABLE Orders (
);
```

### Normal Lookup
## Normal Lookup

You can now use `customers` in a lookup join query.

@@ -82,7 +82,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

### Retry Lookup
## Retry Lookup

If a record of `Orders` (the main table) fails to join because the corresponding data of `customers` (the lookup table) is not yet ready,
you can consider using Flink's [Delayed Retry Strategy For Lookup](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup).
@@ -98,7 +98,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
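The hint itself is in the collapsed part of this hunk; a hedged sketch of the full retry query, using the Flink `LOOKUP` hint options documented at the link above (the delay and attempt values are only examples):

```sql
-- Retry the lookup on a miss, with a fixed delay between attempts (values are illustrative).
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
    o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```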

### Async Retry Lookup
## Async Retry Lookup

The problem with synchronous retry is that one record will block subsequent records, causing the entire job to be blocked.
You can consider using async + allow_unordered to avoid blocking; the records that miss the join will no longer block other records.
@@ -120,22 +120,34 @@ your streaming job may be blocked. You can try to use `audit_log` system table feature of Paimon
(convert CDC stream to append stream).
{{< /hint >}}
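The async example is likewise collapsed in the hunk above; a hedged sketch of async + allow_unordered, again built from Flink's documented `LOOKUP` hint options (values are only examples):

```sql
-- Async lookup with unordered output, so records that miss the join do not block others.
SELECT /*+ LOOKUP('table'='c', 'async'='true', 'output-mode'='allow_unordered',
                  'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay',
                  'fixed-delay'='1s', 'max-attempts'='600') */
    o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```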

### Performance
## Query Service

The lookup join operator will maintain a RocksDB cache locally and pull the latest updates of the table in real time. The lookup join operator only pulls the necessary data, so your filter conditions are very important for performance.
You can run a Flink streaming job to start a query service for the table. When the query service exists, the Flink lookup join
will prioritize obtaining data from it, which can effectively improve query performance.

This feature is only suitable for tables containing at most tens of millions of records to avoid excessive use of local disks.
{{< tabs "query-service" >}}

## RocksDB Cache Options

The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.
{{< tab "Flink SQL" >}}

```sql
-- dynamic table hints example
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

```sql
CALL sys.query_service('database_name.table_name', parallelism);
```

{{< generated/rocksdb_configuration >}}
{{< /tab >}}

{{< tab "Flink Action" >}}

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
query_service \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--parallelism <parallelism>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}
2 changes: 2 additions & 0 deletions docs/content/how-to/writing-tables.md
@@ -128,9 +128,11 @@ INSERT INTO MyTable SELECT ...
{{< /tabs >}}

## Overwriting

Note: If `spark.sql.sources.partitionOverwriteMode` is set to `dynamic` in your Spark configuration, then to ensure that
INSERT OVERWRITE on Paimon tables works correctly,
`spark.sql.extensions` should be set to `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
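For illustration only, one way to pass this setting when launching a Spark SQL session; it can equally go into `spark-defaults.conf` (a sketch, not part of this commit):

```bash
spark-sql \
  --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
```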

### Overwriting the Whole Table

For unpartitioned tables, Paimon supports overwriting the whole table.
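As a minimal sketch (the `MyTable` name follows the earlier example in this file; the source table is a hypothetical placeholder):

```sql
-- Overwrite the whole (unpartitioned) table with the query result.
-- `MyStagingTable` is a hypothetical source, used only for illustration.
INSERT OVERWRITE MyTable SELECT * FROM MyStagingTable;
```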
8 changes: 7 additions & 1 deletion docs/content/maintenance/configurations.md
@@ -66,4 +66,10 @@ Flink connector options for paimon.

Spark connector options for paimon.

{{< generated/spark_connector_configuration >}}

## RocksDB Options

The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.

{{< generated/rocksdb_configuration >}}
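For illustration, these options can also be supplied through a dynamic table hint on the lookup table; a sketch reusing the hint example that this commit removes from `lookup-joins.md` (the cache size value is only illustrative):

```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```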
@@ -147,9 +147,11 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
}

/** TODO remove synchronized and support multiple threads for lookup. */
@Nullable
@Override
public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
throws IOException {
Map<Integer, LookupLevels> buckets = tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.service.QueryService;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;
import java.util.Optional;

/** Factory to create QueryService Action. */
public class QueryServiceActionFactory implements ActionFactory {

public static final String IDENTIFIER = "query_service";

public static final String PARALLELISM = "parallelism";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
Map<String, String> tableConfig = optionalConfigMap(params, TABLE_CONF);
String parallStr = params.get(PARALLELISM);
int parallelism = parallStr == null ? 1 : Integer.parseInt(parallStr);
Action action =
new TableActionBase(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) {
@Override
public void run() throws Exception {
QueryService.build(env, table.copy(tableConfig), parallelism);
execute("Query Service job");
}
};
return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println(
"Action \"query_service\" runs a dedicated job that starts a query service for a table.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" query_service --warehouse <warehouse-path> --database <database-name> --table <table-name> --parallelism <parallelism>"
+ "[--catalog_conf <key>=<value> [--catalog_conf <key>=<value> ...]] "
+ "[--table_conf <key>=<value> [--table_conf <key>=<value> ...]] ");
}
}
@@ -58,10 +58,7 @@ protected String[] execute(
action.withStreamExecutionEnvironment(env);
action.build();

ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
JobClient jobClient = env.executeAsync(name);
return execute(jobClient, conf.get(TABLE_DML_SYNC));
return execute(env, defaultJobName);
}

protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) {
@@ -70,6 +67,13 @@ protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) {
return execute(jobClient, conf.get(TABLE_DML_SYNC));
}

protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
throws Exception {
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC));
}

private String[] execute(JobClient jobClient, boolean dmlSync) {
String jobId = jobClient.getJobID().toString();
if (dmlSync) {
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.service.QueryService;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Query Service procedure. Usage:
*
* <pre><code>
* CALL sys.query_service('tableId', 'parallelism')
* </code></pre>
*/
public class QueryServiceProcedure extends ProcedureBase {

public static final String IDENTIFIER = "query_service";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId, int parallelism)
throws Exception {
Table table = catalog.getTable(Identifier.fromString(tableId));
StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
QueryService.build(env, table, parallelism);
return execute(env, IDENTIFIER);
}
}
@@ -28,6 +28,7 @@
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.service.client.KvQueryClient;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;
@@ -47,11 +48,11 @@ public class RemoteTableQuery implements TableQuery {
private final KvQueryClient client;
private final InternalRowSerializer keySerializer;

private int[] projection;
@Nullable private int[] projection;

public RemoteTableQuery(FileStoreTable table) {
this.table = table;
ServiceManager manager = table.store().newServiceManager();
public RemoteTableQuery(Table table) {
this.table = (FileStoreTable) table;
ServiceManager manager = this.table.store().newServiceManager();
this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
this.keySerializer =
InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys()));
@@ -79,6 +80,10 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
throw new IOException(e.getCause());
}

if (projection == null) {
return row;
}

if (row == null) {
return null;
}
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.service;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.net.InetSocketAddress;
import java.util.TreeMap;

import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;

/** Operator that collects query executor addresses and registers them to {@link ServiceManager}. */
public class QueryAddressRegister extends RichSinkFunction<InternalRow> {

private final Table table;

private transient int numberExecutors;
private transient TreeMap<Integer, InetSocketAddress> executors;

public QueryAddressRegister(Table table) {
this.table = table;
}

@Override
public void open(Configuration parameters) throws Exception {
this.executors = new TreeMap<>();
}

@Override
public void invoke(InternalRow row, SinkFunction.Context context) {
int numberExecutors = row.getInt(0);
if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
throw new IllegalArgumentException(
String.format(
"Number Executors can not be changed! Old %s , New %s .",
this.numberExecutors, numberExecutors));
}
this.numberExecutors = numberExecutors;

int executorId = row.getInt(1);
String hostname = row.getString(2).toString();
int port = row.getInt(3);

executors.put(executorId, new InetSocketAddress(hostname, port));

if (executors.size() == numberExecutors) {
FileStoreTable storeTable = (FileStoreTable) table;
ServiceManager manager = storeTable.store().newServiceManager();
manager.resetService(
PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0]));
}
}
}