Skip to content

Commit

Permalink
[flink] Fix implementation of ProjectionRowData and misc comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 16, 2024
1 parent dc8714e commit 1c10be1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ private List<DataField> readDataFields(List<DataField> allDataFields) {
return readDataFields;
}

@Nullable
private DataType pruneDataType(DataType readType, DataType dataType) {
switch (readType.getTypeRoot()) {
case ROW:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;

/**
* {@link Projection} represents a list of (possibly nested) indexes that can be used to project
* data types. A row projection includes both reducing the accessible fields and reordering them.
Expand All @@ -47,7 +49,7 @@ private Projection() {}
public abstract org.apache.paimon.types.RowType project(
org.apache.paimon.types.RowType rowType);

public abstract ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType);
public abstract ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType);

/** @return {@code true} if this projection is nested, {@code false} otherwise. */
public abstract boolean isNested();
Expand Down Expand Up @@ -149,8 +151,8 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r
}

@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
return new NestedProjection(toNestedIndexes()).getRowData(rowType);
public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) {
return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType);
}

@Override
Expand Down Expand Up @@ -241,7 +243,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r
}

@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) {
org.apache.paimon.types.RowType resultType = project(rowType);

int[][] resultIndices = new int[this.projection.length][];
Expand All @@ -262,7 +264,7 @@ public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
}
}

return new ProjectionRowData(resultType, resultIndices);
return new ProjectionRowData(toLogicalType(resultType), resultIndices);
}

@Override
Expand Down Expand Up @@ -304,8 +306,8 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r
}

@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
return new NestedProjection(toNestedIndexes()).getRowData(rowType);
public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) {
return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@

package org.apache.paimon.flink;

import org.apache.paimon.types.RowType;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import javax.annotation.Nullable;
Expand All @@ -39,22 +40,48 @@
* projection information.
*/
public class ProjectionRowData implements RowData, Serializable {
private static final long serialVersionUID = 1L;

private final RowType producedDataType;
private final int[][] projectedFields;
private final int[] lastProjectedFields;
private transient RowData row;
private final FieldGetter[][] fieldGetters;

private transient GenericRowData row;

/**
 * Builds one chain of field getters per projected field, with one getter for each index in
 * that field's path.
 *
 * @param producedDataType row type that every index path is resolved against
 * @param projectedFields index paths; {@code projectedFields[i]} lists the field positions to
 *     follow, outermost first, to reach output field {@code i}
 */
ProjectionRowData(RowType producedDataType, int[][] projectedFields) {
this.producedDataType = producedDataType;
this.projectedFields = projectedFields;
// NOTE(review): the lastProjectedFields lines in this hunk look like the removed side of the
// diff (the added accessors no longer read that array) — confirm against the commit.
this.lastProjectedFields = new int[projectedFields.length];
this.fieldGetters = new FieldGetter[projectedFields.length][];
for (int i = 0; i < projectedFields.length; i++) {
this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1];
this.fieldGetters[i] = new FieldGetter[projectedFields[i].length];
// Every path starts again from the top-level type.
LogicalType currentType = producedDataType;
for (int j = 0; j < projectedFields[i].length; j++) {
// Descend one level; the cast requires the type reached so far to be a RowType.
currentType = ((RowType) currentType).getTypeAt(projectedFields[i][j]);
this.fieldGetters[i][j] =
RowData.createFieldGetter(currentType, projectedFields[i][j]);
}
}
}

// NOTE(review): the first two lines look like the removed side of this diff hunk (that
// version stored the incoming row directly); the second signature starts the replacement —
// confirm against the commit.
public ProjectionRowData replaceRow(RowData row) {
this.row = row;
/**
 * Copies the projected fields of {@code inputRow} into the reusable internal row and returns
 * this instance.
 *
 * @param inputRow row to read the projected fields from
 * @return this object, whose internal row now holds the values extracted from {@code inputRow}
 */
public ProjectionRowData replaceRow(RowData inputRow) {
// The internal row is created on the first call, sized to the number of projected fields.
// NOTE(review): inputRow.getRowKind() is dereferenced here without a null check, while the
// end of this method guards on inputRow != null — confirm whether a null input is possible
// and make the two consistent.
if (this.row == null) {
this.row = new GenericRowData(inputRow.getRowKind(), fieldGetters.length);
}

for (int i = 0; i < fieldGetters.length; i++) {
// Follow the getter chain for output field i, starting from the input row itself.
Object currentRow = inputRow;
for (int j = 0; j < fieldGetters[i].length; j++) {
// A null value part-way down the path ends the walk; the output field is then set to null.
if (currentRow == null) {
break;
}
// The cast requires every value reached before the end of the path to be a RowData.
currentRow = this.fieldGetters[i][j].getFieldOrNull((RowData) currentRow);
}
this.row.setField(i, currentRow);
}

// Carry the row kind of the input over to the internal row on every call.
if (inputRow != null) {
this.row.setRowKind(inputRow.getRowKind());
}
return this;
}

Expand All @@ -67,7 +94,7 @@ public ProjectionRowData replaceRow(RowData row) {

@Override
public int getArity() {
// NOTE(review): of the two returns in this diff hunk, the first reads the projection array and
// the second reads the internal row. The row field is transient and, in the visible code, is
// only assigned inside replaceRow, so the second form presumably throws a
// NullPointerException when called before replaceRow or on a freshly deserialized instance —
// confirm. The internal row is allocated with fieldGetters.length fields, so
// fieldGetters.length should give the same count without touching the row.
return projectedFields.length;
return this.row.getArity();
}

@Override
Expand All @@ -82,160 +109,82 @@ public void setRowKind(RowKind rowKind) {

// NOTE(review): this diff hunk interleaves two versions. The lines through extractInternalRow
// appear to be the removed implementation, which resolved the nested path on every call; the
// final return appears to be the replacement — confirm against the commit.
@Override
public boolean isNullAt(int i) {
// Path-walking version: a null intermediate row makes the projected field null.
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return true;
}
return rowData.isNullAt(lastProjectedFields[i]);
}

/**
 * Follows all but the last index of path {@code i} through nested rows.
 *
 * @return the row that holds the final field of the path, or {@code null} if a row along the
 *     way is null
 */
private @Nullable RowData extractInternalRow(int i) {
int[] projectedField = projectedFields[i];
RowData rowData = this.row;
RowType dataType = producedDataType;
for (int j = 0; j < projectedField.length - 1; j++) {
// The nested row type is needed for its field count, which getRow takes as an argument.
dataType = (RowType) dataType.getTypeAt(projectedField[j]);
if (rowData.isNullAt(projectedField[j])) {
return null;
}
rowData = rowData.getRow(projectedField[j], dataType.getFieldCount());
}
return rowData;
// Delegating version: reads the null flag at output position i of the internal row.
return this.row.isNullAt(i);
}

@Override
public boolean getBoolean(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getBoolean(lastProjectedFields[i]);
return this.row.getBoolean(i);
}

@Override
public byte getByte(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getByte(lastProjectedFields[i]);
return this.row.getByte(i);
}

@Override
public short getShort(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getShort(lastProjectedFields[i]);
return this.row.getShort(i);
}

@Override
public int getInt(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getInt(lastProjectedFields[i]);
return this.row.getInt(i);
}

@Override
public long getLong(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getLong(lastProjectedFields[i]);
return this.row.getLong(i);
}

@Override
public float getFloat(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getFloat(lastProjectedFields[i]);
return this.row.getFloat(i);
}

@Override
public double getDouble(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
throw new NullPointerException();
}
return rowData.getDouble(lastProjectedFields[i]);
return this.row.getDouble(i);
}

@Override
public StringData getString(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getString(lastProjectedFields[i]);
return this.row.getString(i);
}

@Override
public DecimalData getDecimal(int i, int i1, int i2) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getDecimal(lastProjectedFields[i], i1, i2);
return this.row.getDecimal(i, i1, i2);
}

@Override
public TimestampData getTimestamp(int i, int i1) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getTimestamp(lastProjectedFields[i], i1);
return this.row.getTimestamp(i, i1);
}

