Skip to content

Commit

Permalink
[core] Fix keyRearrange in PrimaryKeyLookupTable
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jan 10, 2024
1 parent b57dc35 commit af25cd2
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<Interna

private transient InternalRow primaryKey;

private BinaryRow bucketKey;

private final boolean sameBucketKeyAndTrimmedPrimaryKey;

private final int numBuckets;
Expand Down Expand Up @@ -90,8 +88,7 @@ private BinaryRow bucketKey() {
return trimmedPrimaryKey();
}

bucketKey = bucketKeyProjection.apply(primaryKey);
return bucketKey;
return bucketKeyProjection.apply(primaryKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ static FullCacheLookupTable create(Context context, long lruCacheSize) throws IO
return new NoPrimaryKeyLookupTable(context, lruCacheSize);
} else {
if (new HashSet<>(primaryKeys).equals(new HashSet<>(context.joinKey))) {
return new PrimaryKeyLookupTable(context, lruCacheSize);
return new PrimaryKeyLookupTable(context, lruCacheSize, context.joinKey);
} else {
return new SecondaryIndexLookupTable(context, lruCacheSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.TypeUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -40,11 +44,15 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable {

protected final KeyProjectedRow primaryKeyRow;

public PrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOException {
@Nullable private final ProjectedRow keyRearrange;

public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> joinKey)
throws IOException {
super(context);
List<String> fieldNames = projectedType.getFieldNames();
FileStoreTable table = context.table;
this.primaryKeyMapping =
context.table.primaryKeys().stream().mapToInt(fieldNames::indexOf).toArray();
table.primaryKeys().stream().mapToInt(fieldNames::indexOf).toArray();
this.primaryKeyRow = new KeyProjectedRow(primaryKeyMapping);

this.tableState =
Expand All @@ -54,10 +62,24 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOExcept
TypeUtils.project(projectedType, primaryKeyMapping)),
InternalSerializers.create(projectedType),
lruCacheSize);

ProjectedRow keyRearrange = null;
if (!table.primaryKeys().equals(joinKey)) {
keyRearrange =
ProjectedRow.from(
table.primaryKeys().stream()
.map(joinKey::indexOf)
.mapToInt(value -> value)
.toArray());
}
this.keyRearrange = keyRearrange;
}

@Override
public List<InternalRow> innerGet(InternalRow key) throws IOException {
if (keyRearrange != null) {
key = keyRearrange.replaceRow(key);
}
InternalRow value = tableState.get(key);
return value == null ? Collections.emptyList() : Collections.singletonList(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {

private final TableFileMonitor fileMonitor;

private final ProjectedRow projectedKey;
@Nullable private final ProjectedRow keyRearrange;

public PrimaryKeyPartialLookupTable(
FileStoreTable table,
Expand All @@ -71,12 +71,17 @@ public PrimaryKeyPartialLookupTable(
.withIOManager(new IOManagerImpl(tempPath.toString()));
this.extractor = new FixedBucketFromPkExtractor(table.schema());
this.fileMonitor = new TableFileMonitor(table, predicate);
this.projectedKey =
ProjectedRow.from(
table.primaryKeys().stream()
.map(joinKey::indexOf)
.mapToInt(value -> value)
.toArray());

ProjectedRow keyRearrange = null;
if (!table.primaryKeys().equals(joinKey)) {
keyRearrange =
ProjectedRow.from(
table.primaryKeys().stream()
.map(joinKey::indexOf)
.mapToInt(value -> value)
.toArray());
}
this.keyRearrange = keyRearrange;
}

@Override
Expand All @@ -86,7 +91,9 @@ public void open() throws Exception {

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
key = projectedKey.replaceRow(key);
if (keyRearrange != null) {
key = keyRearrange.replaceRow(key);
}
extractor.setRecord(key);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
private final KeyProjectedRow secKeyRow;

public SecondaryIndexLookupTable(Context context, long lruCacheSize) throws IOException {
super(context, lruCacheSize / 2);
super(context, lruCacheSize / 2, context.table.primaryKeys());
List<String> fieldNames = projectedType.getFieldNames();
int[] secKeyMapping = context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.secKeyRow = new KeyProjectedRow(secKeyMapping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,11 +746,13 @@ public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception {
iterator.close();
}

@Test
public void testPartialCacheBucketKeyOrder() throws Exception {
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testPartialCacheBucketKeyOrder(LookupCacheMode mode) throws Exception {
sql(
"CREATE TABLE DIM (k2 INT, k1 INT, j INT , i INT, PRIMARY KEY(i, j) NOT ENFORCED) WITH"
+ " ('continuous.discovery-interval'='1 ms', 'lookup.cache'='auto', 'bucket' = '2', 'bucket-key' = 'j')");
+ " ('continuous.discovery-interval'='1 ms', 'lookup.cache'='%s', 'bucket' = '2', 'bucket-key' = 'j')",
mode);

sql("CREATE TABLE T2 (j INT, i INT, `proctime` AS PROCTIME())");

Expand Down

0 comments on commit af25cd2

Please sign in to comment.