[flink] support lookup on lsm tree directly
Aitozi authored and JingsongLi committed Jan 10, 2024
1 parent ec041d3 commit 82c4198
Showing 15 changed files with 718 additions and 89 deletions.
--- changed file: … (generated documentation table) ---
@@ -50,6 +50,12 @@
            <td>Integer</td>
            <td>The parallelism for bootstrap in a single task for lookup join.</td>
        </tr>
        <tr>
            <td><h5>lookup.cache</h5></td>
            <td style="word-wrap: break-word;">FULL</td>
            <td><p>Enum</p></td>
            <td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"PARTIAL"</li><li>"FULL"</li></ul></td>
        </tr>
        <tr>
            <td><h5>scan.infer-parallelism</h5></td>
            <td style="word-wrap: break-word;">true</td>
--- changed file: … (test write helper) ---
@@ -27,6 +27,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.reader.RecordReader;
@@ -110,9 +111,14 @@ protected final void write(Table table, Pair<InternalRow, Integer>... rows) thro
    }

    protected void write(Table table, InternalRow... rows) throws Exception {
        write(table, null, rows);
    }

    protected void write(Table table, IOManager ioManager, InternalRow... rows) throws Exception {
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = writeBuilder.newWrite();
                BatchTableCommit commit = writeBuilder.newCommit()) {
            write.withIOManager(ioManager);
            for (InternalRow row : rows) {
                write.write(row);
            }
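The new overload threads an IOManager through BatchTableWrite so the write can spill to disk. A minimal sketch of how a test might call it, assuming a two-field (INT, BIGINT) row type; `tempDir` and the row values are placeholders, not part of this commit:

    // Hedged sketch: exercise the new overload with a spill-capable IOManager.
    IOManager ioManager = IOManager.create(tempDir);
    write(table, ioManager, GenericRow.of(1, 100L), GenericRow.of(2, 200L));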
--- changed file: FlinkConnectorOptions.java ---
@@ -287,6 +287,12 @@ public class FlinkConnectorOptions {
                    .defaultValue(16)
                    .withDescription("The thread number for lookup async.");

    public static final ConfigOption<LookupCacheMode> LOOKUP_CACHE_MODE =
            ConfigOptions.key("lookup.cache")
                    .enumType(LookupCacheMode.class)
                    .defaultValue(LookupCacheMode.FULL)
                    .withDescription("The cache mode of lookup join.");

    public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
            ConfigOptions.key("sink.savepoint.auto-tag")
                    .booleanType()
@@ -323,6 +329,15 @@ public static List<ConfigOption<?>> getOptions() {
        return list;
    }

    /** The mode of lookup cache. */
    public enum LookupCacheMode {
        /** Use partial caching mode. */
        PARTIAL,

        /** Use full caching mode. */
        FULL
    }

    /** Watermark emit strategy for scan. */
    public enum WatermarkEmitStrategy implements DescribedEnum {
        ON_PERIODIC(
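Together, the new option and enum let a lookup join choose between the existing full RocksDB cache and the new partial mode. A rough sketch of how the option resolves, assuming Options#set accepts a typed ConfigOption as in Flink's Configuration:

    Options options = new Options();
    // FULL is the default, preserving the old behavior
    LookupCacheMode mode = options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE);
    // opt in to the new mode that looks up the LSM tree directly
    options.set(FlinkConnectorOptions.LOOKUP_CACHE_MODE, LookupCacheMode.PARTIAL);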
--- changed file: FileStoreLookupFunction.java ---
@@ -23,9 +23,11 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.lookup.LookupTable.TableBulkLoader;
+import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
@@ -99,6 +101,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {

    private final boolean sequenceFieldEnabled;

    private transient TableFileMonitor fileMonitor;

    public FileStoreLookupFunction(
            Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
        TableScanUtils.streamingReadingValidate(table);
@@ -137,6 +141,9 @@ public void open(FunctionContext context) throws Exception {
    // we tag this method friendly for testing
    void open(String tmpDirectory) throws Exception {
        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
        if (!path.mkdirs()) {
            throw new RuntimeException("Failed to create dir: " + path);
        }
        open();
    }

@@ -145,32 +152,47 @@ private void open() throws Exception {
        this.refreshInterval =
                options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
                        .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
-        this.stateFactory = new RocksDBStateFactory(path.toString(), options, null);
+        LookupCacheMode cacheMode = options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE);

        List<String> fieldNames = table.rowType().getFieldNames();
        int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
        RowType rowType = TypeUtils.project(table.rowType(), projection);

        PredicateFilter recordFilter = createRecordFilter(projection);
-        this.lookupTable =
-                LookupTable.create(
-                        stateFactory,
-                        sequenceFieldEnabled
-                                ? rowType.appendDataField(SEQUENCE_NUMBER, DataTypes.BIGINT())
-                                : rowType,
-                        table.primaryKeys(),
-                        joinKeys,
-                        recordFilter,
-                        options.get(LOOKUP_CACHE_ROWS));
+        switch (cacheMode) {
+            case FULL:
+                this.stateFactory = new RocksDBStateFactory(path.toString(), options, null);
+                this.lookupTable =
+                        LookupTable.create(
+                                stateFactory,
+                                sequenceFieldEnabled
+                                        ? rowType.appendDataField(
+                                                SEQUENCE_NUMBER, DataTypes.BIGINT())
+                                        : rowType,
+                                table.primaryKeys(),
+                                joinKeys,
+                                recordFilter,
+                                options.get(LOOKUP_CACHE_ROWS));
+                this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
+                bulkLoad(options);
+                break;
+            case PARTIAL:
+                this.lookupTable = LookupTable.create(table, projection, joinKeys, path);
+                this.fileMonitor = new TableFileMonitor(table, this.predicate);
+                ((PartialCacheLookupTable) lookupTable).refresh(fileMonitor.readChanges());
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported lookup cache mode: " + cacheMode);
+        }
        this.nextLoadTime = -1;
-        this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
-        bulkLoad(options);
    }

    private void bulkLoad(Options options) throws Exception {
        BinaryExternalSortBuffer bulkLoadSorter =
                RocksDBState.createBulkLoadSorter(
                        IOManager.create(path.toString()), new CoreOptions(options));
+        FullCacheLookupTable lookupTable = (FullCacheLookupTable) this.lookupTable;
        try (RecordReaderIterator<InternalRow> batch =
                new RecordReaderIterator<>(streamingReader.nextBatch(true, sequenceFieldEnabled))) {
            while (batch.hasNext()) {
@@ -263,15 +285,19 @@ private void checkRefresh() throws Exception {
    }

    private void refresh() throws Exception {
-        while (true) {
-            try (RecordReaderIterator<InternalRow> batch =
-                    new RecordReaderIterator<>(
-                            streamingReader.nextBatch(false, sequenceFieldEnabled))) {
-                if (!batch.hasNext()) {
-                    return;
-                }
-                this.lookupTable.refresh(batch, sequenceFieldEnabled);
-            }
-        }
+        if (lookupTable instanceof FullCacheLookupTable) {
+            while (true) {
+                try (RecordReaderIterator<InternalRow> batch =
+                        new RecordReaderIterator<>(
+                                streamingReader.nextBatch(false, sequenceFieldEnabled))) {
+                    if (!batch.hasNext()) {
+                        return;
+                    }
+                    ((FullCacheLookupTable) this.lookupTable).refresh(batch, sequenceFieldEnabled);
+                }
+            }
+        } else {
+            ((PartialCacheLookupTable) lookupTable).refresh(fileMonitor.readChanges());
+        }
    }

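For orientation, a hedged sketch of how the pieces above fit together, using only the constructor and the package-private open(String) test hook visible in this diff; `table` and the index arrays are placeholders:

    // Lookup join keyed on column 0, projecting columns 0 and 1, no pushed-down predicate.
    FileStoreLookupFunction lookup =
            new FileStoreLookupFunction(table, new int[] {0, 1}, new int[] {0}, null);
    // open(String) creates the local lookup directory, then branches on lookup.cache:
    // FULL bulk-loads RocksDB state, PARTIAL refreshes from TableFileMonitor changes.
    lookup.open("/tmp/paimon-lookup");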
--- new file: FixedBucketKeyExtractor.java ---
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Extracts the bucket for a record from its primary key. */
public class FixedBucketKeyExtractor implements KeyAndBucketExtractor<InternalRow> {

    private transient InternalRow primaryKey;

    private BinaryRow bucketKey;

    private final boolean sameBucketKeyAndTrimmedPrimaryKey;

    private final int numBuckets;

    private final Projection bucketKeyProjection;

    private final Projection trimmedPrimaryKeyProjection;

    private final Projection partitionProjection;

    private final Projection logPrimaryKeyProjection;

    public FixedBucketKeyExtractor(TableSchema schema) {
        this.numBuckets = new CoreOptions(schema.options()).bucket();
        checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
        this.sameBucketKeyAndTrimmedPrimaryKey =
                schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
        this.bucketKeyProjection =
                CodeGenUtils.newProjection(
                        schema.logicalPrimaryKeysType(),
                        schema.bucketKeys().stream()
                                .mapToInt(schema.primaryKeys()::indexOf)
                                .toArray());
        this.trimmedPrimaryKeyProjection =
                CodeGenUtils.newProjection(
                        schema.logicalPrimaryKeysType(),
                        schema.trimmedPrimaryKeys().stream()
                                .mapToInt(schema.primaryKeys()::indexOf)
                                .toArray());
        this.partitionProjection =
                CodeGenUtils.newProjection(
                        schema.logicalPrimaryKeysType(),
                        schema.partitionKeys().stream()
                                .mapToInt(schema.primaryKeys()::indexOf)
                                .toArray());
        this.logPrimaryKeyProjection =
                CodeGenUtils.newProjection(
                        schema.logicalRowType(), schema.projection(schema.primaryKeys()));
    }

    @Override
    public void setRecord(InternalRow record) {
        this.primaryKey = record;
        // reset the cached bucket key so it is recomputed for each new record
        this.bucketKey = null;
    }

    @Override
    public BinaryRow partition() {
        return partitionProjection.apply(primaryKey);
    }

    private BinaryRow bucketKey() {
        if (sameBucketKeyAndTrimmedPrimaryKey) {
            return trimmedPrimaryKey();
        }

        if (bucketKey == null) {
            bucketKey = bucketKeyProjection.apply(primaryKey);
        }
        return bucketKey;
    }

    @Override
    public int bucket() {
        BinaryRow bucketKey = bucketKey();
        return KeyAndBucketExtractor.bucket(
                KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
    }

    @Override
    public BinaryRow trimmedPrimaryKey() {
        return trimmedPrimaryKeyProjection.apply(primaryKey);
    }

    @Override
    public BinaryRow logPrimaryKey() {
        return logPrimaryKeyProjection.apply(primaryKey);
    }
}
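A usage sketch for the extractor, assuming `schema` is the TableSchema of a fixed-bucket primary-key table and `pkRow` carries the primary-key fields of one record (both are placeholders):

    FixedBucketKeyExtractor extractor = new FixedBucketKeyExtractor(schema);
    extractor.setRecord(pkRow);
    BinaryRow partition = extractor.partition(); // partition fields projected out of the key
    int bucket = extractor.bucket();             // bucket id derived from the bucket-key hash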
--- new file: FullCacheLookupTable.java ---
@@ -0,0 +1,50 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.lookup.BulkLoader;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Predicate;

/** A lookup table that caches the complete table data. */
public interface FullCacheLookupTable extends LookupTable {

    void refresh(Iterator<InternalRow> input, boolean orderByLastField) throws IOException;

    Predicate<InternalRow> recordFilter();

    byte[] toKeyBytes(InternalRow row) throws IOException;

    byte[] toValueBytes(InternalRow row) throws IOException;

    TableBulkLoader createBulkLoader();

    /** Bulk loader for the table. */
    interface TableBulkLoader {

        void write(byte[] key, byte[] value) throws BulkLoader.WriteException, IOException;

        void finish() throws IOException;
    }
}
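To make the contract concrete, a hedged sketch of a bulk load against this interface, assuming an enclosing method that declares throws Exception; `lookupTable` (a FullCacheLookupTable) and `sortedRows` (an iterator of key-sorted records) are assumed to exist:

    FullCacheLookupTable.TableBulkLoader bulkLoader = lookupTable.createBulkLoader();
    while (sortedRows.hasNext()) {
        InternalRow row = sortedRows.next();
        // skip records filtered out by the pushed-down predicate
        if (lookupTable.recordFilter().test(row)) {
            bulkLoader.write(lookupTable.toKeyBytes(row), lookupTable.toValueBytes(row));
        }
    }
    bulkLoader.finish();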
