[client] Support projection for indexed row
swuferhong authored and wuchong committed Dec 25, 2024
1 parent 04eb514 commit 8b62653
Showing 19 changed files with 526 additions and 180 deletions.
@@ -24,7 +24,9 @@
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.utils.CloseableIterator;
@@ -58,7 +60,7 @@ abstract class CompletedFetch {
private final boolean isCheckCrcs;
private final Iterator<LogRecordBatch> batches;
private final LogScannerStatus logScannerStatus;
private final LogRecordReadContext readContext;
protected final LogRecordReadContext readContext;
@Nullable protected final Projection projection;
protected final InternalRow.FieldGetter[] fieldGetters;

@@ -81,8 +83,7 @@ public CompletedFetch(
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrcs,
long fetchOffset,
@Nullable Projection projection) {
long fetchOffset) {
this.tableBucket = tableBucket;
this.error = error;
this.sizeInBytes = sizeInBytes;
@@ -91,12 +92,31 @@
this.readContext = readContext;
this.isCheckCrcs = isCheckCrcs;
this.logScannerStatus = logScannerStatus;
this.projection = projection;
this.projection = readContext.getProjection();
this.nextFetchOffset = fetchOffset;
this.fieldGetters = readContext.getFieldGetters();
this.fieldGetters = readContext.getProjectedFieldGetters();
}

protected abstract ScanRecord toScanRecord(LogRecord record);
// TODO: optimize this to avoid deep copying the record.
// refactor #fetchRecords to return an iterator which lazily deserialize
// from underlying record stream and arrow buffer.
ScanRecord toScanRecord(LogRecord record) {
GenericRow newRow = new GenericRow(fieldGetters.length);
InternalRow internalRow = record.getRow();
for (int i = 0; i < fieldGetters.length; i++) {
newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
}
if (projection != null && projection.isReorderingNeeded()) {
return new ScanRecord(
record.logOffset(),
record.timestamp(),
record.getRowKind(),
ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow));
} else {
return new ScanRecord(
record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
}
}

boolean isConsumed() {
return isConsumed;
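
For readers of this diff, here is a standalone sketch of the client-side projection that the new shared toScanRecord() above performs for the case where the server returned full rows (non-ARROW formats): the read context's field getters copy the projected columns out of the full row, and ProjectedRow.from(projection.getReorderingIndexes()) lazily reorders them into the order the caller requested. The sketch uses plain Java arrays instead of Fluss's InternalRow/GenericRow, and the concrete index values and their semantics are assumptions inferred from how this commit uses them, not a documented contract.

import java.util.Arrays;

// Standalone illustration, no Fluss dependencies; row values and indexes are made up.
public class ClientSideProjectionSketch {
    public static void main(String[] args) {
        // Full row as read for a non-ARROW (e.g. indexed) log format.
        Object[] fullRow = {1001L, "alice", 42, "2024-12-25"};

        // The caller asked for columns (2, 0, 3) of the table schema.
        int[] projectionInOrder = {0, 2, 3};  // projected columns, sorted into schema order
        int[] reorderingIndexes = {1, 0, 2};  // output position -> position in the in-order projection

        // Step 1: select the projected columns into a new row, as the fieldGetters loop does.
        Object[] projected = new Object[projectionInOrder.length];
        for (int i = 0; i < projectionInOrder.length; i++) {
            projected[i] = fullRow[projectionInOrder[i]];
        }

        // Step 2: reorder into the caller's requested order, as ProjectedRow does lazily.
        Object[] reordered = new Object[reorderingIndexes.length];
        for (int i = 0; i < reorderingIndexes.length; i++) {
            reordered[i] = projected[reorderingIndexes[i]];
        }

        System.out.println(Arrays.toString(reordered)); // prints [42, 1001, 2024-12-25]
    }
}
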
@@ -17,18 +17,10 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.utils.Projection;

import javax.annotation.Nullable;

/**
* {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that
@@ -43,8 +35,7 @@ public DefaultCompletedFetch(
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrc,
Long fetchOffset,
@Nullable Projection projection) {
Long fetchOffset) {
super(
tableBucket,
fetchLogResultForBucket.getError(),
@@ -54,29 +45,6 @@
readContext,
logScannerStatus,
isCheckCrc,
fetchOffset,
projection);
}

// TODO: optimize this to avoid deep copying the record.
// refactor #fetchRecords to return an iterator which lazily deserialize
// from underlying record stream and arrow buffer.
@Override
protected ScanRecord toScanRecord(LogRecord record) {
GenericRow newRow = new GenericRow(fieldGetters.length);
InternalRow internalRow = record.getRow();
for (int i = 0; i < fieldGetters.length; i++) {
newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
}
if (projection != null && projection.isReorderingNeeded()) {
return new ScanRecord(
record.logOffset(),
record.timestamp(),
record.getRowKind(),
ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow));
} else {
return new ScanRecord(
record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
}
fetchOffset);
}
}
@@ -22,7 +22,6 @@
import com.alibaba.fluss.client.scanner.RemoteFileDownloader;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
@@ -113,13 +112,6 @@ public FlussLogScanner(
private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
RowType tableRowType = tableInfo.getTableDescriptor().getSchema().toRowType();
if (projectedFields != null) {
LogFormat logFormat = tableInfo.getTableDescriptor().getLogFormat();
if (logFormat != LogFormat.ARROW) {
throw new IllegalArgumentException(
String.format(
"Only ARROW log format supports column projection, but the log format of table '%s' is %s",
tableInfo.getTablePath(), logFormat));
}
for (int projectedField : projectedFields) {
if (projectedField < 0 || projectedField >= tableRowType.getFieldCount()) {
throw new IllegalArgumentException(
@@ -27,6 +27,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.InvalidMetadataException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
@@ -96,6 +97,7 @@ public class LogFetcher implements Closeable {
private final LogFetchBuffer logFetchBuffer;
private final LogFetchCollector logFetchCollector;
private final RemoteLogDownloader remoteLogDownloader;
private final LogFormat logFormat;

@GuardedBy("this")
private final Set<Integer> nodesWithPendingFetchRequests;
@@ -117,8 +119,10 @@ public LogFetcher(
RemoteFileDownloader remoteFileDownloader) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned();
this.readContext = LogRecordReadContext.createReadContext(tableInfo, projection);
this.remoteReadContext = LogRecordReadContext.createReadContext(tableInfo, null);
this.logFormat = tableInfo.getTableDescriptor().getLogFormat();
this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection);
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection);
this.projection = projection;
this.rpcClient = rpcClient;
this.logScannerStatus = logScannerStatus;
@@ -314,8 +318,7 @@ private synchronized void handleFetchLogResponse(
readContext,
logScannerStatus,
isCheckCrcs,
fetchOffset,
projection);
fetchOffset);
logFetchBuffer.add(completedFetch);
}
}
@@ -352,8 +355,7 @@ private void pendRemoteFetches(
highWatermark,
remoteReadContext,
logScannerStatus,
isCheckCrcs,
projection);
isCheckCrcs);
logFetchBuffer.pend(pendingFetch);
downloadFuture.onComplete(logFetchBuffer::tryComplete);
}
@@ -421,7 +423,7 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
.setMaxBytes(maxFetchBytes);
PbFetchLogReqForTable reqForTable =
new PbFetchLogReqForTable().setTableId(finalTableId);
if (projection != null) {
if (projectionPushDownEnable()) {
reqForTable
.setProjectionPushdownEnabled(true)
.setProjectedFields(projection.getProjectionInOrder());
@@ -447,6 +449,13 @@ private List<TableBucket> fetchableBuckets() {
return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket));
}

private boolean projectionPushDownEnable() {
// Currently, only ARROW log format supports projection push down to server. Other log
// formats will do project in client, see DefaultCompletedFetch#toScanRecord() for more
// details.
return projection != null && logFormat == LogFormat.ARROW;
}
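
As a worked example of why this check is enough, the sketch below relates the three index arrays this commit wires together; their semantics are an assumption read off this diff, not a documented contract: projection() is the caller's requested column order, getProjectionInOrder() is the same set sorted into schema order and sent to the server when ARROW pushdown is enabled, and getReorderingIndexes() is what ProjectedRow applies on the client afterwards. Composing the last two must reproduce the first, so server-side pruning plus client-side reordering yields the same rows as projecting entirely in the client.

import java.util.Arrays;

// Standalone check, no Fluss dependencies; the concrete indexes are illustrative.
public class ProjectionIndexesSketch {
    public static void main(String[] args) {
        int[] requested = {2, 0, 3};   // projection(): columns in the order the caller asked for
        int[] inOrder = {0, 2, 3};     // getProjectionInOrder(): same columns in schema order
        int[] reordering = {1, 0, 2};  // getReorderingIndexes(): applied by ProjectedRow on the client

        // The in-order projection followed by the reordering must equal the requested projection.
        int[] composed = new int[requested.length];
        for (int i = 0; i < requested.length; i++) {
            composed[i] = inOrder[reordering[i]];
        }
        System.out.println(Arrays.equals(composed, requested)); // prints true
    }
}
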

private Integer getTableBucketLeader(TableBucket tableBucket) {
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
@@ -17,18 +17,10 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.utils.Projection;

import javax.annotation.Nullable;

import java.io.IOException;

@@ -52,7 +44,6 @@ class RemoteCompletedFetch extends CompletedFetch {
LogScannerStatus logScannerStatus,
boolean isCheckCrc,
long fetchOffset,
@Nullable Projection projection,
Runnable recycleCallback) {
super(
tableBucket,
@@ -63,8 +54,7 @@ class RemoteCompletedFetch extends CompletedFetch {
readContext,
logScannerStatus,
isCheckCrc,
fetchOffset,
projection);
fetchOffset);
this.fileLogRecords = fileLogRecords;
this.recycleCallback = recycleCallback;
}
@@ -81,26 +71,4 @@ void drain() {
// call recycle to remove the fetched files and increment the prefetch semaphore
recycleCallback.run();
}

// TODO: optimize this to avoid deep copying the record.
// refactor #fetchRecords to return an iterator which lazily deserialize
// from underlying record stream and arrow buffer.
@Override
protected ScanRecord toScanRecord(LogRecord record) {
GenericRow newRow = new GenericRow(fieldGetters.length);
InternalRow internalRow = record.getRow();
for (int i = 0; i < fieldGetters.length; i++) {
newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
}
if (projection != null) {
return new ScanRecord(
record.logOffset(),
record.timestamp(),
record.getRowKind(),
ProjectedRow.from(projection.projection()).replaceRow(newRow));
} else {
return new ScanRecord(
record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
}
}
}
@@ -20,9 +20,6 @@
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.utils.Projection;

import javax.annotation.Nullable;

/**
* {@link RemotePendingFetch} is a {@link PendingFetch} that represents a pending fetch that waiting
@@ -39,7 +36,6 @@ class RemotePendingFetch implements PendingFetch {
private final LogRecordReadContext readContext;
private final LogScannerStatus logScannerStatus;
private final boolean isCheckCrc;
private final @Nullable Projection projection;

RemotePendingFetch(
RemoteLogSegment remoteLogSegment,
@@ -49,8 +45,7 @@ class RemotePendingFetch implements PendingFetch {
long highWatermark,
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrc,
@Nullable Projection projection) {
boolean isCheckCrc) {
this.remoteLogSegment = remoteLogSegment;
this.downloadFuture = downloadFuture;
this.posInLogSegment = posInLogSegment;
@@ -59,7 +54,6 @@ class RemotePendingFetch implements PendingFetch {
this.readContext = readContext;
this.logScannerStatus = logScannerStatus;
this.isCheckCrc = isCheckCrc;
this.projection = projection;
}

@Override
@@ -83,7 +77,6 @@ public CompletedFetch toCompletedFetch() {
logScannerStatus,
isCheckCrc,
fetchOffset,
projection,
downloadFuture.getRecycleCallback());
}
}
@@ -279,7 +279,7 @@ private List<ScanRecord> parseLimitScanResponse(
}
} else {
LogRecordReadContext readContext =
LogRecordReadContext.createReadContext(tableInfo, null);
LogRecordReadContext.createReadContext(tableInfo, false, null);
LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
for (LogRecordBatch logRecordBatch : records.batches()) {
// A batch of log record maybe little more than limit, thus we need slice the