Commit 316bdfb

update

JNSimba committed Dec 19, 2024
1 parent 8d46783 commit 316bdfb
Showing 3 changed files with 18 additions and 44 deletions.
@@ -45,7 +45,7 @@

 /**
  * A reader to read kv snapshot files to {@link ScanRecord}s. It will return the {@link ScanRecord}s
- * as a iterator.
+ * as an iterator.
  */
 @NotThreadSafe
 class SnapshotFilesReader implements Iterator<ScanRecord>, AutoCloseable {
@@ -31,10 +31,8 @@
 import com.alibaba.fluss.row.ProjectedRow;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,16 +80,6 @@ public FlinkAsyncLookupFunction(
         this.projection = projection;
     }
 
-    private RowType toPkRowType(RowType rowType, int[] pkIndex) {
-        LogicalType[] types = new LogicalType[pkIndex.length];
-        String[] names = new String[pkIndex.length];
-        for (int i = 0; i < pkIndex.length; i++) {
-            types[i] = rowType.getTypeAt(pkIndex[i]);
-            names[i] = rowType.getFieldNames().get(pkIndex[i]);
-        }
-        return RowType.of(rowType.isNullable(), types, names);
-    }
-
     @Override
     public void open(FunctionContext context) {
         LOG.info("start open ...");
@@ -100,9 +88,12 @@ public void open(FunctionContext context) {
         // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization
         flinkRowToFlussRowConverter =
                 FlinkRowToFlussRowConverter.create(
-                        toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat());
+                        FlinkLookupFunction.filterRowType(flinkRowType, pkIndexes),
+                        table.getDescriptor().getKvFormat());
         flussRowToFlinkRowConverter =
-                new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType));
+                new FlussRowToFlinkRowConverter(
+                        FlinkConversions.toFlussRowType(
+                                FlinkLookupFunction.filterRowType(flinkRowType, projection)));
         LOG.info("end open.");
     }

@@ -178,22 +169,13 @@ private void fetchResult(
                                 if (remainingFilter != null && !remainingFilter.isMatch(flinkRow)) {
                                     resultFuture.complete(Collections.emptyList());
                                 } else {
-                                    resultFuture.complete(
-                                            Collections.singletonList(maybeProject(flinkRow)));
+                                    resultFuture.complete(Collections.singletonList(flinkRow));
                                 }
                             }
                         }
                     });
     }
 
-    private RowData maybeProject(RowData row) {
-        if (projection == null) {
-            return row;
-        }
-        // should not reuse objects for async operations
-        return ProjectedRowData.from(projection).replaceRow(row);
-    }
-
     @Override
     public void close() throws Exception {
         LOG.info("start close ...");
@@ -78,12 +78,12 @@ public FlinkLookupFunction(
         this.projection = projection;
     }
 
-    private RowType toPkRowType(RowType rowType, int[] pkIndex) {
-        LogicalType[] types = new LogicalType[pkIndex.length];
-        String[] names = new String[pkIndex.length];
-        for (int i = 0; i < pkIndex.length; i++) {
-            types[i] = rowType.getTypeAt(pkIndex[i]);
-            names[i] = rowType.getFieldNames().get(pkIndex[i]);
+    static RowType filterRowType(RowType rowType, int[] filterIndex) {
+        LogicalType[] types = new LogicalType[filterIndex.length];
+        String[] names = new String[filterIndex.length];
+        for (int i = 0; i < filterIndex.length; i++) {
+            types[i] = rowType.getTypeAt(filterIndex[i]);
+            names[i] = rowType.getFieldNames().get(filterIndex[i]);
         }
         return RowType.of(rowType.isNullable(), types, names);
     }
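
For reference, a minimal self-contained sketch (not part of this commit) of what the shared FlinkLookupFunction.filterRowType helper produces: it keeps only the fields named by the index array, in index order, so the same helper can build both the primary-key row type (from pkIndexes) and the projected output row type (from projection). The class name and field names below are invented for illustration.

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class FilterRowTypeSketch {

    // Same logic as the filterRowType method in the hunk above.
    static RowType filterRowType(RowType rowType, int[] filterIndex) {
        LogicalType[] types = new LogicalType[filterIndex.length];
        String[] names = new String[filterIndex.length];
        for (int i = 0; i < filterIndex.length; i++) {
            types[i] = rowType.getTypeAt(filterIndex[i]);
            names[i] = rowType.getFieldNames().get(filterIndex[i]);
        }
        return RowType.of(rowType.isNullable(), types, names);
    }

    public static void main(String[] args) {
        RowType source =
                RowType.of(
                        new LogicalType[] {new BigIntType(), new VarCharType(), new DoubleType()},
                        new String[] {"id", "name", "score"});
        // Keeps only the "score" and "id" fields, in that order.
        System.out.println(filterRowType(source, new int[] {2, 0}));
    }
}
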
@@ -93,15 +93,14 @@ public void open(FunctionContext context) {
LOG.info("start open ...");
connection = ConnectionFactory.createConnection(flussConfig);
table = connection.getTable(tablePath);
if (projection != null) {
projectedRowData = ProjectedRowData.from(projection);
}
// TODO: convert to Fluss GenericRow to avoid unnecessary deserialization
flinkRowToFlussRowConverter =
FlinkRowToFlussRowConverter.create(
toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat());
filterRowType(flinkRowType, pkIndexes),
table.getDescriptor().getKvFormat());
flussRowToFlinkRowConverter =
new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType));
new FlussRowToFlinkRowConverter(
FlinkConversions.toFlussRowType(filterRowType(flinkRowType, projection)));
LOG.info("end open.");
}

@@ -129,7 +128,7 @@ public Collection<RowData> lookup(RowData keyRow) {
                         flussRowToFlinkRowConverter.toFlinkRowData(
                                 ProjectedRow.from(projection).replaceRow(row));
                 if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) {
-                    return Collections.singletonList(maybeProject(flinkRow));
+                    return Collections.singletonList(flinkRow);
                 } else {
                     return Collections.emptyList();
                 }
@@ -154,13 +153,6 @@ public Collection<RowData> lookup(RowData keyRow) {
         return Collections.emptyList();
     }
 
-    private RowData maybeProject(RowData row) {
-        if (projectedRowData == null) {
-            return row;
-        }
-        return projectedRowData.replaceRow(row);
-    }
-
     @Override
     public void close() throws Exception {
         LOG.info("start close ...");
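
To make the behavioral change concrete, here is a small self-contained sketch (again not from the commit) of roughly what the removed maybeProject helpers did with Flink's ProjectedRowData: the full looked-up row was converted first and then wrapped so only the projected fields were visible. As the hunks above show, after this change the Fluss row is projected with com.alibaba.fluss.row.ProjectedRow before conversion and the converter is created on the filtered row type, so no post-conversion wrapper is needed. The class name and literal values below are made up.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.ProjectedRowData;

public class MaybeProjectSketch {

    public static void main(String[] args) {
        // A fully converted lookup result row: (id BIGINT, name STRING, score DOUBLE).
        RowData fullRow = GenericRowData.of(1L, StringData.fromString("bob"), 3.5d);

        // Roughly what the removed maybeProject(...) did: wrap the full row so that
        // only the projected fields (here score and id) are exposed, in that order.
        RowData projected = ProjectedRowData.from(new int[] {2, 0}).replaceRow(fullRow);

        System.out.println(projected.getDouble(0)); // 3.5
        System.out.println(projected.getLong(1));   // 1
    }
}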

0 comments on commit 316bdfb
