Skip to content

Commit

Permalink
[flink] Support nested projection pushdown (#4667)
Browse files Browse the repository at this point in the history
This closes #4667.
  • Loading branch information
yunfengzhou-hub authored Dec 20, 2024
1 parent 8ade15f commit 3a9bed2
Show file tree
Hide file tree
Showing 28 changed files with 865 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.utils.Filter;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
*/
ReadBuilder withProjection(int[] projection);

/** Apply projection to the reader, only support top level projection. */
@Deprecated
default ReadBuilder withProjection(int[][] projection) {
if (projection == null) {
return this;
}
if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) {
throw new IllegalStateException("Not support nested projection");
}
return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray());
}

/** the row number pushed down. */
ReadBuilder withLimit(int limit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,18 @@ private List<DataField> readDataFields(List<DataField> allDataFields) {
.filter(f -> f.id() == dataField.id())
.findFirst()
.ifPresent(
field ->
readDataFields.add(
dataField.newType(
pruneDataType(
field.type(), dataField.type()))));
field -> {
DataType prunedType =
pruneDataType(field.type(), dataField.type());
if (prunedType != null) {
readDataFields.add(dataField.newType(prunedType));
}
});
}
return readDataFields;
}

@Nullable
private DataType pruneDataType(DataType readType, DataType dataType) {
switch (readType.getTypeRoot()) {
case ROW:
Expand All @@ -261,25 +264,40 @@ private DataType pruneDataType(DataType readType, DataType dataType) {
for (DataField rf : r.getFields()) {
if (d.containsField(rf.id())) {
DataField df = d.getField(rf.id());
newFields.add(df.newType(pruneDataType(rf.type(), df.type())));
DataType newType = pruneDataType(rf.type(), df.type());
if (newType == null) {
continue;
}
newFields.add(df.newType(newType));
}
}
if (newFields.isEmpty()) {
// When all fields are pruned, we should not return an empty row type
return null;
}
return d.copy(newFields);
case MAP:
return ((MapType) dataType)
.newKeyValueType(
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType()),
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType()));
DataType keyType =
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType());
DataType valueType =
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType());
if (keyType == null || valueType == null) {
return null;
}
return ((MapType) dataType).newKeyValueType(keyType, valueType);
case ARRAY:
return ((ArrayType) dataType)
.newElementType(
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType()));
DataType elementType =
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType());
if (elementType == null) {
return null;
}
return ((ArrayType) dataType).newElementType(elementType);
default:
return dataType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception {
GenericRow.of(fromString("+I"), 1, 6, 1));

// read tag1 tag3 projection
result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6));

assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... d

protected List<InternalRow> read(
Table table,
@Nullable int[][] projection,
@Nullable int[] projection,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void before() throws Exception {

@Test
public void testBucketsTable() throws Exception {
assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}}))
assertThat(read(bucketsTable, new int[] {0, 1, 2, 4}))
.containsExactlyInAnyOrder(
GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L),
GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception {
read(
partitionsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")),
new int[][] {{0}, {1}});
new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}, {3}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
}
Loading

0 comments on commit 3a9bed2

Please sign in to comment.