diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 0c1386ce441d..d12de1211bfd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -27,7 +27,6 @@ import org.apache.paimon.utils.Filter; import java.io.Serializable; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List predicates) { */ ReadBuilder withProjection(int[] projection); - /** Apply projection to the reader, only support top level projection. */ - @Deprecated - default ReadBuilder withProjection(int[][] projection) { - if (projection == null) { - return this; - } - if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) { - throw new IllegalStateException("Not support nested projection"); - } - return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray()); - } - /** the row number pushed down. */ ReadBuilder withLimit(int limit); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 00554b233c59..8de9bf8508fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -243,15 +243,18 @@ private List readDataFields(List allDataFields) { .filter(f -> f.id() == dataField.id()) .findFirst() .ifPresent( - field -> - readDataFields.add( - dataField.newType( - pruneDataType( - field.type(), dataField.type())))); + field -> { + DataType prunedType = + pruneDataType(field.type(), dataField.type()); + if (prunedType != null) { + readDataFields.add(dataField.newType(prunedType)); + } + }); } return readDataFields; } + @Nullable private DataType pruneDataType(DataType readType, DataType dataType) { switch (readType.getTypeRoot()) { case ROW: @@ -261,25 +264,40 @@ private DataType pruneDataType(DataType readType, DataType dataType) { for (DataField rf : r.getFields()) { if (d.containsField(rf.id())) { DataField df = d.getField(rf.id()); - newFields.add(df.newType(pruneDataType(rf.type(), df.type()))); + DataType newType = pruneDataType(rf.type(), df.type()); + if (newType == null) { + continue; + } + newFields.add(df.newType(newType)); } } + if (newFields.isEmpty()) { + // When all fields are pruned, we should not return an empty row type + return null; + } return d.copy(newFields); case MAP: - return ((MapType) dataType) - .newKeyValueType( - pruneDataType( - ((MapType) readType).getKeyType(), - ((MapType) dataType).getKeyType()), - pruneDataType( - ((MapType) readType).getValueType(), - ((MapType) dataType).getValueType())); + DataType keyType = + pruneDataType( + ((MapType) readType).getKeyType(), + ((MapType) dataType).getKeyType()); + DataType valueType = + pruneDataType( + ((MapType) readType).getValueType(), + ((MapType) dataType).getValueType()); + if (keyType == null || valueType == null) { + return null; + } + return ((MapType) dataType).newKeyValueType(keyType, valueType); case ARRAY: - return ((ArrayType) dataType) - .newElementType( - pruneDataType( - ((ArrayType) readType).getElementType(), - ((ArrayType) dataType).getElementType())); + DataType elementType = + pruneDataType( + ((ArrayType) readType).getElementType(), + ((ArrayType) dataType).getElementType()); + if 
(elementType == null) { + return null; + } + return ((ArrayType) dataType).newElementType(elementType); default: return dataType; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index b4b905d36453..43fbe2b6460a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception { GenericRow.of(fromString("+I"), 1, 6, 1)); // read tag1 tag3 projection - result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); + result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6)); assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1"))) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 7d7617cf8bd1..aedf5553e020 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -171,7 +171,7 @@ protected List read(Table table, Pair, String>... d protected List read( Table table, - @Nullable int[][] projection, + @Nullable int[] projection, Pair, String>... dynamicOptions) throws Exception { Map options = new HashMap<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java index 57cb6605a922..b6bd71087412 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java @@ -72,7 +72,7 @@ public void before() throws Exception { @Test public void testBucketsTable() throws Exception { - assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}})) + assertThat(read(bucketsTable, new int[] {0, 1, 2, 4})) .containsExactlyInAnyOrder( GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L), GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 8d12dc707bf5..74d9a5eb7b1f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L)); // Only read partition and record count, record size may not stable. 
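The test updates above track the API change in `ReadBuilder`: the deprecated nested-array `withProjection(int[][])` is removed, so callers pass a flat `int[]` for top-level projection, while nested reads go through the new `withReadType` path instead. A minimal sketch of the flat form, assuming the usual `ReadBuilder` scan/read flow; the table handle and column indices are placeholders:

```java
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

import java.util.List;

class ProjectionReadSketch {
    // Minimal sketch: read only top-level columns 0 and 2 of an
    // already-loaded Paimon Table. The column indices are hypothetical.
    static void readProjected(Table table) throws Exception {
        ReadBuilder readBuilder =
                table.newReadBuilder().withProjection(new int[] {0, 2});
        List<Split> splits = readBuilder.newScan().plan().splits();
        try (RecordReader<InternalRow> reader =
                readBuilder.newRead().createReader(splits)) {
            // Rows now contain only the two projected columns.
            reader.forEachRemaining(row -> System.out.println(row.getInt(0)));
        }
    }
}
```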
- List result = read(partitionsTable, new int[][] {{0}, {1}}); + List result = read(partitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception { read( partitionsTable.copy( Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")), - new int[][] {{0}, {1}}); + new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L)); expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L)); - List result = read(partitionsTable, new int[][] {{0}, {1}, {3}}); + List result = read(partitionsTable, new int[] {0, 1, 3}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java new file mode 100644 index 000000000000..810cc1ae4218 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.function.BiFunction; + +/** + * A {@link RowData} that provides a mapping view of the original {@link RowData} according to + * projection information. Compared with {@link ProjectedRowData}, this class supports nested + * projection. 
+ */ +public class NestedProjectedRowData implements RowData, Serializable { + private static final long serialVersionUID = 1L; + + private final RowType producedDataType; + private final int[][] projectedFields; + private final int[] lastProjectedFields; + + private final Object[] cachedFields; + private final boolean[] isFieldsCached; + + private final boolean[] cachedNullAt; + private final boolean[] isNullAtCached; + + private transient RowData row; + + NestedProjectedRowData(RowType producedDataType, int[][] projectedFields) { + this.producedDataType = producedDataType; + this.projectedFields = projectedFields; + this.lastProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1]; + } + + this.cachedFields = new Object[projectedFields.length]; + this.isFieldsCached = new boolean[projectedFields.length]; + this.cachedNullAt = new boolean[projectedFields.length]; + this.isNullAtCached = new boolean[projectedFields.length]; + } + + public NestedProjectedRowData replaceRow(RowData row) { + this.row = row; + Arrays.fill(isFieldsCached, false); + Arrays.fill(isNullAtCached, false); + return this; + } + + public static @Nullable NestedProjectedRowData copy(@Nullable NestedProjectedRowData rowData) { + if (rowData == null) { + return null; + } + return new NestedProjectedRowData(rowData.producedDataType, rowData.projectedFields); + } + + @Override + public int getArity() { + return projectedFields.length; + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind rowKind) { + row.setRowKind(rowKind); + } + + @Override + public boolean isNullAt(int pos) { + if (isNullAtCached[pos]) { + return cachedNullAt[pos]; + } + + RowData rowData = extractInternalRow(pos); + boolean result; + if (rowData == null) { + result = true; + } else { + result = rowData.isNullAt(lastProjectedFields[pos]); + } + + isNullAtCached[pos] = true; + cachedNullAt[pos] = result; + + return result; + } + + @Override + public boolean getBoolean(int pos) { + return getFieldAs(pos, RowData::getBoolean); + } + + @Override + public byte getByte(int pos) { + return getFieldAs(pos, RowData::getByte); + } + + @Override + public short getShort(int pos) { + return getFieldAs(pos, RowData::getShort); + } + + @Override + public int getInt(int pos) { + return getFieldAs(pos, RowData::getInt); + } + + @Override + public long getLong(int pos) { + return getFieldAs(pos, RowData::getLong); + } + + @Override + public float getFloat(int pos) { + return getFieldAs(pos, RowData::getFloat); + } + + @Override + public double getDouble(int pos) { + return getFieldAs(pos, RowData::getDouble); + } + + @Override + public StringData getString(int pos) { + return getFieldAs(pos, RowData::getString); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return getFieldAs( + pos, (rowData, internalPos) -> rowData.getDecimal(internalPos, precision, scale)); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return getFieldAs( + pos, (rowData, internalPos) -> rowData.getTimestamp(internalPos, precision)); + } + + @Override + public RawValueData getRawValue(int pos) { + return getFieldAs(pos, RowData::getRawValue); + } + + @Override + public byte[] getBinary(int pos) { + return getFieldAs(pos, RowData::getBinary); + } + + @Override + public ArrayData getArray(int pos) { + return getFieldAs(pos, 
RowData::getArray); + } + + @Override + public MapData getMap(int pos) { + return getFieldAs(pos, RowData::getMap); + } + + @Override + public RowData getRow(int pos, int numFields) { + return getFieldAs(pos, (rowData, internalPos) -> rowData.getRow(internalPos, numFields)); + } + + private @Nullable RowData extractInternalRow(int pos) { + int[] projectedField = projectedFields[pos]; + RowData rowData = this.row; + RowType dataType = producedDataType; + for (int i = 0; i < projectedField.length - 1; i++) { + dataType = (RowType) dataType.getTypeAt(projectedField[i]); + if (rowData.isNullAt(projectedField[i])) { + return null; + } + rowData = rowData.getRow(projectedField[i], dataType.getFieldCount()); + } + return rowData; + } + + @SuppressWarnings("unchecked") + private T getFieldAs(int pos, BiFunction getter) { + if (isFieldsCached[pos]) { + return (T) cachedFields[pos]; + } + + RowData rowData = extractInternalRow(pos); + T result; + if (rowData == null) { + isNullAtCached[pos] = true; + cachedNullAt[pos] = true; + isFieldsCached[pos] = true; + cachedFields[pos] = null; + result = null; + } else { + result = getter.apply(rowData, lastProjectedFields[pos]); + isNullAtCached[pos] = true; + cachedNullAt[pos] = result == null; + isFieldsCached[pos] = true; + cachedFields[pos] = result; + } + + return result; + } + + @VisibleForTesting + public int[][] getProjectedFields() { + return projectedFields; + } + + @VisibleForTesting + public RowType getRowType() { + return producedDataType; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 521d8a01311a..a6b99b132f01 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -18,15 +18,22 @@ package org.apache.paimon.flink; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypeVisitor; + import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; + /** * {@link Projection} represents a list of (possibly nested) indexes that can be used to project * data types. A row projection includes both reducing the accessible fields and reordering them. @@ -38,6 +45,12 @@ private Projection() {} public abstract RowType project(RowType logicalType); + public abstract org.apache.paimon.types.RowType project( + org.apache.paimon.types.RowType rowType); + + public abstract NestedProjectedRowData getOuterProjectRow( + org.apache.paimon.types.RowType rowType); + /** @return {@code true} whether this projection is nested or not. 
*/ public abstract boolean isNested(); @@ -132,6 +145,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); + } + @Override public boolean isNested() { return false; @@ -184,6 +207,71 @@ public RowType project(RowType rowType) { return new RowType(rowType.isNullable(), updatedFields); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + if (!nested) { + return rowType.project( + Arrays.stream(this.projection).mapToInt(x -> x[0]).toArray()); + } + + MutableRowType result = + new MutableRowType(rowType.isNullable(), Collections.emptyList()); + for (int[] indexPath : this.projection) { + org.apache.paimon.types.RowType sourceType = rowType; + MutableRowType targetType = result; + int index; + for (index = 0; index < indexPath.length - 1; index++) { + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + sourceType = (org.apache.paimon.types.RowType) field.type(); + if (!targetType.containsField(fieldName)) { + targetType.appendDataField( + fieldName, + field.id(), + new MutableRowType( + sourceType.isNullable(), Collections.emptyList()), + field.description()); + } + targetType = (MutableRowType) targetType.getField(fieldName).type(); + } + + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + targetType.appendDataField( + fieldName, field.id(), field.type(), field.description()); + } + return result.toRowType(); + } + + @Override + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + if (!nested) { + return null; + } + + org.apache.paimon.types.RowType resultType = project(rowType); + + int[][] resultIndices = new int[this.projection.length][]; + for (int i = 0; i < this.projection.length; i++) { + org.apache.paimon.types.RowType sourceType = rowType; + org.apache.paimon.types.RowType targetType = resultType; + resultIndices[i] = new int[this.projection[i].length]; + for (int j = 0; j < this.projection[i].length; j++) { + DataField sourceField = sourceType.getFields().get(this.projection[i][j]); + String fieldName = sourceField.name(); + resultIndices[i][j] = targetType.getFieldIndex(fieldName); + if (j < this.projection[i].length - 1) { + targetType = + (org.apache.paimon.types.RowType) + targetType.getField(fieldName).type(); + sourceType = (org.apache.paimon.types.RowType) sourceField.type(); + } + } + } + + return new NestedProjectedRowData(toLogicalType(resultType), resultIndices); + } + @Override public boolean isNested() { return nested; @@ -217,6 +305,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); + } + @Override 
public boolean isNested() { return false; @@ -232,4 +330,103 @@ public int[][] toNestedIndexes() { return Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new); } } + + /** + * A mutable version of {@link org.apache.paimon.types.RowType} to facilitate the building + * process of projections. + * + *
<p>
It is mutable in aspect of the {@link #appendDataField} method. + */ + private static class MutableRowType extends org.apache.paimon.types.DataType { + private final List fields; + private final boolean isNullable; + + private MutableRowType(org.apache.paimon.types.RowType rowType) { + this(rowType.isNullable(), rowType.getFields()); + } + + private MutableRowType(boolean isNullable, List fields) { + super(isNullable, DataTypeRoot.ROW); + this.fields = new ArrayList<>(fields); + this.isNullable = isNullable; + } + + private org.apache.paimon.types.RowType toRowType() { + for (int i = 0; i < fields.size(); i++) { + DataField field = fields.get(i); + if (field.type() instanceof MutableRowType) { + fields.set( + i, + new DataField( + field.id(), + field.name(), + ((MutableRowType) field.type()).toRowType(), + field.description())); + } + } + return new org.apache.paimon.types.RowType(isNullable, fields); + } + + private boolean containsField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return true; + } + } + return false; + } + + private DataField getField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return field; + } + } + + throw new RuntimeException("Cannot find field: " + fieldName); + } + + private void appendDataField( + String name, int newId, org.apache.paimon.types.DataType type, String description) { + for (DataField field : fields) { + if (field.name().equals(name)) { + throw new IllegalStateException( + String.format( + "A field with name %s has already been appended. Existing fields: %s", + name, fields)); + } + if (field.id() == newId) { + throw new IllegalStateException( + String.format( + "A field with id %s has already been appended. 
Existing fields: %s", + newId, fields)); + } + } + + if (type instanceof org.apache.paimon.types.RowType) { + type = new MutableRowType((org.apache.paimon.types.RowType) type); + } + fields.add(new DataField(newId, name, type, description)); + } + + @Override + public int defaultSize() { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.paimon.types.DataType copy(boolean isNullable) { + throw new UnsupportedOperationException(); + } + + @Override + public String asSQLString() { + throw new UnsupportedOperationException(); + } + + @Override + public R accept(DataTypeVisitor visitor) { + throw new UnsupportedOperationException(); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 559976921e2e..cc33ef167c7d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; @@ -48,15 +49,16 @@ public class ContinuousFileStoreSource extends FlinkSource { public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit) { - this(readBuilder, options, limit, BucketMode.HASH_FIXED); + this(readBuilder, options, limit, BucketMode.HASH_FIXED, null); } public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, limit); + BucketMode bucketMode, + @Nullable NestedProjectedRowData rowData) { + super(readBuilder, limit, rowData); this.options = options; this.bucketMode = bucketMode; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java index 8fc78c868ba5..937d54c9f758 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.source.TableRead; @@ -48,7 +49,8 @@ public FileStoreSourceReader( TableRead tableRead, FileStoreSourceReaderMetrics metrics, IOManager ioManager, - @Nullable Long limit) { + @Nullable Long limit, + @Nullable NestedProjectedRowData rowData) { // limiter is created in SourceReader, it can be shared in all split readers super( () -> @@ -56,7 +58,7 @@ public FileStoreSourceReader( tableRead, RecordLimiter.create(limit), metrics), (element, output, state) -> FlinkRecordsWithSplitIds.emitRecord( - readerContext, element, output, state, metrics), + readerContext, element, output, state, metrics, rowData), readerContext.getConfiguration(), 
readerContext); this.ioManager = ioManager; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java index ecb304f83c58..9860c1c055fe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.utils.Reference; @@ -109,7 +110,8 @@ public static void emitRecord( RecordIterator element, SourceOutput output, FileStoreSourceSplitState state, - FileStoreSourceReaderMetrics metrics) { + FileStoreSourceReaderMetrics metrics, + @Nullable NestedProjectedRowData nestedProjectedRowData) { long timestamp = TimestampAssigner.NO_TIMESTAMP; if (metrics.getLatestFileCreationTime() != FileStoreSourceReaderMetrics.UNDEFINED) { timestamp = metrics.getLatestFileCreationTime(); @@ -131,7 +133,11 @@ public static void emitRecord( numRecordsIn.inc(); } - output.collect(record.getRecord(), timestamp); + RowData rowData = record.getRecord(); + if (nestedProjectedRowData != null) { + rowData = nestedProjectedRowData.replaceRow(rowData); + } + output.collect(rowData, timestamp); state.setPosition(record); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java index 7643f7b775d3..5fcfc9b379c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.ReadBuilder; @@ -43,10 +44,15 @@ public abstract class FlinkSource protected final ReadBuilder readBuilder; @Nullable protected final Long limit; + @Nullable protected final NestedProjectedRowData rowData; - public FlinkSource(ReadBuilder readBuilder, @Nullable Long limit) { + public FlinkSource( + ReadBuilder readBuilder, + @Nullable Long limit, + @Nullable NestedProjectedRowData rowData) { this.readBuilder = readBuilder; this.limit = limit; + this.rowData = rowData; } @Override @@ -56,7 +62,12 @@ public SourceReader createReader(SourceReaderCont FileStoreSourceReaderMetrics sourceReaderMetrics = new FileStoreSourceReaderMetrics(context.metricGroup()); return new FileStoreSourceReader( - context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit); + context, + readBuilder.newRead(), + sourceReaderMetrics, + ioManager, + limit, + NestedProjectedRowData.copy(rowData)); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index b85d5274b241..d99efae0539d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.StartupMode; import org.apache.paimon.CoreOptions.StreamingReadMode; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.Projection; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.sink.FlinkSink; @@ -171,9 +172,12 @@ FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) { return this; } - private ReadBuilder createReadBuilder() { - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate); + private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) { + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (limit != null) { readBuilder.withLimit(limit.intValue()); } @@ -184,24 +188,33 @@ private DataStream buildStaticFileSource() { Options options = Options.fromMap(table.options()); return toDataStream( new StaticFileStoreSource( - createReadBuilder(), + createReadBuilder(projectedRowType()), limit, options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE), options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE), - dynamicPartitionFilteringInfo)); + dynamicPartitionFilteringInfo, + outerProject())); } private DataStream buildContinuousFileSource() { return toDataStream( new ContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + outerProject())); } private DataStream buildAlignedContinuousFileSource() { assertStreamingConfigurationForAlignMode(env); return toDataStream( new AlignedContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + outerProject())); } private DataStream toDataStream(Source source) { @@ -237,6 +250,20 @@ private TypeInformation produceTypeInfo() { return InternalTypeInfo.of(produceType); } + private @Nullable org.apache.paimon.types.RowType projectedRowType() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.project(table.rowType())) + .orElse(null); + } + + private @Nullable NestedProjectedRowData outerProject() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.getOuterProjectRow(table.rowType())) + .orElse(null); + } + /** Build source {@link DataStream} with {@link RowData}. 
*/ public DataStream buildForRow() { DataType rowType = fromLogicalToDataType(toLogicalType(table.rowType())); @@ -280,7 +307,10 @@ public DataStream build() { return toDataStream( HybridSource.builder( LogHybridSourceFactory.buildHybridFirstSource( - table, projectedFields, predicate)) + table, + projectedRowType(), + predicate, + outerProject())) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) @@ -310,12 +340,13 @@ private DataStream buildContinuousStreamOperator() { env, sourceName, produceTypeInfo(), - createReadBuilder(), + createReadBuilder(projectedRowType()), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), watermarkStrategy == null, conf.get( FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION), - bucketMode); + bucketMode, + outerProject()); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 12b579589d0f..741754fc3376 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,11 +124,11 @@ public Result applyFilters(List filters) { @Override public boolean supportsNestedProjection() { - return false; + return true; } @Override - public void applyProjection(int[][] projectedFields) { + public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.projectFields = projectedFields; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java index 90c283bf87c9..bc361cdbf3a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; @@ -29,6 +30,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.connector.source.Boundedness; @@ -69,7 +71,10 @@ public LogHybridSourceFactory(LogSourceProvider provider) { } public static FlinkSource buildHybridFirstSource( - Table table, @Nullable int[][] projectedFields, @Nullable Predicate predicate) { + Table table, + @Nullable RowType readType, + @Nullable 
Predicate predicate, + @Nullable NestedProjectedRowData rowData) { if (!(table instanceof DataTable)) { throw new UnsupportedOperationException( String.format( @@ -79,10 +84,16 @@ public static FlinkSource buildHybridFirstSource( DataTable dataTable = (DataTable) table; + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + return new FlinkHybridFirstSource( - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate), + readBuilder.withFilter(predicate), dataTable.snapshotManager(), - dataTable.coreOptions().toConfiguration()); + dataTable.coreOptions().toConfiguration(), + rowData); } /** The first source of a log {@link HybridSource}. */ @@ -94,8 +105,11 @@ private static class FlinkHybridFirstSource extends FlinkSource { private final Options options; public FlinkHybridFirstSource( - ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) { - super(readBuilder, null); + ReadBuilder readBuilder, + SnapshotManager snapshotManager, + Options options, + @Nullable NestedProjectedRowData rowData) { + super(readBuilder, null, rowData); this.snapshotManager = snapshotManager; this.options = options; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java index af425aab5e46..624f5434810d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; @@ -53,7 +54,7 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode) { - this(readBuilder, limit, splitBatchSize, splitAssignMode, null); + this(readBuilder, limit, splitBatchSize, splitAssignMode, null, null); } public StaticFileStoreSource( @@ -61,8 +62,9 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode, - @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(readBuilder, limit); + @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo, + @Nullable NestedProjectedRowData rowData) { + super(readBuilder, limit, rowData); this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e914c617fffb..5198bd42136b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -19,9 +19,12 @@ package org.apache.paimon.flink.source; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import 
org.apache.paimon.flink.Projection; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; @@ -32,8 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; @@ -80,13 +81,29 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { Source source; - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectFields).withFilter(predicate); + + NestedProjectedRowData rowData = null; + org.apache.paimon.types.RowType readType = null; + if (projectFields != null) { + Projection projection = Projection.of(projectFields); + rowData = projection.getOuterProjectRow(table.rowType()); + readType = projection.project(table.rowType()); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (isStreamingMode && table instanceof DataTable) { - source = new ContinuousFileStoreSource(readBuilder, table.options(), limit); + source = + new ContinuousFileStoreSource( + readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData); } else { - source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode); + source = + new StaticFileStoreSource( + readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData); } return new PaimonDataStreamScanProvider( source.getBoundedness() == Boundedness.BOUNDED, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index 705e1d9a7a4c..63b3f63f7f50 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.ContinuousFileStoreSource; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.PendingSplitsCheckpoint; @@ -52,8 +53,9 @@ public AlignedContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, options, limit, bucketMode); + BucketMode bucketMode, + @Nullable NestedProjectedRowData rowData) { + super(readBuilder, options, limit, bucketMode, rowData); } @Override @@ -72,8 +74,8 @@ public SourceReader createReader(SourceReaderCont ioManager, limit, new FutureCompletingBlockingQueue<>( - context.getConfiguration() - .get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY))); + 
context.getConfiguration().get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), + rowData); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java index a8ffe3de561f..7d6f47296a74 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.align; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.FileStoreSourceReader; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.FileStoreSourceSplitState; @@ -57,8 +58,9 @@ public AlignedSourceReader( IOManager ioManager, @Nullable Long limit, FutureCompletingBlockingQueue>> - elementsQueue) { - super(readerContext, tableRead, metrics, ioManager, limit); + elementsQueue, + @Nullable NestedProjectedRowData rowData) { + super(readerContext, tableRead, metrics, ioManager, limit, rowData); this.elementsQueue = elementsQueue; this.nextCheckpointId = null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index 4ec0a4f99d9f..2783b0ae0173 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source.operator; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; import org.apache.paimon.flink.source.SimpleSourceSplit; @@ -210,7 +211,8 @@ public static DataStream buildSource( long monitorInterval, boolean emitSnapshotWatermark, boolean shuffleBucketWithPartition, - BucketMode bucketMode) { + BucketMode bucketMode, + NestedProjectedRowData nestedProjectedRowData) { SingleOutputStreamOperator singleOutputStreamOperator = env.fromSource( new MonitorSource( @@ -227,7 +229,7 @@ public static DataStream buildSource( singleOutputStreamOperator, shuffleBucketWithPartition); return sourceDataStream.transform( - name + "-Reader", typeInfo, new ReadOperator(readBuilder)); + name + "-Reader", typeInfo, new ReadOperator(readBuilder, nestedProjectedRowData)); } private static DataStream shuffleUnwareBucket( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index ccc66194560e..1757a859df44 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import 
org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; @@ -36,6 +37,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + /** * The operator that reads the {@link Split splits} received from the preceding {@link * MonitorSource}. Contrary to the {@link MonitorSource} which has a parallelism of 1, this operator @@ -47,6 +50,7 @@ public class ReadOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final ReadBuilder readBuilder; + @Nullable private final NestedProjectedRowData nestedProjectedRowData; private transient TableRead read; private transient StreamRecord reuseRecord; @@ -61,8 +65,10 @@ public class ReadOperator extends AbstractStreamOperator private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; - public ReadOperator(ReadBuilder readBuilder) { + public ReadOperator( + ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) { this.readBuilder = readBuilder; + this.nestedProjectedRowData = nestedProjectedRowData; } @Override @@ -85,7 +91,11 @@ public void open() throws Exception { .getSpillingDirectoriesPaths()); this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); - this.reuseRecord = new StreamRecord<>(reuseRow); + if (nestedProjectedRowData != null) { + this.reuseRecord = new StreamRecord<>(nestedProjectedRowData); + } else { + this.reuseRecord = new StreamRecord<>(reuseRow); + } this.idlingStarted(); } @@ -116,6 +126,9 @@ public void processElement(StreamRecord record) throws Exception { } reuseRow.replace(iterator.next()); + if (nestedProjectedRowData != null) { + nestedProjectedRowData.replaceRow(this.reuseRow); + } output.collect(reuseRecord); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java new file mode 100644 index 000000000000..c2c0993b1e86 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Projection}. 
*/ +public class ProjectionTest { + @Test + public void testNestedProjection() { + RowType writeType = + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(3, "f1", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + // skip read f0, f1.f1 + RowType readType = + DataTypes.ROW( + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + Projection projection = Projection.of(new int[][] {{1, 0}, {1, 2}}); + assertThat(projection.project(writeType)).isEqualTo(readType); + + RowType readTypeForFlink = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "f1", + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD(1, "f2", DataTypes.INT())))); + + NestedProjectedRowData rowData = projection.getOuterProjectRow(writeType); + + assertThat(rowData.getRowType()).isEqualTo(toLogicalType(readTypeForFlink)); + + assertThat(rowData.getProjectedFields()).isEqualTo(new int[][] {{0, 0}, {0, 1}}); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java index 13613dab063a..24f35cdfdb8c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java @@ -129,7 +129,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception { public void logHybridFileStoreSourceScanMetricsTest() throws Exception { writeOnce(); FlinkSource logHybridFileStoreSource = - LogHybridSourceFactory.buildHybridFirstSource(table, null, null); + LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null); logHybridFileStoreSource.restoreEnumerator(context, null); assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue()) .isEqualTo(1L); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java index 882763cf74da..608daa85978d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java @@ -141,6 +141,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(), new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), + null, null); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java index 7f5f2f174d6f..eb83790ec277 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java @@ -69,7 +69,8 @@ public void testEmitRecord() { iterator, output, state, - new 
FileStoreSourceReaderMetrics(new DummyMetricGroup())); + new FileStoreSourceReaderMetrics(new DummyMetricGroup()), + null); assertThat(output.getEmittedRecords()).containsExactly(rows); assertThat(state.recordsToSkip()).isEqualTo(2); assertThat(records.nextRecordFromSplit()).isNull(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java new file mode 100644 index 000000000000..62aa52c2d28e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.CatalogITCaseBase; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.api.ExplainFormat; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Tests for {@link SupportsProjectionPushDown}. 
*/ +public class ProjectionPushDownITCase extends CatalogITCaseBase { + + @Override + public List ddl() { + return ImmutableList.of( + "CREATE TABLE T (" + + "a INT, b ROW, c STRING, d ROW, d1 BOOLEAN, d2 INT>) PARTITIONED BY (a);"); + } + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + batchSql( + "INSERT INTO T VALUES " + + "(1, ROW(10, 'value1'), '1', ROW(ROW('valued1', 1), true, 10)), " + + "(1, ROW(20, 'value2'), '2', ROW(ROW('valued2', 2), false, 20)), " + + "(2, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), true, 30)), " + + "(3, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), false, 30))"); + } + + @Test + public void testProjectionPushDown() { + String sql = "SELECT a, c FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, c]]], fields=[a, c])", + Row.ofKind(RowKind.INSERT, 1, "1"), + Row.ofKind(RowKind.INSERT, 1, "2"), + Row.ofKind(RowKind.INSERT, 2, "3"), + Row.ofKind(RowKind.INSERT, 3, "3")); + } + + @Test + public void testProjectionPushDownWithUnorderedColumns() { + String sql = "SELECT c, a FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[c, a]]], fields=[c, a])", + Row.ofKind(RowKind.INSERT, "1", 1), + Row.ofKind(RowKind.INSERT, "2", 1), + Row.ofKind(RowKind.INSERT, "3", 2), + Row.ofKind(RowKind.INSERT, "3", 3)); + } + + @Test + public void testNestedProjectionPushDown() { + String sql = "SELECT a, b.b1 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1]]], fields=[a, b_b1])", + Row.ofKind(RowKind.INSERT, 1, "value1"), + Row.ofKind(RowKind.INSERT, 1, "value2"), + Row.ofKind(RowKind.INSERT, 2, "value3"), + Row.ofKind(RowKind.INSERT, 3, "value3")); + } + + @Test + public void testNestedProjectionPushDownTripleLevel() { + String sql = "SELECT a, d.d0.d00 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, d_d0_d00]]], fields=[a, d_d0_d00])", + Row.ofKind(RowKind.INSERT, 1, "valued1"), + Row.ofKind(RowKind.INSERT, 1, "valued2"), + Row.ofKind(RowKind.INSERT, 2, "valued3"), + Row.ofKind(RowKind.INSERT, 3, "valued3")); + } + + @Test + public void testNestedProjectionPushDownMultipleFields() { + String sql = "SELECT a, b.b1, d.d2 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1, d_d2]]], fields=[a, b_b1, d_d2])", + Row.ofKind(RowKind.INSERT, 1, "value1", 10), + Row.ofKind(RowKind.INSERT, 1, "value2", 20), + Row.ofKind(RowKind.INSERT, 2, "value3", 30), + Row.ofKind(RowKind.INSERT, 3, "value3", 30)); + } + + @Test + public void testMultipleNestedProjectionPushDownWithUnorderedColumns() { + String sql = "SELECT c, d.d1, b.b1, a FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[c, d_d1, b_b1, a]]], fields=[c, d_d1, b_b1, a])", + Row.ofKind(RowKind.INSERT, "1", true, "value1", 1), + Row.ofKind(RowKind.INSERT, "2", false, "value2", 1), + Row.ofKind(RowKind.INSERT, "3", true, "value3", 2), + Row.ofKind(RowKind.INSERT, "3", false, "value3", 3)); + } + + @Test + public void testSystemTableProjectionPushDown() { + String sql = "SELECT schema_id, primary_keys FROM T$schemas"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T$schemas, project=[schema_id, primary_keys]]], fields=[schema_id, primary_keys])", + Row.ofKind(RowKind.INSERT, 0L, "[]")); + } + + private void assertPlanAndResult(String sql, String planIdentifier, Row... 
expectedRows) { + String plan = tEnv.explainSql(sql, ExplainFormat.TEXT); + String[] lines = plan.split("\n"); + String trimmed = Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n")); + Assertions.assertThat(trimmed).contains(planIdentifier); + List result = batchSql(sql); + Assertions.assertThat(result).containsExactlyInAnyOrder(expectedRows); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java index c36af0f6dcbc..f815dbe6321b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java @@ -77,6 +77,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), null, - new FutureCompletingBlockingQueue<>(2)); + new FutureCompletingBlockingQueue<>(2), + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 0cd969707cfa..6c4c7860e5fa 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -183,7 +183,7 @@ public void testMonitorSource() throws Exception { @Test public void testReadOperator() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup( @@ -205,7 +205,7 @@ public void testReadOperator() throws Exception { @Test public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup(
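Taken together, the changes wire nested projection pushdown end to end: `Projection.project` prunes the Paimon `RowType` that `FlinkSourceBuilder` and `SystemTableSource` pass to `ReadBuilder.withReadType`, and `getOuterProjectRow` builds the `NestedProjectedRowData` that the sources thread through to re-shape reader output. A sketch of that planning-side flow, mirroring the shape used in `ProjectionTest`; the schema and index paths are illustrative:

```java
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

class NestedProjectionSketch {
    static void sketch() {
        // Table schema: f0 INT, f1 ROW<f0 INT, f1 INT, f2 INT>
        // (the first FIELD argument is the Paimon field id).
        RowType tableType =
                DataTypes.ROW(
                        DataTypes.FIELD(0, "f0", DataTypes.INT()),
                        DataTypes.FIELD(
                                1,
                                "f1",
                                DataTypes.ROW(
                                        DataTypes.FIELD(2, "f0", DataTypes.INT()),
                                        DataTypes.FIELD(3, "f1", DataTypes.INT()),
                                        DataTypes.FIELD(4, "f2", DataTypes.INT()))));

        // SELECT f1.f0, f1.f2 — expressed as the nested index paths that
        // Flink's applyProjection(int[][], ...) pushes down.
        Projection projection = Projection.of(new int[][] {{1, 0}, {1, 2}});

        // Pruned Paimon type handed to ReadBuilder.withReadType(...); the
        // reader then emits rows shaped as ROW<f1 ROW<f0 INT, f2 INT>>.
        RowType readType = projection.project(tableType);

        // Runtime wrapper: for each row the reader emits, replaceRow(...)
        // exposes the projected leaves as a flat, lazily cached RowData view
        // in the column order Flink expects.
        NestedProjectedRowData outer = projection.getOuterProjectRow(tableType);
        // org.apache.flink.table.data.RowData produced = outer.replaceRow(rowFromReader);
    }
}
```

Note that `replaceRow` resets the per-field caches, so a single wrapper instance is reused across all rows of a reader; that is also why `FlinkSource.createReader` clones it per reader via `NestedProjectedRowData.copy`.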