[flink] Refactor FileStoreLookupFunction to abstract more into LookupTable
JingsongLi committed Jan 10, 2024
1 parent 82c4198 commit b13d99f
Showing 15 changed files with 432 additions and 504 deletions.
@@ -52,9 +52,9 @@
 </tr>
 <tr>
 <td><h5>lookup.cache</h5></td>
-<td style="word-wrap: break-word;">FULL</td>
+<td style="word-wrap: break-word;">AUTO</td>
 <td><p>Enum</p></td>
-<td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"PARTIAL"</li><li>"FULL"</li></ul></td>
+<td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
 </tr>
 <tr>
 <td><h5>scan.infer-parallelism</h5></td>
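The option above is what a Flink SQL lookup join reads from the table definition. As a minimal sketch, here is how a query could pin the old FULL behavior now that the default is AUTO; the table names, columns, and the use of Flink's dynamic table options hint are illustrative assumptions, not part of this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupCacheHintExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'lookup.cache' now defaults to AUTO; a per-query hint can still
        // force full caching for one join.
        tEnv.executeSql(
                "SELECT o.order_id, c.name "
                        + "FROM orders AS o "
                        + "JOIN customers /*+ OPTIONS('lookup.cache'='FULL') */ "
                        + "FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.customer_id");
    }
}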
@@ -20,14 +20,15 @@

 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;

 import javax.annotation.Nullable;

 import java.util.List;

-/** A {@link java.util.function.Predicate} to filter {@link InternalRow}. */
-public class PredicateFilter implements java.util.function.Predicate<InternalRow> {
+/** A {@link Filter} to filter {@link InternalRow}. */
+public class PredicateFilter implements Filter<InternalRow> {

     private final RowDataToObjectArrayConverter arrayConverter;
     @Nullable private final Predicate predicate;
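Call sites are unaffected by the changed supertype, assuming Paimon's Filter keeps the same single test method shape as java.util.function.Predicate (the helper below is an illustration, not code from this commit):

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.utils.Filter;

class FilterCallSite {
    // A PredicateFilter can now be handed to any API that wants Paimon's own
    // Filter<InternalRow> instead of java.util.function.Predicate<InternalRow>.
    static boolean keep(PredicateFilter recordFilter, InternalRow row) {
        Filter<InternalRow> filter = recordFilter;
        return filter.test(row);
    }
}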
@@ -290,7 +290,7 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<LookupCacheMode> LOOKUP_CACHE_MODE =
             ConfigOptions.key("lookup.cache")
                     .enumType(LookupCacheMode.class)
-                    .defaultValue(LookupCacheMode.FULL)
+                    .defaultValue(LookupCacheMode.AUTO)
                     .withDescription("The cache mode of lookup join.");

     public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
@@ -331,8 +331,8 @@ public static List<ConfigOption<?>> getOptions() {

     /** The mode of lookup cache. */
     public enum LookupCacheMode {
-        /** Use partial caching mode. */
-        PARTIAL,
+        /** Auto mode, try to use partial mode. */
+        AUTO,

         /** Use full caching mode. */
         FULL
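Both calls in the sketch below appear verbatim elsewhere in this commit; resolving the option now yields AUTO unless the user sets lookup.cache explicitly (wrapping it in a helper method is illustrative):

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

class ResolveCacheMode {
    static LookupCacheMode resolve(Table table) {
        Options options = Options.fromMap(table.options());
        return options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE); // AUTO by default
    }
}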
@@ -18,32 +18,18 @@

 package org.apache.paimon.flink.lookup;

-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
 import org.apache.paimon.flink.utils.TableScanUtils;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.MutableObjectIterator;
-import org.apache.paimon.utils.PartialRow;
 import org.apache.paimon.utils.TypeUtils;

 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -66,17 +52,18 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

 import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;

 /** A lookup {@link TableFunction} for file store. */
 public class FileStoreLookupFunction implements Serializable, Closeable {
@@ -92,16 +79,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {

     private transient Duration refreshInterval;
     private transient File path;
-    private transient RocksDBStateFactory stateFactory;
     private transient LookupTable lookupTable;

     // timestamp when cache expires
     private transient long nextLoadTime;
-    private transient TableStreamingReader streamingReader;
-
-    private final boolean sequenceFieldEnabled;
-
-    private transient TableFileMonitor fileMonitor;

     public FileStoreLookupFunction(
             Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
@@ -128,9 +109,6 @@ public FileStoreLookupFunction(
         }

         this.predicate = predicate;
-        this.sequenceFieldEnabled =
-                table.primaryKeys().size() > 0
-                        && new CoreOptions(table.options()).sequenceField().isPresent();
     }

     public void open(FunctionContext context) throws Exception {
@@ -148,79 +126,39 @@ void open(String tmpDirectory) throws Exception {
     }

     private void open() throws Exception {
+        this.nextLoadTime = -1;
+
         Options options = Options.fromMap(table.options());
         this.refreshInterval =
                 options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
                         .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
-        LookupCacheMode cacheMode = options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE);

         List<String> fieldNames = table.rowType().getFieldNames();
         int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
-        RowType rowType = TypeUtils.project(table.rowType(), projection);
-
-        PredicateFilter recordFilter = createRecordFilter(projection);
-
-        switch (cacheMode) {
-            case FULL:
-                this.stateFactory = new RocksDBStateFactory(path.toString(), options, null);
-                this.lookupTable =
-                        LookupTable.create(
-                                stateFactory,
-                                sequenceFieldEnabled
-                                        ? rowType.appendDataField(
-                                                SEQUENCE_NUMBER, DataTypes.BIGINT())
-                                        : rowType,
-                                table.primaryKeys(),
-                                joinKeys,
-                                recordFilter,
-                                options.get(LOOKUP_CACHE_ROWS));
-                this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
-                bulkLoad(options);
-                break;
-            case PARTIAL:
-                this.lookupTable = LookupTable.create(table, projection, joinKeys, path);
-                this.fileMonitor = new TableFileMonitor(table, this.predicate);
-                ((PartialCacheLookupTable) lookupTable).refresh(fileMonitor.readChanges());
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported lookup cache mode: " + cacheMode);
-        }
-        this.nextLoadTime = -1;
-    }
-
-    private void bulkLoad(Options options) throws Exception {
-        BinaryExternalSortBuffer bulkLoadSorter =
-                RocksDBState.createBulkLoadSorter(
-                        IOManager.create(path.toString()), new CoreOptions(options));
-        FullCacheLookupTable lookupTable = (FullCacheLookupTable) this.lookupTable;
-        try (RecordReaderIterator<InternalRow> batch =
-                new RecordReaderIterator<>(streamingReader.nextBatch(true, sequenceFieldEnabled))) {
-            while (batch.hasNext()) {
-                InternalRow row = batch.next();
-                if (lookupTable.recordFilter().test(row)) {
-                    bulkLoadSorter.write(
-                            GenericRow.of(
-                                    lookupTable.toKeyBytes(row), lookupTable.toValueBytes(row)));
-                }
-            }
-        }
-
-        MutableObjectIterator<BinaryRow> keyIterator = bulkLoadSorter.sortedIterator();
-        BinaryRow row = new BinaryRow(2);
-        TableBulkLoader bulkLoader = lookupTable.createBulkLoader();
-        try {
-            while ((row = keyIterator.next(row)) != null) {
-                bulkLoader.write(row.getBinary(0), row.getBinary(1));
-            }
-        } catch (BulkLoader.WriteException e) {
-            throw new RuntimeException(
-                    "Exception in bulkLoad, the most suspicious reason is that "
-                            + "your data contains duplicates, please check your lookup table. ",
-                    e.getCause());
-        }
-
-        bulkLoader.finish();
-        bulkLoadSorter.clear();
+        FileStoreTable storeTable = (FileStoreTable) table;
+
+        if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
+                && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
+            try {
+                this.lookupTable =
+                        new PrimaryKeyPartialLookupTable(storeTable, predicate, projection, path);
+            } catch (UnsupportedOperationException ignore) {
+            }
+        }
+
+        if (lookupTable == null) {
+            FullCacheLookupTable.Context context =
+                    new FullCacheLookupTable.Context(
+                            storeTable,
+                            projection,
+                            predicate,
+                            path,
+                            createRecordFilter(projection),
+                            joinKeys);
+            this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
+        }
+
+        lookupTable.open();
     }

     private PredicateFilter createRecordFilter(int[] projection) {
@@ -245,9 +183,6 @@ public Collection<RowData> lookup(RowData keyRow) {
         List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(keyRow));
         List<RowData> rows = new ArrayList<>(results.size());
         for (InternalRow matchedRow : results) {
-            if (sequenceFieldEnabled) {
-                matchedRow = new PartialRow(matchedRow.getFieldCount() - 1, matchedRow);
-            }
             rows.add(new FlinkRowData(matchedRow));
         }
         return rows;
@@ -285,27 +220,14 @@ private void checkRefresh() throws Exception {
     }

     private void refresh() throws Exception {
-        if (lookupTable instanceof FullCacheLookupTable) {
-            while (true) {
-                try (RecordReaderIterator<InternalRow> batch =
-                        new RecordReaderIterator<>(
-                                streamingReader.nextBatch(false, sequenceFieldEnabled))) {
-                    if (!batch.hasNext()) {
-                        return;
-                    }
-                    ((FullCacheLookupTable) this.lookupTable).refresh(batch, sequenceFieldEnabled);
-                }
-            }
-        } else {
-            ((PartialCacheLookupTable) lookupTable).refresh(fileMonitor.readChanges());
-        }
+        lookupTable.refresh();
     }

     @Override
     public void close() throws IOException {
-        if (stateFactory != null) {
-            stateFactory.close();
-            stateFactory = null;
+        if (lookupTable != null) {
+            lookupTable.close();
+            lookupTable = null;
         }

         if (path != null) {
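Taken together, the call sites above are the point of the refactoring: FileStoreLookupFunction no longer owns a RocksDB state factory, streaming reader, or bulk load, and instead drives one LookupTable through a narrow lifecycle. A hedged sketch of that surface and its use, inferred only from the calls visible in this diff (the real org.apache.paimon.flink.lookup.LookupTable may declare more methods or different signatures, and the FunctionContext wiring is an assumption):

import java.io.Closeable;
import java.util.Collection;
import java.util.List;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

class LookupLifecycleSketch {

    // Inferred from usage in this diff: open() in open(), get(...) in
    // lookup(), refresh() in refresh(), close() in close().
    interface LookupTableSurface extends Closeable {
        void open() throws Exception;
        List<InternalRow> get(InternalRow key) throws Exception;
        void refresh() throws Exception;
    }

    // How the refactored function drives that surface, using only methods
    // visible in this diff.
    static void drive(FileStoreLookupFunction function, FunctionContext context, RowData joinKey)
            throws Exception {
        function.open(context); // picks PrimaryKeyPartialLookupTable or a full cache table
        Collection<RowData> rows = function.lookup(joinKey);
        function.close(); // closes the LookupTable rather than a RocksDB factory
    }
}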
@@ -29,7 +29,7 @@
 import static org.apache.paimon.utils.Preconditions.checkArgument;

 /** Extractor to extract bucket from the primary key. */
-public class FixedBucketKeyExtractor implements KeyAndBucketExtractor<InternalRow> {
+public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<InternalRow> {

     private transient InternalRow primaryKey;

@@ -47,7 +47,7 @@ public class FixedBucketKeyExtractor implements KeyAndBucketExtractor<InternalRow> {

     private final Projection logPrimaryKeyProjection;

-    public FixedBucketKeyExtractor(TableSchema schema) {
+    public FixedBucketFromPkExtractor(TableSchema schema) {
         this.numBuckets = new CoreOptions(schema.options()).bucket();
         checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
         this.sameBucketKeyAndTrimmedPrimaryKey =
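Only the renamed constructor and the implemented interface are confirmed by this hunk; the accessors in the sketch below (setRecord, bucket) are assumptions about KeyAndBucketExtractor, so verify them against the interface before relying on this:

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;

class BucketFromPkSketch {
    // 'schema' must describe a fixed-bucket table: the constructor checks bucket > 0.
    static int bucketOf(TableSchema schema, InternalRow pkRow) {
        FixedBucketFromPkExtractor extractor = new FixedBucketFromPkExtractor(schema);
        extractor.setRecord(pkRow); // assumed: feed the record first
        return extractor.bucket(); // assumed: bucket derived from the primary key
    }
}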