Skip to content

Commit

Permalink
[flink] Fix lookup IndexOutOfBounds for sequence defined with project…
Browse files Browse the repository at this point in the history
…ion (#3527)
  • Loading branch information
JingsongLi authored Jun 17, 2024
1 parent 9c41f35 commit 61d1945
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ public int getFieldIndex(String fieldName) {
return -1;
}

public int[] getFieldIndices(List<String> projectFields) {
List<String> fieldNames = getFieldNames();
int[] projection = new int[projectFields.size()];
for (int i = 0; i < projection.length; i++) {
projection[i] = fieldNames.indexOf(projectFields.get(i));
}
return projection;
}

public boolean containsField(String fieldName) {
for (DataField field : fields) {
if (field.name().equals(fieldName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public abstract class FullCacheLookupTable implements LookupTable {
private Predicate specificPartition;

public FullCacheLookupTable(Context context) {
this.context = context;
this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
Expand All @@ -106,6 +105,7 @@ public FullCacheLookupTable(Context context) {
builder.field(f.name(), f.type());
});
projectedType = builder.build();
context = context.copy(table.rowType().getFieldIndices(projectedType.getFieldNames()));
this.userDefinedSeqComparator =
UserDefinedSeqComparator.create(projectedType, sequenceFields);
this.appendUdsFieldNumber = appendUdsFieldNumber.get();
Expand All @@ -114,6 +114,8 @@ public FullCacheLookupTable(Context context) {
this.appendUdsFieldNumber = 0;
}

this.context = context;

Options options = Options.fromMap(context.table.options());
this.projectedType = projectedType;
this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
Expand Down Expand Up @@ -354,5 +356,16 @@ public Context(
this.joinKey = joinKey;
this.requiredCachedBucketIds = requiredCachedBucketIds;
}

public Context copy(int[] newProjection) {
return new Context(
table,
newProjection,
tablePredicate,
projectedPredicate,
tempPath,
joinKey,
requiredCachedBucketIds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,46 @@ public void testPkTableWithSequenceField() throws Exception {
assertRow(result.get(0), 1, 22, 222);
}

@Test
public void testPkTableWithSequenceFieldProjection() throws Exception {
Options options = new Options();
options.set(CoreOptions.SEQUENCE_FIELD, "f2");
options.set(CoreOptions.BUCKET, 1);
FileStoreTable storeTable = createTable(singletonList("f0"), options);
FullCacheLookupTable.Context context =
new FullCacheLookupTable.Context(
storeTable,
new int[] {0, 1},
null,
null,
tempDir.toFile(),
singletonList("f0"),
null);
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// first write
write(storeTable, GenericRow.of(1, 11, 111));
table.refresh();
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11);

// second write
write(storeTable, GenericRow.of(1, 22, 222));
table.refresh();
result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22);

// not update
write(storeTable, GenericRow.of(1, 33, 111));
table.refresh();
result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22);
}

@Test
public void testPkTablePkFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
Expand Down

0 comments on commit 61d1945

Please sign in to comment.