diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java index 529aad54..fb6c3d21 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java @@ -26,17 +26,13 @@ import com.alibaba.fluss.record.LogRecordReadContext; import com.alibaba.fluss.row.GenericRow; import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.ProjectedRow; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.protocol.ApiError; import com.alibaba.fluss.utils.CloseableIterator; -import com.alibaba.fluss.utils.Projection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -61,8 +57,7 @@ abstract class CompletedFetch { private final Iterator batches; private final LogScannerStatus logScannerStatus; protected final LogRecordReadContext readContext; - @Nullable protected final Projection projection; - protected final InternalRow.FieldGetter[] fieldGetters; + protected final InternalRow.FieldGetter[] selectedFieldGetters; private LogRecordBatch currentBatch; private LogRecord lastRecord; @@ -92,30 +87,20 @@ public CompletedFetch( this.readContext = readContext; this.isCheckCrcs = isCheckCrcs; this.logScannerStatus = logScannerStatus; - this.projection = readContext.getProjection(); this.nextFetchOffset = fetchOffset; - this.fieldGetters = readContext.getProjectedFieldGetters(); + this.selectedFieldGetters = readContext.getSelectedFieldGetters(); } // TODO: optimize this to avoid deep copying the record. // refactor #fetchRecords to return an iterator which lazily deserialize // from underlying record stream and arrow buffer. ScanRecord toScanRecord(LogRecord record) { - GenericRow newRow = new GenericRow(fieldGetters.length); + GenericRow newRow = new GenericRow(selectedFieldGetters.length); InternalRow internalRow = record.getRow(); - for (int i = 0; i < fieldGetters.length; i++) { - newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); - } - if (projection != null && projection.isReorderingNeeded()) { - return new ScanRecord( - record.logOffset(), - record.timestamp(), - record.getRowKind(), - ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)); - } else { - return new ScanRecord( - record.logOffset(), record.timestamp(), record.getRowKind(), newRow); + for (int i = 0; i < selectedFieldGetters.length; i++) { + newRow.setField(i, selectedFieldGetters[i].getFieldOrNull(internalRow)); } + return new ScanRecord(record.logOffset(), record.timestamp(), record.getRowKind(), newRow); } boolean isConsumed() { @@ -243,7 +228,7 @@ private LogRecord nextFetchedRecord() throws Exception { private void maybeEnsureValid(LogRecordBatch batch) { if (isCheckCrcs) { - if (projection != null) { + if (readContext.isProjectionPushDowned()) { LOG.debug("Skipping CRC check for column projected log record batch."); return; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index 0a741aeb..0371c09e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -27,7 +27,6 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidMetadataException; import com.alibaba.fluss.fs.FsPath; -import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; @@ -85,8 +84,8 @@ public class LogFetcher implements Closeable { private final boolean isPartitioned; private final LogRecordReadContext readContext; // TODO this context can be merge with readContext. Introduce it only because log remote read - // currently can only do project when generate scanRecord instead of doing project while read - // bytes from remote file. + // currently can only do project when generate scanRecord instead of doing project while read + // bytes from remote file. private final LogRecordReadContext remoteReadContext; @Nullable private final Projection projection; private final RpcClient rpcClient; @@ -97,7 +96,6 @@ public class LogFetcher implements Closeable { private final LogFetchBuffer logFetchBuffer; private final LogFetchCollector logFetchCollector; private final RemoteLogDownloader remoteLogDownloader; - private final LogFormat logFormat; @GuardedBy("this") private final Set nodesWithPendingFetchRequests; @@ -119,7 +117,6 @@ public LogFetcher( RemoteFileDownloader remoteFileDownloader) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned(); - this.logFormat = tableInfo.getTableDescriptor().getLogFormat(); this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection); this.remoteReadContext = LogRecordReadContext.createReadContext(tableInfo, true, projection); @@ -317,6 +314,8 @@ private synchronized void handleFetchLogResponse( fetchResultForBucket, readContext, logScannerStatus, + // skipping CRC check if projection push downed as + // the data is pruned isCheckCrcs, fetchOffset); logFetchBuffer.add(completedFetch); @@ -423,7 +422,8 @@ private Map prepareFetchLogRequests() { .setMaxBytes(maxFetchBytes); PbFetchLogReqForTable reqForTable = new PbFetchLogReqForTable().setTableId(finalTableId); - if (projectionPushDownEnable()) { + if (readContext.isProjectionPushDowned()) { + assert projection != null; reqForTable .setProjectionPushdownEnabled(true) .setProjectedFields(projection.getProjectionInOrder()); @@ -449,13 +449,6 @@ private List fetchableBuckets() { return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket)); } - private boolean projectionPushDownEnable() { - // Currently, only ARROW log format supports projection push down to server. Other log - // formats will do project in client, see DefaultCompletedFetch#toScanRecord() for more - // details. - return projection != null && logFormat == LogFormat.ARROW; - } - private Integer getTableBucketLeader(TableBucket tableBucket) { metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket); if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) { diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java index c27eebf5..6fac413d 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.InternalRow.FieldGetter; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; @@ -32,6 +33,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.stream.IntStream; import static com.alibaba.fluss.utils.Preconditions.checkArgument; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; @@ -45,16 +47,14 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo private final RowType dataRowType; // the static schemaId of the table, should support dynamic schema evolution in the future private final int schemaId; - // the projectedFieldGetter to get the log value of the table after projection; - private final InternalRow.FieldGetter[] projectedFieldGetters; // the Arrow vector schema root of the table, should be null if not ARROW log format @Nullable private final VectorSchemaRoot vectorSchemaRoot; // the Arrow memory buffer allocator for the table, should be null if not ARROW log format @Nullable private final BufferAllocator bufferAllocator; - // whether context is to read from remote - private final boolean readFromRemote; - // the projection info - @Nullable private final Projection projection; + // the final selected fields of the read data + private final FieldGetter[] selectedFieldGetters; + // whether the projection is push downed to the server side and the returned data is pruned. + private final boolean projectionPushDowned; /** * Creates a LogRecordReadContext for the given table information and projection information. @@ -64,60 +64,51 @@ public static LogRecordReadContext createReadContext( TableDescriptor desc = tableInfo.getTableDescriptor(); RowType rowType = desc.getSchema().toRowType(); LogFormat logFormat = desc.getLogFormat(); + // only for arrow log format, the projection can be push downed to the server side + boolean projectionPushDowned = projection != null && logFormat == LogFormat.ARROW; int schemaId = tableInfo.getSchemaId(); + if (projection == null) { + // set a default dummy projection to simplify code + projection = Projection.of(IntStream.range(0, rowType.getFieldCount()).toArray()); + } if (logFormat == LogFormat.ARROW) { - // TODO: use a more reasonable memory limit - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - - // currently, For remote read, arrow log format do not support projection in remote. - if (projection == null || readFromRemote) { - VectorSchemaRoot vectorRoot = - VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); - return createArrowReadContext( - rowType, schemaId, vectorRoot, allocator, readFromRemote, projection); + if (readFromRemote) { + // currently, for remote read, arrow log doesn't support projection pushdown, + // so set the rowType as is. + int[] selectedFields = projection.getProjection(); + return createArrowReadContext(rowType, schemaId, selectedFields, false); } else { + // arrow data that returned from server has been projected (in order) RowType projectedRowType = projection.projectInOrder(rowType); - VectorSchemaRoot vectorRoot = - VectorSchemaRoot.create( - ArrowUtils.toArrowSchema(projectedRowType), allocator); + // need to reorder the fields for final output + int[] selectedFields = projection.getReorderingIndexes(); return createArrowReadContext( - projectedRowType, schemaId, vectorRoot, allocator, false, projection); + projectedRowType, schemaId, selectedFields, projectionPushDowned); } } else if (logFormat == LogFormat.INDEXED) { - return createIndexedReadContext(rowType, schemaId, readFromRemote, projection); + int[] selectedFields = projection.getProjection(); + return createIndexedReadContext(rowType, schemaId, selectedFields); } else { throw new IllegalArgumentException("Unsupported log format: " + logFormat); } } - /** - * Creates a LogRecordReadContext for ARROW log format. - * - * @param dataRowType the schema of the date read form server or remote - * @param schemaId the schemaId of the table - * @param vectorSchemaRoot the shared vector schema root for the table - * @param bufferAllocator the shared buffer allocator for the table - * @param readFromRemote whether context is to read from remote - * @param projection the projection info - */ - public static LogRecordReadContext createArrowReadContext( - RowType dataRowType, - int schemaId, - VectorSchemaRoot vectorSchemaRoot, - BufferAllocator bufferAllocator, - boolean readFromRemote, - @Nullable Projection projection) { - checkNotNull(vectorSchemaRoot); - checkNotNull(bufferAllocator); + private static LogRecordReadContext createArrowReadContext( + RowType dataRowType, int schemaId, int[] selectedFields, boolean projectionPushDowned) { + // TODO: use a more reasonable memory limit + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + VectorSchemaRoot vectorRoot = + VectorSchemaRoot.create(ArrowUtils.toArrowSchema(dataRowType), allocator); + FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, dataRowType, schemaId, - vectorSchemaRoot, - bufferAllocator, - readFromRemote, - projection); + vectorRoot, + allocator, + fieldGetters, + projectionPushDowned); } /** @@ -129,10 +120,8 @@ public static LogRecordReadContext createArrowReadContext( */ @VisibleForTesting public static LogRecordReadContext createArrowReadContext(RowType rowType, int schemaId) { - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - VectorSchemaRoot vectorRoot = - VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); - return createArrowReadContext(rowType, schemaId, vectorRoot, allocator, false, null); + int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); + return createArrowReadContext(rowType, schemaId, selectedFields, false); } /** @@ -142,25 +131,23 @@ public static LogRecordReadContext createArrowReadContext(RowType rowType, int s * @param schemaId the schemaId of the table */ public static LogRecordReadContext createIndexedReadContext(RowType rowType, int schemaId) { - return new LogRecordReadContext( - LogFormat.INDEXED, rowType, schemaId, null, null, false, null); + int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); + return createIndexedReadContext(rowType, schemaId, selectedFields); } /** * Creates a LogRecordReadContext for INDEXED log format. * - * @param rowType the schema of the table + * @param rowType the schema of the read data * @param schemaId the schemaId of the table - * @param readFromRemote whether context is to read from remote - * @param projection the projection info + * @param selectedFields the final selected fields of the read data */ public static LogRecordReadContext createIndexedReadContext( - RowType rowType, - int schemaId, - boolean readFromRemote, - @Nullable Projection projection) { + RowType rowType, int schemaId, int[] selectedFields) { + FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); + // for INDEXED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( - LogFormat.INDEXED, rowType, schemaId, null, null, readFromRemote, projection); + LogFormat.INDEXED, rowType, schemaId, null, null, fieldGetters, false); } private LogRecordReadContext( @@ -169,16 +156,15 @@ private LogRecordReadContext( int schemaId, VectorSchemaRoot vectorSchemaRoot, BufferAllocator bufferAllocator, - boolean readFromRemote, - @Nullable Projection projection) { + FieldGetter[] selectedFieldGetters, + boolean projectionPushDowned) { this.logFormat = logFormat; this.dataRowType = dataRowType; this.schemaId = schemaId; this.vectorSchemaRoot = vectorSchemaRoot; this.bufferAllocator = bufferAllocator; - this.readFromRemote = readFromRemote; - this.projection = projection; - this.projectedFieldGetters = buildProjectedFieldGetters(); + this.selectedFieldGetters = selectedFieldGetters; + this.projectionPushDowned = projectionPushDowned; } @Override @@ -196,54 +182,14 @@ public RowType getRowType(int schemaId) { return dataRowType; } - public RowType getDataRowType() { - return dataRowType; + /** Get the selected field getters for the read data. */ + public FieldGetter[] getSelectedFieldGetters() { + return selectedFieldGetters; } - private InternalRow.FieldGetter[] buildProjectedFieldGetters() { - InternalRow.FieldGetter[] fieldGetters; - List dataTypeList = dataRowType.getChildren(); - if (readFromRemote) { - if (projection != null) { - int[] projectionInOrder = projection.getProjectionInOrder(); - fieldGetters = new InternalRow.FieldGetter[projectionInOrder.length]; - for (int i = 0; i < fieldGetters.length; i++) { - fieldGetters[i] = - InternalRow.createFieldGetter( - dataTypeList.get(projectionInOrder[i]), projectionInOrder[i]); - } - } else { - fieldGetters = new InternalRow.FieldGetter[dataTypeList.size()]; - for (int i = 0; i < fieldGetters.length; i++) { - fieldGetters[i] = InternalRow.createFieldGetter(dataTypeList.get(i), i); - } - } - } else { - // Arrow log format already project in the server side. - if (projection != null && logFormat != LogFormat.ARROW) { - int[] projectionInOrder = projection.getProjectionInOrder(); - fieldGetters = new InternalRow.FieldGetter[projectionInOrder.length]; - for (int i = 0; i < fieldGetters.length; i++) { - fieldGetters[i] = - InternalRow.createFieldGetter( - dataTypeList.get(projectionInOrder[i]), projectionInOrder[i]); - } - } else { - fieldGetters = new InternalRow.FieldGetter[dataTypeList.size()]; - for (int i = 0; i < fieldGetters.length; i++) { - fieldGetters[i] = InternalRow.createFieldGetter(dataTypeList.get(i), i); - } - } - } - return fieldGetters; - } - - public InternalRow.FieldGetter[] getProjectedFieldGetters() { - return projectedFieldGetters; - } - - public @Nullable Projection getProjection() { - return projection; + /** Whether the projection is push downed to the server side and the returned data is pruned. */ + public boolean isProjectionPushDowned() { + return projectionPushDowned; } @Override @@ -278,4 +224,15 @@ public void close() { bufferAllocator.close(); } } + + private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] selectedFields) { + List dataTypeList = rowType.getChildren(); + FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = + InternalRow.createFieldGetter( + dataTypeList.get(selectedFields[i]), selectedFields[i]); + } + return fieldGetters; + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java index df3279ee..0968a392 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java @@ -84,10 +84,6 @@ public int getFieldIndex(String fieldName) { } public RowType project(int[] projectFields) { - if (projectFields.length == 0) { - return this; - } - List projectedFields = new ArrayList<>(); for (int projectField : projectFields) { projectedFields.add(this.fields.get(projectField)); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java index 0aade6d2..e04b3a80 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java @@ -70,7 +70,7 @@ public RowType projectInOrder(RowType rowType) { return rowType.project(projectionInOrder); } - public int[] projection() { + public int[] getProjection() { return projection; }