Skip to content

Commit

Permalink
FullCacheLookupTable support cache row filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan committed Oct 12, 2024
1 parent c41d23c commit 594e894
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -110,6 +111,19 @@ protected final void write(Table table, Pair<InternalRow, Integer>... rows) thro
}
}

/**
 * Writes {@code rows} into {@code table} and commits them as a single batch, sending every row
 * to the bucket that {@code bucketAssigner} returns for it.
 *
 * @param table table that provides the batch write builder
 * @param bucketAssigner maps a row to the bucket it is written to
 * @param rows rows to write, processed in the order given
 * @throws Exception if writing, committing or closing the writer/committer fails
 */
protected void writeWithBucketAssigner(
        Table table, Function<InternalRow, Integer> bucketAssigner, InternalRow... rows)
        throws Exception {
    BatchWriteBuilder builder = table.newBatchWriteBuilder();
    try (BatchTableWrite tableWrite = builder.newWrite();
            BatchTableCommit tableCommit = builder.newCommit()) {
        for (int i = 0; i < rows.length; i++) {
            InternalRow current = rows[i];
            Integer bucket = bucketAssigner.apply(current);
            tableWrite.write(current, bucket);
        }
        // Everything written above goes out in one commit.
        tableCommit.commit(tableWrite.prepareCommit());
    }
}

/**
 * Writes the given rows to the table by delegating to the three-argument {@code write}
 * overload, passing {@code null} as its second argument.
 *
 * <p>NOTE(review): the delegated overload is not visible from here; confirm what a
 * {@code null} second argument means against its declaration.
 *
 * @param table table to write to
 * @param rows rows to write
 * @throws Exception propagated from the delegated overload
 */
