Skip to content

Commit

Permalink
SupportsFilterPushDown and ProjectionPushDown
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Mar 20, 2024
1 parent 61372f3 commit 5c3da4e
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public static List<PartitionDefinition> findPartitions(
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);

HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
String entity = "{\"sql\": \"" + sql + "\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisLookupOptions(helper.getOptions()),
physicalSchema);
physicalSchema,
context.getPhysicalRowDataType());
}

private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
Expand All @@ -31,8 +32,10 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
Expand All @@ -45,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -58,23 +62,31 @@
* SourceFunction} and its {@link DeserializationSchema} for runtime. Both instances are
* parameterized to return internal data structures (i.e. {@link RowData}).
*/
public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
public final class DorisDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private DorisLookupOptions lookupOptions;
private TableSchema physicalSchema;
private List<String> resolvedFilterQuery = new ArrayList<>();
private DataType physicalRowDataType;

public DorisDynamicTableSource(
DorisOptions options,
DorisReadOptions readOptions,
DorisLookupOptions lookupOptions,
TableSchema physicalSchema) {
TableSchema physicalSchema,
DataType physicalRowDataType) {
this.options = options;
this.lookupOptions = lookupOptions;
this.readOptions = readOptions;
this.physicalSchema = physicalSchema;
this.physicalRowDataType = physicalRowDataType;
}

public DorisDynamicTableSource(
Expand All @@ -93,8 +105,11 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
readOptions.setReadFields(
Arrays.stream(physicalSchema.getFieldNames())
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));

Expand All @@ -114,7 +129,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
.setTableIdentifier(options.getTableIdentifier())
.setPartitions(dorisPartitions)
.setReadOptions(readOptions)
.setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
.setRowType((RowType) physicalRowDataType.getLogicalType());
return InputFormatProvider.of(builder.build());
} else {
// Read data using the interface of the FLIP-27 specification
Expand All @@ -124,18 +139,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
.setDorisOptions(options)
.setDeserializer(
new RowDataDeserializationSchema(
(RowType)
physicalSchema
.toRowDataType()
.getLogicalType()))
(RowType) physicalRowDataType.getLogicalType()))
.build();
return SourceProvider.of(build);
}
}

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
DataType physicalRowDataType = physicalSchema.toRowDataType();
String[] keyNames = new String[context.getKeys().length];
int[] keyIndexs = new int[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
Expand Down Expand Up @@ -168,11 +179,43 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {

@Override
public DynamicTableSource copy() {
return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
DorisDynamicTableSource newSource =
new DorisDynamicTableSource(
options, readOptions, lookupOptions, physicalSchema, physicalRowDataType);
newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery);
return newSource;
}

@Override
public String asSummaryString() {
return "Doris Table Source";
}

@Override
public Result applyFilters(List<ResolvedExpression> filters) {
List<ResolvedExpression> acceptedFilters = new ArrayList<>();
List<ResolvedExpression> remainingFilters = new ArrayList<>();

DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor();
for (ResolvedExpression filter : filters) {
String filterQuery = filter.accept(expressionVisitor);
if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
acceptedFilters.add(filter);
this.resolvedFilterQuery.add(filterQuery);
} else {
remainingFilters.add(filter);
}
}
return Result.of(acceptedFilters, remainingFilters);
}

@Override
public boolean supportsNestedProjection() {
return false;
}

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.table;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.List;

public class DorisExpressionVisitor implements ExpressionVisitor<String> {

@Override
public String visit(CallExpression call) {
if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
return combineExpression("=", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
return combineExpression("<", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
return combineExpression("<=", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
return combineExpression(">", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
return combineExpression(">=", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
return combineExpression("<>", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
return combineExpression("OR", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
return combineExpression("AND", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
return combineExpression("LIKE", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
return combineLeftExpression("IS NULL", call.getResolvedChildren().get(0));
}
if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0));
}
return null;
}

private String combineExpression(String operator, List<ResolvedExpression> operand) {
String left = operand.get(0).accept(this);
String right = operand.get(1).accept(this);
return String.format("(%s %s %s)", left, operator, right);
}

private String combineLeftExpression(String operator, ResolvedExpression operand) {
String left = operand.accept(this);
return String.format("(%s %s)", left, operator);
}

@Override
public String visit(ValueLiteralExpression valueLiteral) {
LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.DATE)) {
return "'" + valueLiteral + "'";
}
return valueLiteral.toString();
}

@Override
public String visit(FieldReferenceExpression fieldReference) {
return fieldReference.getName();
}

@Override
public String visit(TypeLiteralExpression typeLiteral) {
return typeLiteral.getOutputDataType().toString();
}

@Override
public String visit(Expression expression) {
return null;
}
}

0 comments on commit 5c3da4e

Please sign in to comment.