diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index fcf31c34f..5ac139e7e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -605,7 +605,7 @@ public static List findPartitions( if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { sql += " where " + readOptions.getFilterQuery(); } - logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); + logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN); String entity = "{\"sql\": \"" + sql + "\"}"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index f327b9c07..b198ca327 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -181,7 +181,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { getDorisOptions(helper.getOptions()), getDorisReadOptions(helper.getOptions()), getDorisLookupOptions(helper.getOptions()), - physicalSchema); + physicalSchema, + context.getPhysicalRowDataType()); } private DorisOptions getDorisOptions(ReadableConfig readableConfig) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 9057f1f95..6b09735d4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; @@ -31,8 +32,10 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.StringUtils; import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -45,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -58,23 +62,31 @@ * SourceFunction} and its {@link DeserializationSchema} for runtime. Both instances are * parameterized to return internal data structures (i.e. {@link RowData}). 
*/ -public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource { +public final class DorisDynamicTableSource + implements ScanTableSource, + LookupTableSource, + SupportsFilterPushDown, + SupportsProjectionPushDown { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class); private final DorisOptions options; private final DorisReadOptions readOptions; private DorisLookupOptions lookupOptions; private TableSchema physicalSchema; + private List resolvedFilterQuery = new ArrayList<>(); + private DataType physicalRowDataType; public DorisDynamicTableSource( DorisOptions options, DorisReadOptions readOptions, DorisLookupOptions lookupOptions, - TableSchema physicalSchema) { + TableSchema physicalSchema, + DataType physicalRowDataType) { this.options = options; this.lookupOptions = lookupOptions; this.readOptions = readOptions; this.physicalSchema = physicalSchema; + this.physicalRowDataType = physicalRowDataType; } public DorisDynamicTableSource( @@ -93,8 +105,11 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + readOptions.setFilterQuery(filterQuery); + String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); readOptions.setReadFields( - Arrays.stream(physicalSchema.getFieldNames()) + Arrays.stream(selectFields) .map(item -> String.format("`%s`", item.trim().replace("`", ""))) .collect(Collectors.joining(", "))); @@ -114,7 +129,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon .setTableIdentifier(options.getTableIdentifier()) .setPartitions(dorisPartitions) .setReadOptions(readOptions) - .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType()); + .setRowType((RowType) physicalRowDataType.getLogicalType()); return 
InputFormatProvider.of(builder.build()); } else { // Read data using the interface of the FLIP-27 specification @@ -124,10 +139,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon .setDorisOptions(options) .setDeserializer( new RowDataDeserializationSchema( - (RowType) - physicalSchema - .toRowDataType() - .getLogicalType())) + (RowType) physicalRowDataType.getLogicalType())) .build(); return SourceProvider.of(build); } @@ -135,7 +147,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - DataType physicalRowDataType = physicalSchema.toRowDataType(); String[] keyNames = new String[context.getKeys().length]; int[] keyIndexs = new int[context.getKeys().length]; for (int i = 0; i < keyNames.length; i++) { @@ -168,11 +179,43 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { @Override public DynamicTableSource copy() { - return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema); + DorisDynamicTableSource newSource = + new DorisDynamicTableSource( + options, readOptions, lookupOptions, physicalSchema, physicalRowDataType); + newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery); + return newSource; } @Override public String asSummaryString() { return "Doris Table Source"; } + + @Override + public Result applyFilters(List filters) { + List acceptedFilters = new ArrayList<>(); + List remainingFilters = new ArrayList<>(); + + DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor(); + for (ResolvedExpression filter : filters) { + String filterQuery = filter.accept(expressionVisitor); + if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) { + acceptedFilters.add(filter); + this.resolvedFilterQuery.add(filterQuery); + } else { + remainingFilters.add(filter); + } + } + return Result.of(acceptedFilters, remainingFilters); + } + 
+ @Override + public boolean supportsNestedProjection() { + return false; + } + + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java new file mode 100644 index 000000000..3f327fe2d --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Translates a resolved Flink filter expression into the text of a Doris SQL predicate.
+ *
+ * <p>Every {@code visit} method returns the SQL fragment for the visited node, or {@code null}
+ * when the node (or any node below it) cannot be translated. Callers must treat {@code null} as
+ * "do not push this filter down"; a partially translated predicate is never returned.
+ */
+public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+
+    /** Built-in functions that map to an infix operator taking exactly two operands. */
+    private static final Map<FunctionDefinition, String> BINARY_OPERATORS;
+
+    static {
+        Map<FunctionDefinition, String> operators = new HashMap<>();
+        operators.put(BuiltInFunctionDefinitions.EQUALS, "=");
+        operators.put(BuiltInFunctionDefinitions.LESS_THAN, "<");
+        operators.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, "<=");
+        operators.put(BuiltInFunctionDefinitions.GREATER_THAN, ">");
+        operators.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, ">=");
+        operators.put(BuiltInFunctionDefinitions.NOT_EQUALS, "<>");
+        operators.put(BuiltInFunctionDefinitions.LIKE, "LIKE");
+        BINARY_OPERATORS = Collections.unmodifiableMap(operators);
+    }
+
+    /**
+     * Translates a function call.
+     *
+     * <p>Supported: the comparison operators and {@code LIKE} (two operands only), {@code AND} /
+     * {@code OR} (two or more operands), and {@code IS NULL} / {@code IS NOT NULL} (one operand).
+     *
+     * @param call the call expression to translate
+     * @return the parenthesised SQL predicate, or {@code null} if the function, its arity, or any
+     *     of its operands is not supported
+     */
+    @Override
+    public String visit(CallExpression call) {
+        FunctionDefinition function = call.getFunctionDefinition();
+        List<ResolvedExpression> children = call.getResolvedChildren();
+
+        String binaryOperator = BINARY_OPERATORS.get(function);
+        if (binaryOperator != null) {
+            // A third operand (e.g. an escape character for LIKE) has no translation here;
+            // dropping it silently would change the meaning of the predicate.
+            return children.size() == 2 ? combineExpression(binaryOperator, children) : null;
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(function)) {
+            return combineExpression("OR", children);
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(function)) {
+            return combineExpression("AND", children);
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(function)) {
+            return children.size() == 1 ? combineLeftExpression("IS NULL", children.get(0)) : null;
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(function)) {
+            return children.size() == 1
+                    ? combineLeftExpression("IS NOT NULL", children.get(0))
+                    : null;
+        }
+        return null;
+    }
+
+    /**
+     * Joins all operands with an infix operator, producing {@code (a op b [op c ...])}.
+     *
+     * @param operator the SQL operator text placed between operands
+     * @param operand the operands, in order; at least two are required
+     * @return the combined predicate, or {@code null} if fewer than two operands are given or any
+     *     operand cannot be translated
+     */
+    private String combineExpression(String operator, List<ResolvedExpression> operand) {
+        if (operand == null || operand.size() < 2) {
+            return null;
+        }
+        StringBuilder sql = new StringBuilder("(");
+        for (int i = 0; i < operand.size(); i++) {
+            String translated = operand.get(i).accept(this);
+            if (translated == null) {
+                // Without this check the text "null" would be embedded in the predicate and the
+                // resulting non-blank string would be mistaken for a valid pushed-down filter.
+                return null;
+            }
+            if (i > 0) {
+                sql.append(' ').append(operator).append(' ');
+            }
+            sql.append(translated);
+        }
+        return sql.append(')').toString();
+    }
+
+    /**
+     * Applies a postfix operator to a single operand, producing {@code (a op)}.
+     *
+     * @param operator the SQL operator text placed after the operand
+     * @param operand the operand to translate
+     * @return the combined predicate, or {@code null} if the operand cannot be translated
+     */
+    private String combineLeftExpression(String operator, ResolvedExpression operand) {
+        String left = operand.accept(this);
+        if (left == null) {
+            return null;
+        }
+        return String.format("(%s %s)", left, operator);
+    }
+
+    /**
+     * Translates a literal. Date and timestamp literals are wrapped in single quotes; every other
+     * literal is emitted as its {@code toString()} form.
+     *
+     * <p>NOTE(review): this relies on {@code ValueLiteralExpression#toString()} yielding text that
+     * Doris accepts (string quoting/escaping, timestamp layout) - confirm against Doris.
+     */
+    @Override
+    public String visit(ValueLiteralExpression valueLiteral) {
+        LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
+        switch (typeRoot) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case DATE:
+                return "'" + valueLiteral + "'";
+            default:
+                return valueLiteral.toString();
+        }
+    }
+
+    /**
+     * Translates a column reference to its bare field name.
+     *
+     * <p>NOTE(review): the name is emitted unquoted; verify behaviour for column names that are
+     * reserved words or contain special characters.
+     */
+    @Override
+    public String visit(FieldReferenceExpression fieldReference) {
+        return fieldReference.getName();
+    }
+
+    /** Translates a type literal to the string form of its output data type. */
+    @Override
+    public String visit(TypeLiteralExpression typeLiteral) {
+        return typeLiteral.getOutputDataType().toString();
+    }
+
+    /** Any other expression kind is not translatable; {@code null} keeps the filter in Flink. */
+    @Override
+    public String visit(Expression expression) {
+        return null;
+    }
+}