Skip to content

Commit

Permalink
[flink] Add test to cover bucket extractor in PrimaryKeyPartialLookup…
Browse files Browse the repository at this point in the history
…Table
  • Loading branch information
Aitozi authored and JingsongLi committed Jan 10, 2024
1 parent b13d99f commit b57dc35
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ private void open() throws Exception {
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
try {
this.lookupTable =
new PrimaryKeyPartialLookupTable(storeTable, predicate, projection, path);
new PrimaryKeyPartialLookupTable(
storeTable, predicate, projection, path, joinKeys);
} catch (UnsupportedOperationException ignore) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ private BinaryRow bucketKey() {
return trimmedPrimaryKey();
}

if (bucketKey == null) {
bucketKey = bucketKeyProjection.apply(primaryKey);
}
bucketKey = bucketKeyProjection.apply(primaryKey);
return bucketKey;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;

import javax.annotation.Nullable;
Expand All @@ -46,8 +47,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {

private final TableFileMonitor fileMonitor;

private final ProjectedRow projectedKey;

public PrimaryKeyPartialLookupTable(
FileStoreTable table, @Nullable Predicate predicate, int[] projection, File tempPath) {
FileStoreTable table,
@Nullable Predicate predicate,
int[] projection,
File tempPath,
List<String> joinKey) {
if (table.partitionKeys().size() > 0) {
throw new UnsupportedOperationException(
"The partitioned table are not supported in partial cache mode.");
Expand All @@ -64,6 +71,12 @@ public PrimaryKeyPartialLookupTable(
.withIOManager(new IOManagerImpl(tempPath.toString()));
this.extractor = new FixedBucketFromPkExtractor(table.schema());
this.fileMonitor = new TableFileMonitor(table, predicate);
this.projectedKey =
ProjectedRow.from(
table.primaryKeys().stream()
.map(joinKey::indexOf)
.mapToInt(value -> value)
.toArray());
}

@Override
Expand All @@ -73,6 +86,7 @@ public void open() throws Exception {

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
key = projectedKey.replaceRow(key);
extractor.setRecord(key);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,4 +745,40 @@ public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception {

iterator.close();
}

/**
 * Lookup join against a dimension table whose primary key is (i, j) and whose bucket key is
 * only j, probed from a table that declares the same columns in the opposite order (j, i).
 *
 * <p>NOTE(review): with 'lookup.cache'='auto' and join keys equal to the primary keys this
 * presumably goes through the partial lookup path - confirm against the lookup function.
 */
@Test
public void testPartialCacheBucketKeyOrder() throws Exception {
    // Dimension table: 2 buckets, bucket key is j only, primary key is (i, j).
    sql(
            "CREATE TABLE DIM (k2 INT, k1 INT, j INT , i INT, PRIMARY KEY(i, j) NOT ENFORCED) WITH"
                    + " ('continuous.discovery-interval'='1 ms', 'lookup.cache'='auto', 'bucket' = '2', 'bucket-key' = 'j')");

    // Probe side lists the key columns as (j, i), i.e. not in primary key order.
    sql("CREATE TABLE T2 (j INT, i INT, `proctime` AS PROCTIME())");

    sql("INSERT INTO DIM VALUES (1111, 111, 11, 1), (2222, 222, 22, 2)");

    String query =
            "SELECT T2.i, D.j, D.k1, D.k2 FROM T2 LEFT JOIN DIM for system_time as of T2.proctime AS D ON T2.i = D.i and T2.j = D.j";
    BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

    // First round: two keys exist in DIM, the third one (i=3, j=33) does not yet.
    sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3)");
    List<Row> result = iterator.collect(3);
    assertThat(result)
            .containsExactlyInAnyOrder(
                    Row.of(1, 11, 111, 1111),
                    Row.of(2, 22, 222, 2222),
                    Row.of(3, null, null, null));

    // Second round: rewrite the row for key (i=1, j=11) and add key (i=3, j=33).
    sql("INSERT INTO DIM VALUES (2222, 222, 11, 1), (3333, 333, 33, 3)");
    Thread.sleep(2000); // wait refresh
    sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3), (44, 4)");
    result = iterator.collect(4);
    // Key (1, 11) must now show the rewritten values; key (4, 44) was never inserted into DIM.
    assertThat(result)
            .containsExactlyInAnyOrder(
                    Row.of(1, 11, 222, 2222),
                    Row.of(2, 22, 222, 2222),
                    Row.of(3, 33, 333, 3333),
                    Row.of(4, null, null, null));

    iterator.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -472,7 +473,11 @@ public void testPartialLookupTable() throws Exception {
FileStoreTable dimTable = createDimTable();
PrimaryKeyPartialLookupTable table =
new PrimaryKeyPartialLookupTable(
dimTable, null, new int[] {0, 1, 2}, tempDir.toFile());
dimTable,
null,
new int[] {0, 1, 2},
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);

Expand All @@ -499,7 +504,11 @@ public void testPartialLookupTableWithProjection() throws Exception {
FileStoreTable dimTable = createDimTable();
PrimaryKeyPartialLookupTable table =
new PrimaryKeyPartialLookupTable(
dimTable, null, new int[] {2, 1}, tempDir.toFile());
dimTable,
null,
new int[] {2, 1},
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);

Expand All @@ -516,20 +525,45 @@ public void testPartialLookupTableWithProjection() throws Exception {
assertRow(result.get(0), 22, -2);
}

/**
 * Checks that a partial lookup table still finds rows when the join keys are supplied as
 * (pk2, pk1) rather than in the order the primary key lists them in this test, (pk1, pk2).
 * Every lookup key below is therefore built as {@code row(pk2, pk1)}.
 */
@Test
public void testPartialLookupTableJoinKeyOrder() throws Exception {
    FileStoreTable dimTable = createDimTable();
    int[] projection = {2, 1};
    List<String> reversedJoinKeys = ImmutableList.of("pk2", "pk1");
    PrimaryKeyPartialLookupTable lookupTable =
            new PrimaryKeyPartialLookupTable(
                    dimTable, null, projection, tempDir.toFile(), reversedJoinKeys);

    // Nothing has been written yet, so the lookup comes back empty.
    assertThat(lookupTable.get(row(-1, 1))).hasSize(0);

    // The rows are written, but the lookup table has not been refreshed yet.
    write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22));
    assertThat(lookupTable.get(row(-1, 1))).hasSize(0);

    // After the refresh both written keys resolve to exactly one projected row.
    lookupTable.refresh();

    List<InternalRow> firstMatch = lookupTable.get(row(-1, 1));
    assertThat(firstMatch).hasSize(1);
    assertRow(firstMatch.get(0), 11, -1);

    List<InternalRow> secondMatch = lookupTable.get(row(-2, 2));
    assertThat(secondMatch).hasSize(1);
    assertRow(secondMatch.get(0), 22, -2);
}

private FileStoreTable createDimTable() throws Exception {
FileIO fileIO = LocalFileIO.create();
org.apache.paimon.fs.Path tablePath =
new org.apache.paimon.fs.Path(
String.format("%s/%s.db/%s", warehouse, database, "T"));
Schema schema =
Schema.newBuilder()
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT())
.column("pk1", DataTypes.INT())
.column("pk2", DataTypes.INT())
.column("col2", DataTypes.INT())
.primaryKey("pk", "col1")
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup")
.primaryKey("pk1", "pk2")
.option(CoreOptions.BUCKET.key(), "2")
.option(CoreOptions.BUCKET_KEY.key(), "col1")
.option(CoreOptions.BUCKET_KEY.key(), "pk2")
.build();
TableSchema tableSchema =
SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
Expand Down

0 comments on commit b57dc35

Please sign in to comment.