Skip to content

Commit

Permalink
[flink] Introduce RemoteTableQuery for Flink Lookup join (apache#2681)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 15, 2024
1 parent 0c0ab1b commit a3d1e1f
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
Expand Down Expand Up @@ -151,7 +151,7 @@ record -> {
}

@Override
public TableQuery newTableQuery() {
public LocalTableQuery newLocalTableQuery() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -97,7 +97,7 @@ default Optional<String> comment() {
@Override
TableCommitImpl newCommit(String commitUser);

TableQuery newTableQuery();
LocalTableQuery newLocalTableQuery();

default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
return dataFileMeta.valueStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.query.TableQueryImpl;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
Expand Down Expand Up @@ -209,7 +208,7 @@ record -> {
}

@Override
public TableQuery newTableQuery() {
return new TableQueryImpl(this);
public LocalTableQuery newLocalTableQuery() {
return new LocalTableQuery(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;

/** Implementation for {@link TableQuery}. */
public class TableQueryImpl implements TableQuery {
/** Implementation for {@link TableQuery} for caching data and file in local. */
public class LocalTableQuery implements TableQuery {

private final Map<BinaryRow, Map<Integer, LookupLevels>> tableView;

Expand All @@ -68,7 +68,7 @@ public class TableQueryImpl implements TableQuery {

private IOManager ioManager;

public TableQueryImpl(FileStoreTable table) {
public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
FileStore<?> tableStore = table.store();
Expand Down Expand Up @@ -105,7 +105,6 @@ public TableQueryImpl(FileStoreTable table) {
}
}

@Override
public void refreshFiles(
BinaryRow partition,
int bucket,
Expand Down Expand Up @@ -169,13 +168,12 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) thro
}

@Override
public TableQuery withValueProjection(int[][] projection) {
public LocalTableQuery withValueProjection(int[][] projection) {
this.readerFactoryBuilder.withValueProjection(projection);
return this;
}

@Override
public TableQuery withIOManager(IOManager ioManager) {
public LocalTableQuery withIOManager(IOManager ioManager) {
this.ioManager = ioManager;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.utils.Projection;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/** A query of Table to perform lookup. */
public interface TableQuery extends Closeable {
Expand All @@ -42,16 +39,8 @@ default TableQuery withValueProjection(int[] projection) {

TableQuery withValueProjection(int[][] projection);

TableQuery withIOManager(IOManager ioManager);

InternalRowSerializer createValueSerializer();

void refreshFiles(
BinaryRow partition,
int bucket,
List<DataFileMeta> beforeFiles,
List<DataFileMeta> dataFiles);

@Nullable
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
Expand Down Expand Up @@ -1176,7 +1176,7 @@ private void innerTestTableQuery(FileStoreTable table) throws Exception {
List<CommitMessage> commitMessages1 = write.prepareCommit(true, 0);
commit.commit(0, commitMessages1);

TableQuery query = table.newTableQuery();
LocalTableQuery query = table.newLocalTableQuery();
query.withIOManager(ioManager);

refreshTableService(query, commitMessages1);
Expand Down Expand Up @@ -1226,7 +1226,7 @@ private void innerTestTableQuery(FileStoreTable table) throws Exception {
commit.close();
}

private void refreshTableService(TableQuery query, List<CommitMessage> commitMessages) {
private void refreshTableService(LocalTableQuery query, List<CommitMessage> commitMessages) {
for (CommitMessage m : commitMessages) {
CommitMessageImpl msg = (CommitMessageImpl) m;
query.refreshFiles(
Expand Down
2 changes: 2 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ under the License.
<artifactSet>
<includes combine.children="append">
<include>org.apache.paimon:paimon-bundle</include>
<include>org.apache.paimon:paimon-service-client</include>
<include>org.apache.paimon:paimon-service-runtime</include>
</includes>
</artifactSet>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.lookup;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
import org.apache.paimon.flink.FlinkRowData;
Expand Down Expand Up @@ -61,6 +62,7 @@

import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
Expand Down Expand Up @@ -139,10 +141,17 @@ private void open() throws Exception {

if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
try {
if (isRemoteServiceAvailable(storeTable)) {
this.lookupTable =
new PrimaryKeyPartialLookupTable(storeTable, projection, path, joinKeys);
} catch (UnsupportedOperationException ignore) {
PrimaryKeyPartialLookupTable.createRemoteTable(
storeTable, projection, joinKeys);
} else {
try {
this.lookupTable =
PrimaryKeyPartialLookupTable.createLocalTable(
storeTable, projection, path, joinKeys);
} catch (UnsupportedOperationException ignore2) {
}
}
}

Expand Down Expand Up @@ -219,6 +228,11 @@ private void checkRefresh() throws Exception {
nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
}

@VisibleForTesting
LookupTable lookupTable() {
return lookupTable;
}

private void refresh() throws Exception {
lookupTable.refresh();
}
Expand Down
Loading

0 comments on commit a3d1e1f

Please sign in to comment.