@Override
public <T> RawValueData<T> getRawValue(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getRawValue(lastProjectedFields[i]);
return this.row.getRawValue(i);
}

@Override
public byte[] getBinary(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getBinary(lastProjectedFields[i]);
return this.row.getBinary(i);
}

@Override
public ArrayData getArray(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getArray(lastProjectedFields[i]);
return this.row.getArray(i);
}

@Override
public MapData getMap(int i) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getMap(lastProjectedFields[i]);
return this.row.getMap(i);
}

@Override
public RowData getRow(int i, int i1) {
RowData rowData = extractInternalRow(i);
if (rowData == null) {
return null;
}
return rowData.getRow(lastProjectedFields[i], i1);
return this.row.getRow(i, i1);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private DataStream<RowData> buildStaticFileSource() {
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
dynamicPartitionFilteringInfo,
projectedRowData()));
outerProject()));
}

private DataStream<RowData> buildContinuousFileSource() {
Expand All @@ -203,7 +203,7 @@ private DataStream<RowData> buildContinuousFileSource() {
table.options(),
limit,
bucketMode,
projectedRowData()));
outerProject()));
}

private DataStream<RowData> buildAlignedContinuousFileSource() {
Expand All @@ -214,7 +214,7 @@ private DataStream<RowData> buildAlignedContinuousFileSource() {
table.options(),
limit,
bucketMode,
projectedRowData()));
outerProject()));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
Expand Down Expand Up @@ -257,10 +257,10 @@ private TypeInformation<RowData> produceTypeInfo() {
.orElse(null);
}

private @Nullable ProjectionRowData projectedRowData() {
private @Nullable ProjectionRowData outerProject() {
return Optional.ofNullable(projectedFields)
.map(Projection::of)
.map(p -> p.getRowData(table.rowType()))
.map(p -> p.getOuterProjectRow(table.rowType()))
.orElse(null);
}

Expand Down Expand Up @@ -310,7 +310,7 @@ public DataStream<RowData> build() {
table,
projectedRowType(),
predicate,
projectedRowData()))
outerProject()))
.addSource(
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
Expand Down Expand Up @@ -346,7 +346,7 @@ private DataStream<RowData> buildContinuousStreamOperator() {
conf.get(
FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
bucketMode,
projectedRowData());
outerProject());
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
org.apache.paimon.types.RowType readType = null;
if (projectFields != null) {
Projection projection = Projection.of(projectFields);
rowData = projection.getRowData(table.rowType());
rowData = projection.getOuterProjectRow(table.rowType());
readType = projection.project(table.rowType());
}

Expand Down
Loading

0 comments on commit 1c10be1

Please sign in to comment.