Skip to content

Commit

Permalink
PrimaryKeyPartialLookupTable support cache row filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan committed Oct 12, 2024
1 parent 594e894 commit cc05788
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -79,6 +81,8 @@ public class LocalTableQuery implements TableQuery {
private final RowType rowType;
private final RowType partitionType;

@Nullable private Filter<InternalRow> cacheRowFilter;

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
Expand Down Expand Up @@ -154,12 +158,20 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
keyComparatorSupplier.get(),
readerFactoryBuilder.keyType(),
new LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
file ->
factory.createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level()),
file -> {
RecordReader<KeyValue> reader =
factory.createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level());
if (cacheRowFilter != null) {
reader =
reader.filter(
keyValue -> cacheRowFilter.test(keyValue.value()));
}
return reader;
},
file ->
Preconditions.checkNotNull(ioManager, "IOManager is required.")
.createChannel(
Expand Down Expand Up @@ -206,6 +218,11 @@ public LocalTableQuery withIOManager(IOManager ioManager) {
return this;
}

/**
 * Stores the given row filter in {@code cacheRowFilter} and returns this query for chaining.
 *
 * <p>When the stored filter is non-null, the record readers created in {@code newLookupLevels}
 * are wrapped with {@code reader.filter(...)} so that only key-values whose {@code value()} row
 * passes the filter are kept.
 *
 * @param cacheRowFilter filter tested against the value row of each key-value
 * @return this {@code LocalTableQuery}
 */
public LocalTableQuery withCacheRowFilter(Filter<InternalRow> cacheRowFilter) {
this.cacheRowFilter = cacheRowFilter;
return this;
}

@Override
public InternalRowSerializer createValueSerializer() {
return InternalSerializers.create(readerFactoryBuilder.readValueType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;

import javax.annotation.Nullable;
Expand All @@ -41,23 +42,21 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Lookup table for primary key which supports reading the LSM tree directly. */
public class PrimaryKeyPartialLookupTable implements LookupTable {

private final Function<Predicate, QueryExecutor> executorFactory;
private final QueryExecutorFactory executorFactory;
private final FixedBucketFromPkExtractor extractor;
@Nullable private final ProjectedRow keyRearrange;
@Nullable private final ProjectedRow trimmedKeyRearrange;

private Predicate specificPartition;
@Nullable private Filter<InternalRow> cacheRowFilter;
private QueryExecutor queryExecutor;

private PrimaryKeyPartialLookupTable(
Function<Predicate, QueryExecutor> executorFactory,
FileStoreTable table,
List<String> joinKey) {
QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
this.executorFactory = executorFactory;

if (table.bucketMode() != BucketMode.HASH_FIXED) {
Expand Down Expand Up @@ -103,7 +102,7 @@ public void specificPartitionFilter(Predicate filter) {

@Override
public void open() throws Exception {
this.queryExecutor = executorFactory.apply(specificPartition);
this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
refresh();
}

Expand Down Expand Up @@ -135,6 +134,11 @@ public void refresh() {
queryExecutor.refresh();
}

/**
 * Remembers the cache row filter so that {@code open()} can pass it to the executor factory
 * when the query executor is created.
 *
 * <p>NOTE(review): the field is only read in {@code open()} as far as this file shows, so a
 * filter set after {@code open()} presumably has no effect on the existing executor — confirm.
 *
 * @param filter row filter handed to the query executor factory
 */
@Override
public void specifyCacheRowFilter(Filter<InternalRow> filter) {
this.cacheRowFilter = filter;
}

@Override
public void close() throws IOException {
if (queryExecutor != null) {
Expand All @@ -149,21 +153,28 @@ public static PrimaryKeyPartialLookupTable createLocalTable(
List<String> joinKey,
Set<Integer> requireCachedBucketIds) {
return new PrimaryKeyPartialLookupTable(
filter ->
(filter, cacheRowFilter) ->
new LocalQueryExecutor(
new LookupFileStoreTable(table, joinKey),
projection,
tempPath,
filter,
requireCachedBucketIds),
requireCachedBucketIds,
cacheRowFilter),
table,
joinKey);
}

public static PrimaryKeyPartialLookupTable createRemoteTable(
FileStoreTable table, int[] projection, List<String> joinKey) {
return new PrimaryKeyPartialLookupTable(
filter -> new RemoteQueryExecutor(table, projection), table, joinKey);
(filter, cacheRowFilter) -> new RemoteQueryExecutor(table, projection),
table,
joinKey);
}

/**
 * Factory that builds the {@link QueryExecutor} for a lookup table.
 *
 * <p>It is implemented with lambdas by the static {@code create*Table} factories, so it is
 * declared as a functional interface to make the compiler reject any accidental second
 * abstract method.
 */
@FunctionalInterface
interface QueryExecutorFactory {

    /**
     * Creates a query executor.
     *
     * @param filter partition predicate forwarded to the executor; may be {@code null}
     * @param cacheRowFilter row filter forwarded to the executor; may be {@code null}
     * @return the executor to use for lookups
     */
    QueryExecutor create(
            @Nullable Predicate filter, @Nullable Filter<InternalRow> cacheRowFilter);
}

interface QueryExecutor extends Closeable {
Expand All @@ -183,12 +194,17 @@ private LocalQueryExecutor(
int[] projection,
File tempPath,
@Nullable Predicate filter,
Set<Integer> requireCachedBucketIds) {
Set<Integer> requireCachedBucketIds,
@Nullable Filter<InternalRow> cacheRowFilter) {
this.tableQuery =
table.newLocalTableQuery()
.withValueProjection(projection)
.withIOManager(new IOManagerImpl(tempPath.toString()));

if (cacheRowFilter != null) {
this.tableQuery.withCacheRowFilter(cacheRowFilter);
}

this.scan =
table.newReadBuilder()
.withFilter(filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,27 @@ public void testPartialLookupTable() throws Exception {
assertThat(result).hasSize(0);
}

/**
 * Checks that a cache row filter set on a local partial lookup table hides rows that do not
 * pass the filter while still returning rows that do.
 */
@Test
public void testPartialLookupTableWithRowFilter() throws Exception {
    // Dimension table with two buckets, bucketed on f0, holding two rows.
    Options tableOptions = new Options();
    tableOptions.set(CoreOptions.BUCKET.key(), "2");
    tableOptions.set(CoreOptions.BUCKET_KEY.key(), "f0");
    FileStoreTable dimTable = createTable(singletonList("f0"), tableOptions);
    write(dimTable, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222));

    int[] projection = new int[] {0, 2};
    List<String> joinKey = ImmutableList.of("f0");
    PrimaryKeyPartialLookupTable lookupTable =
            PrimaryKeyPartialLookupTable.createLocalTable(
                    dimTable, projection, tempDir.toFile(), joinKey, null);

    // Only rows whose int field at index 0 is below 2 pass the filter.
    lookupTable.specifyCacheRowFilter(candidate -> candidate.getInt(0) < 2);
    lookupTable.open();

    // The row with f0 = 1 passes the filter and is found.
    List<InternalRow> accepted = lookupTable.get(row(1, 11));
    assertThat(accepted).hasSize(1);

    // The row with f0 = 2 is rejected by the filter, so the lookup is empty.
    List<InternalRow> rejected = lookupTable.get(row(2, 22));
    assertThat(rejected).isEmpty();
}

@Test
public void testPartialLookupTableWithProjection() throws Exception {
FileStoreTable dimTable = createDimTable();
Expand Down Expand Up @@ -989,15 +1010,4 @@ private static void assertRow(InternalRow resultRow, int... expected) {
assertThat(results).containsExactly(expected);
assertThat(resultRow.getFieldCount()).isEqualTo(expected.length);
}

/**
 * Writes the given rows to the table through a batch write and commits them in one commit.
 *
 * @param table table to write to
 * @param rows rows to write, in order
 * @throws Exception if writing, preparing the commit, committing, or closing fails
 */
private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception {
    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
    try (BatchTableWrite tableWrite = writeBuilder.newWrite();
            BatchTableCommit tableCommit = writeBuilder.newCommit()) {
        for (int i = 0; i < rows.length; i++) {
            // Second argument is always 0 — presumably the target bucket; confirm against
            // BatchTableWrite.
            tableWrite.write(rows[i], 0);
        }
        tableCommit.commit(tableWrite.prepareCommit());
    }
}
}

0 comments on commit cc05788

Please sign in to comment.