protected void write(Table table, InternalRow... rows) throws Exception {
write(table, null, rows);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {

protected FunctionContext functionContext;

@Nullable private Filter<InternalRow> cacheRowFilter;

public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
Expand Down Expand Up @@ -195,6 +198,9 @@ private void open() throws Exception {
}

refreshDynamicPartition(false);
if (cacheRowFilter != null) {
lookupTable.specifyCacheRowFilter(cacheRowFilter);
}
lookupTable.open();
}

Expand Down Expand Up @@ -361,4 +367,8 @@ protected Set<Integer> getRequireCachedBucketIds() {
// TODO: Implement the method when Flink supports bucket shuffle for lookup join.
return null;
}

/**
 * Stores the row filter that this function forwards to its lookup table.
 *
 * <p>The stored value is only read in {@code open()}, where a non-null filter is passed to
 * {@code lookupTable.specifyCacheRowFilter(...)} right before the lookup table is opened. A
 * {@code null} value means no filter is forwarded.
 *
 * @param cacheRowFilter filter to forward, or {@code null} for none
 */
protected void setCacheRowFilter(@Nullable Filter<InternalRow> cacheRowFilter) {
this.cacheRowFilter = cacheRowFilter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
Expand Down Expand Up @@ -85,6 +86,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
private Future<?> refreshFuture;
private LookupStreamingReader reader;
private Predicate specificPartition;
@Nullable private Filter<InternalRow> cacheRowFilter;

public FullCacheLookupTable(Context context) {
this.table = context.table;
Expand Down Expand Up @@ -138,6 +140,11 @@ public void specificPartitionFilter(Predicate filter) {
this.specificPartition = filter;
}

/**
 * Remembers the row filter for this lookup table.
 *
 * <p>The stored filter is handed to the {@code LookupStreamingReader} constructor in
 * {@code bootstrap()}; the field is nullable, and a {@code null} filter is passed through
 * unchanged.
 *
 * @param filter row filter to remember
 */
@Override
public void specifyCacheRowFilter(Filter<InternalRow> filter) {
this.cacheRowFilter = filter;
}

protected void openStateFactory() throws Exception {
this.stateFactory =
new RocksDBStateFactory(
Expand All @@ -154,7 +161,8 @@ protected void bootstrap() throws Exception {
context.table,
context.projection,
scanPredicate,
context.requiredCachedBucketIds);
context.requiredCachedBucketIds,
cacheRowFilter);
BinaryExternalSortBuffer bulkLoadSorter =
RocksDBState.createBulkLoadSorter(
IOManager.create(context.tempPath.toString()), context.table.coreOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.TypeUtils;

Expand All @@ -51,6 +52,7 @@ public class LookupStreamingReader {

private final LookupFileStoreTable table;
private final int[] projection;
@Nullable private final Filter<InternalRow> cacheRowFilter;
private final ReadBuilder readBuilder;
@Nullable private final Predicate projectedPredicate;
private final StreamTableScan scan;
Expand All @@ -59,9 +61,11 @@ public LookupStreamingReader(
LookupFileStoreTable table,
int[] projection,
@Nullable Predicate predicate,
Set<Integer> requireCachedBucketIds) {
Set<Integer> requireCachedBucketIds,
@Nullable Filter<InternalRow> cacheRowFilter) {
this.table = table;
this.projection = projection;
this.cacheRowFilter = cacheRowFilter;
this.readBuilder =
this.table
.newReadBuilder()
Expand Down Expand Up @@ -125,6 +129,10 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Except
if (projectedPredicate != null) {
reader = reader.filter(projectedPredicate::test);
}

if (cacheRowFilter != null) {
reader = reader.filter(cacheRowFilter);
}
return reader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -35,4 +36,6 @@ public interface LookupTable extends Closeable {
List<InternalRow> get(InternalRow key) throws IOException;

void refresh() throws Exception;

/**
 * Specifies a row filter for this lookup table.
 *
 * <p>NOTE(review): the exact contract is implementation-defined; presumably rows that the
 * filter rejects are not served by {@link #get(InternalRow)} — confirm per implementation.
 *
 * @param filter the row filter to apply
 */
void specifyCacheRowFilter(Filter<InternalRow> filter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,129 @@ public void testNoPrimaryKeyTableFilter() throws Exception {
assertRow(result.get(1), 1, 11, 111);
}

/**
 * Checks that a {@code PrimaryKeyLookupTable} with a cache row filter only serves rows the
 * filter accepts, both after opening and after a refresh.
 */
@Test
public void testPkTableWithCacheRowFilter() throws Exception {
    FileStoreTable fileStoreTable = createTable(singletonList("f0"), new Options());
    // Initial data, every row assigned to bucket 0.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(1, 11, 111),
            GenericRow.of(2, 22, 222));

    int[] allFields = new int[] {0, 1, 2};
    FullCacheLookupTable.Context lookupContext =
            new FullCacheLookupTable.Context(
                    fileStoreTable,
                    allFields,
                    null,
                    null,
                    tempDir.toFile(),
                    singletonList("f0"),
                    null);
    // Second argument is randomly either 0 or 10.
    int zeroOrTen = ThreadLocalRandom.current().nextInt(2) * 10;
    table = FullCacheLookupTable.create(lookupContext, zeroOrTen);
    assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class);

    // Only rows whose first int field is below 2 pass the filter.
    table.specifyCacheRowFilter(candidate -> candidate.getInt(0) < 2);
    table.open();

    // Key 1 passes the filter and is served.
    List<InternalRow> found = table.get(GenericRow.of(1));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 1, 11, 111);

    // Key 2 is rejected by the filter.
    found = table.get(GenericRow.of(2));
    assertThat(found).isEmpty();

    // New rows are not expected to be visible until refresh() is called.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(0, 0, 0),
            GenericRow.of(3, 33, 333));
    found = table.get(GenericRow.of(0));
    assertThat(found).isEmpty();

    table.refresh();

    // After the refresh, key 0 passes the filter while key 3 is still rejected.
    found = table.get(GenericRow.of(0));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 0, 0, 0);

    found = table.get(GenericRow.of(3));
    assertThat(found).isEmpty();
}

/**
 * Checks that a {@code NoPrimaryKeyLookupTable} with a cache row filter only serves rows the
 * filter accepts, both after opening and after a refresh.
 */
@Test
public void testNoPkTableWithCacheRowFilter() throws Exception {
    FileStoreTable fileStoreTable = createTable(emptyList(), new Options());
    // Initial data, every row assigned to bucket 0.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(1, 11, 111),
            GenericRow.of(2, 22, 222));

    int[] allFields = new int[] {0, 1, 2};
    FullCacheLookupTable.Context lookupContext =
            new FullCacheLookupTable.Context(
                    fileStoreTable,
                    allFields,
                    null,
                    null,
                    tempDir.toFile(),
                    singletonList("f0"),
                    null);
    // Second argument is randomly either 0 or 10.
    int zeroOrTen = ThreadLocalRandom.current().nextInt(2) * 10;
    table = FullCacheLookupTable.create(lookupContext, zeroOrTen);
    assertThat(table).isInstanceOf(NoPrimaryKeyLookupTable.class);

    // Only rows whose first int field is below 2 pass the filter.
    table.specifyCacheRowFilter(candidate -> candidate.getInt(0) < 2);
    table.open();

    // Key 1 passes the filter and is served.
    List<InternalRow> found = table.get(GenericRow.of(1));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 1, 11, 111);

    // Key 2 is rejected by the filter.
    found = table.get(GenericRow.of(2));
    assertThat(found).isEmpty();

    // New rows are not expected to be visible until refresh() is called.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(0, 0, 0),
            GenericRow.of(3, 33, 333));
    found = table.get(GenericRow.of(0));
    assertThat(found).isEmpty();

    table.refresh();

    // After the refresh, key 0 passes the filter while key 3 is still rejected.
    found = table.get(GenericRow.of(0));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 0, 0, 0);

    found = table.get(GenericRow.of(3));
    assertThat(found).isEmpty();
}

