PrimaryKeyPartialLookupTable: support cache row filter
Sxnan committed Oct 12, 2024
1 parent 594e894 commit d22e830
Showing 6 changed files with 161 additions and 28 deletions.
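This change threads an optional cache row filter through the primary-key lookup path: LocalTableQuery gains withCacheRowFilter(...), PrimaryKeyPartialLookupTable gains specifyCacheRowFilter(...), and KeyValueFileReaderFactory applies the filter to each value when building the record reader, so rows rejected by the filter are never loaded into the lookup cache and lookups for them come back empty. A minimal usage sketch, modeled on the new testPartialLookupTableWithRowFilter below (dimTable, tempDir, and the row(...) helper are the test's fixtures, not part of the API):

// Sketch based on the added test: a fixed-bucket table keyed on f0,
// with rows (1, 11, 111) and (2, 22, 222) already written to dimTable.
PrimaryKeyPartialLookupTable table =
        PrimaryKeyPartialLookupTable.createLocalTable(
                dimTable, new int[] {0, 2}, tempDir.toFile(),
                ImmutableList.of("f0"), null);

// Admit only rows whose first field is < 2 into the cache. The filter
// must be set before open(), which passes it to the query executor factory.
table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
table.open();

table.get(row(1, 11)); // one result: the row passed the cache filter
table.get(row(2, 22)); // empty: the row was filtered out of the cache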
@@ -42,6 +42,7 @@
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.BulkFormatMapping.BulkFormatMappingBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;

import javax.annotation.Nullable;

@@ -102,11 +103,23 @@ public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException

public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {
return createRecordReader(schemaId, fileName, fileSize, level, null);
}

public RecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
long fileSize,
int level,
@Nullable Filter<InternalRow> valueFilter)
throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
- () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
+ () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize, valueFilter));
}
- return createRecordReader(schemaId, fileName, level, true, null, fileSize);
+ return createRecordReader(schemaId, fileName, level, true, null, fileSize, valueFilter);
}

private RecordReader<KeyValue> createRecordReader(
@@ -115,7 +128,8 @@ private RecordReader<KeyValue> createRecordReader(
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
- long fileSize)
+ long fileSize,
+ @Nullable Filter<InternalRow> valueFilter)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

@@ -151,7 +165,15 @@ private RecordReader<KeyValue> createRecordReader(
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}

- return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
+ RecordReader<KeyValue> keyValueDataFileRecordReader =
+         new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
+ if (valueFilter != null) {
+     keyValueDataFileRecordReader =
+             keyValueDataFileRecordReader.filter(
+                     keyValue -> valueFilter.test(keyValue.value()));
+ }
+
+ return keyValueDataFileRecordReader;
}

public static Builder builder(
@@ -39,6 +39,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -79,6 +80,8 @@ public class LocalTableQuery implements TableQuery {
private final RowType rowType;
private final RowType partitionType;

@Nullable private Filter<InternalRow> cacheRowFilter;

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
@@ -159,7 +162,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
file.schemaId(),
file.fileName(),
file.fileSize(),
- file.level()),
+ file.level(),
+ cacheRowFilter),
file ->
Preconditions.checkNotNull(ioManager, "IOManager is required.")
.createChannel(
@@ -206,6 +210,11 @@ public LocalTableQuery withIOManager(IOManager ioManager) {
return this;
}

public LocalTableQuery withCacheRowFilter(Filter<InternalRow> cacheRowFilter) {
this.cacheRowFilter = cacheRowFilter;
return this;
}

@Override
public InternalRowSerializer createValueSerializer() {
return InternalSerializers.create(readerFactoryBuilder.readValueType());
@@ -175,7 +175,7 @@ public static class Data {
public final DataFileMeta meta;
public final List<KeyValue> content;

- private Data(BinaryRow partition, int bucket, DataFileMeta meta, List<KeyValue> content) {
+ Data(BinaryRow partition, int bucket, DataFileMeta meta, List<KeyValue> content) {
this.partition = partition;
this.bucket = bucket;
this.meta = meta;
@@ -24,6 +24,7 @@
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FlushingFileFormat;
@@ -41,6 +42,7 @@
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
@@ -56,6 +58,7 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE;
@@ -219,6 +222,56 @@ public void testReadValueType() throws Exception {
kv.value().getInt(1))));
}

@Test
public void testReadValueTypeWithRowFilter() throws Exception {
DataFileTestDataGenerator.Data data = gen.next();
KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), "avro");
DataFileMetaSerializer serializer = new DataFileMetaSerializer();

RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();

// projection:
// (dt, hr, shopId, orderId, itemId, priceAmount, comment) ->
// (shopId, itemId, dt, hr)
RowType readValueType = DEFAULT_ROW_TYPE.project("shopId", "itemId", "dt", "hr");
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), "avro", null, readValueType);
InternalRowSerializer projectedValueSerializer = new InternalRowSerializer(readValueType);

List<KeyValue> expectedValues =
data.content.stream()
.filter(kv -> kv.key().getInt(0) % 2 == 0)
.collect(Collectors.toList());
DataFileTestDataGenerator.Data expectedData =
new DataFileTestDataGenerator.Data(
data.partition, data.bucket, data.meta, expectedValues);
assertData(
expectedData,
actualMetas,
TestKeyValueGenerator.KEY_SERIALIZER,
projectedValueSerializer,
serializer,
readerFactory,
kv ->
new KeyValue()
.replace(
kv.key(),
kv.sequenceNumber(),
kv.valueKind(),
GenericRow.of(
kv.value().getInt(2),
kv.value().isNullAt(4)
? null
: kv.value().getLong(4),
kv.value().getString(0),
kv.value().getInt(1))),
internalRow -> internalRow.getInt(0) % 2 == 0);
}

protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String format) {
Path path = new Path(pathStr);
FileStorePathFactory pathFactory =
@@ -294,6 +347,27 @@ private void assertData(
KeyValueFileReaderFactory readerFactory,
Function<KeyValue, KeyValue> toExpectedKv)
throws Exception {
assertData(
data,
actualMetas,
keySerializer,
projectedValueSerializer,
dataFileMetaSerializer,
readerFactory,
toExpectedKv,
null);
}

private void assertData(
DataFileTestDataGenerator.Data data,
List<DataFileMeta> actualMetas,
InternalRowSerializer keySerializer,
InternalRowSerializer projectedValueSerializer,
DataFileMetaSerializer dataFileMetaSerializer,
KeyValueFileReaderFactory readerFactory,
Function<KeyValue, KeyValue> toExpectedKv,
Filter<InternalRow> rowFilter)
throws Exception {
Iterator<KeyValue> expectedIterator = data.content.iterator();
for (DataFileMeta meta : actualMetas) {
// check the contents of data file
@@ -303,7 +377,8 @@ private void assertData(
meta.schemaId(),
meta.fileName(),
meta.fileSize(),
- meta.level()));
+ meta.level(),
+ rowFilter));
while (actualKvsIterator.hasNext()) {
assertThat(expectedIterator.hasNext()).isTrue();
KeyValue actualKv = actualKvsIterator.next();
@@ -31,6 +31,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;

import javax.annotation.Nullable;
@@ -41,23 +42,21 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
- import java.util.function.Function;

/** Lookup table for primary key which supports to read the LSM tree directly. */
public class PrimaryKeyPartialLookupTable implements LookupTable {

- private final Function<Predicate, QueryExecutor> executorFactory;
+ private final QueryExecutorFactory executorFactory;
private final FixedBucketFromPkExtractor extractor;
@Nullable private final ProjectedRow keyRearrange;
@Nullable private final ProjectedRow trimmedKeyRearrange;

private Predicate specificPartition;
@Nullable private Filter<InternalRow> cacheRowFilter;
private QueryExecutor queryExecutor;

private PrimaryKeyPartialLookupTable(
- Function<Predicate, QueryExecutor> executorFactory,
- FileStoreTable table,
- List<String> joinKey) {
+ QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
this.executorFactory = executorFactory;

if (table.bucketMode() != BucketMode.HASH_FIXED) {
@@ -103,7 +102,7 @@ public void specificPartitionFilter(Predicate filter) {

@Override
public void open() throws Exception {
- this.queryExecutor = executorFactory.apply(specificPartition);
+ this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
refresh();
}

@@ -135,6 +134,11 @@ public void refresh() {
queryExecutor.refresh();
}

@Override
public void specifyCacheRowFilter(Filter<InternalRow> filter) {
this.cacheRowFilter = filter;
}

@Override
public void close() throws IOException {
if (queryExecutor != null) {
@@ -149,21 +153,28 @@ public static PrimaryKeyPartialLookupTable createLocalTable(
List<String> joinKey,
Set<Integer> requireCachedBucketIds) {
return new PrimaryKeyPartialLookupTable(
- filter ->
+ (filter, cacheRowFilter) ->
new LocalQueryExecutor(
new LookupFileStoreTable(table, joinKey),
projection,
tempPath,
filter,
- requireCachedBucketIds),
+ requireCachedBucketIds,
+ cacheRowFilter),
table,
joinKey);
}

public static PrimaryKeyPartialLookupTable createRemoteTable(
FileStoreTable table, int[] projection, List<String> joinKey) {
return new PrimaryKeyPartialLookupTable(
- filter -> new RemoteQueryExecutor(table, projection), table, joinKey);
+ (filter, cacheRowFilter) -> new RemoteQueryExecutor(table, projection),
+ table,
+ joinKey);
}

interface QueryExecutorFactory {
QueryExecutor create(Predicate filter, @Nullable Filter<InternalRow> cacheRowFilter);
}

interface QueryExecutor extends Closeable {
@@ -183,12 +194,17 @@ private LocalQueryExecutor(
int[] projection,
File tempPath,
@Nullable Predicate filter,
- Set<Integer> requireCachedBucketIds) {
+ Set<Integer> requireCachedBucketIds,
+ @Nullable Filter<InternalRow> cacheRowFilter) {
this.tableQuery =
table.newLocalTableQuery()
.withValueProjection(projection)
.withIOManager(new IOManagerImpl(tempPath.toString()));

if (cacheRowFilter != null) {
this.tableQuery.withCacheRowFilter(cacheRowFilter);
}

this.scan =
table.newReadBuilder()
.withFilter(filter)
@@ -715,6 +715,28 @@ public void testPartialLookupTable() throws Exception {
assertThat(result).hasSize(0);
}

@Test
public void testPartialLookupTableWithRowFilter() throws Exception {
Options options = new Options();
options.set(CoreOptions.BUCKET.key(), "2");
options.set(CoreOptions.BUCKET_KEY.key(), "f0");
FileStoreTable dimTable = createTable(singletonList("f0"), options);
write(dimTable, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222));

PrimaryKeyPartialLookupTable table =
PrimaryKeyPartialLookupTable.createLocalTable(
dimTable, new int[] {0, 2}, tempDir.toFile(), ImmutableList.of("f0"), null);
table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
table.open();

List<InternalRow> result = table.get(row(1, 11));
assertThat(result).hasSize(1);

result = table.get(row(2, 22));
assertThat(result).isEmpty();
}

@Test
public void testPartialLookupTableWithProjection() throws Exception {
FileStoreTable dimTable = createDimTable();
@@ -989,15 +1011,4 @@ private static void assertRow(InternalRow resultRow, int... expected) {
assertThat(results).containsExactly(expected);
assertThat(resultRow.getFieldCount()).isEqualTo(expected.length);
}

- private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception {
-     BatchWriteBuilder builder = table.newBatchWriteBuilder();
-     try (BatchTableWrite writer = builder.newWrite();
-             BatchTableCommit commiter = builder.newCommit()) {
-         for (InternalRow row : rows) {
-             writer.write(row, 0);
-         }
-         commiter.commit(writer.prepareCommit());
-     }
- }
}
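For code that drives LocalTableQuery directly instead of going through the lookup table, the same hook is available on the builder-style setters. A sketch under the assumption that fileStoreTable and tempPath are an existing FileStoreTable and spill directory (both placeholders, not names from this commit):

// Sketch only: mirrors the wiring in LocalQueryExecutor above.
LocalTableQuery query =
        fileStoreTable.newLocalTableQuery()
                .withValueProjection(new int[] {0, 2})
                .withIOManager(new IOManagerImpl(tempPath.toString()))
                .withCacheRowFilter(row -> row.getInt(0) < 2);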