/**
 * Checks that a {@code SecondaryIndexLookupTable} with a cache row filter only serves rows the
 * filter accepts, both after opening and after a refresh.
 */
@Test
public void testSecKeyTableWithCacheRowFilter() throws Exception {
    FileStoreTable fileStoreTable = createTable(singletonList("f0"), new Options());
    // Initial data, every row assigned to bucket 0.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(1, 11, 111),
            GenericRow.of(2, 22, 222));

    int[] allFields = new int[] {0, 1, 2};
    FullCacheLookupTable.Context lookupContext =
            new FullCacheLookupTable.Context(
                    fileStoreTable,
                    allFields,
                    null,
                    null,
                    tempDir.toFile(),
                    singletonList("f1"),
                    null);
    // Second argument is randomly either 0 or 10.
    int zeroOrTen = ThreadLocalRandom.current().nextInt(2) * 10;
    table = FullCacheLookupTable.create(lookupContext, zeroOrTen);
    assertThat(table).isInstanceOf(SecondaryIndexLookupTable.class);

    // Only rows whose second int field is below 22 pass the filter.
    table.specifyCacheRowFilter(candidate -> candidate.getInt(1) < 22);
    table.open();

    // Key 11 passes the filter and is served.
    List<InternalRow> found = table.get(GenericRow.of(11));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 1, 11, 111);

    // Key 22 is rejected by the filter.
    found = table.get(GenericRow.of(22));
    assertThat(found).isEmpty();

    // New rows are not expected to be visible until refresh() is called.
    writeWithBucketAssigner(
            fileStoreTable,
            ignored -> 0,
            GenericRow.of(0, 0, 0),
            GenericRow.of(3, 33, 333));
    found = table.get(GenericRow.of(0));
    assertThat(found).isEmpty();

    table.refresh();

    // After the refresh, key 0 passes the filter while key 33 is still rejected.
    found = table.get(GenericRow.of(0));
    assertThat(found).hasSize(1);
    assertRow(found.get(0), 0, 0, 0);

    found = table.get(GenericRow.of(33));
    assertThat(found).isEmpty();
}

@Test
public void testPartialLookupTable() throws Exception {
FileStoreTable dimTable = createDimTable();
Expand Down Expand Up @@ -866,4 +989,15 @@ private static void assertRow(InternalRow resultRow, int... expected) {
assertThat(results).containsExactly(expected);
assertThat(resultRow.getFieldCount()).isEqualTo(expected.length);
}

/**
 * Writes {@code rows} into {@code table}, every row into bucket 0, and commits them as a single
 * batch.
 *
 * @param table table that provides the batch write builder
 * @param rows rows to write, processed in the order given
 * @throws Exception if writing, committing or closing the writer/committer fails
 */
private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception {
    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
    try (BatchTableWrite tableWrite = writeBuilder.newWrite();
            BatchTableCommit tableCommit = writeBuilder.newCommit()) {
        for (int idx = 0; idx < rows.length; idx++) {
            tableWrite.write(rows[idx], 0);
        }
        // Everything written above goes out in one commit.
        tableCommit.commit(tableWrite.prepareCommit());
    }
}
}

0 comments on commit 594e894

Please sign in to comment.