From 89c83355baeca56001394219d8a2637ec2270759 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Mon, 18 Dec 2023 07:49:56 -0800 Subject: [PATCH] create new function registry mechanism 1. FunctionRegistry keeps the old FUNCTION_INFO_MAP only 2. moved Calcite Catalog-based schema.Function registry to its own package; along with a SqlOperator based PinotOperatorTable 3. both CatalogReader and OperatorTable utilize the ground-truth functions from PinotFunctionRegistry --> will be the default once FunctionRegistry is deprecated 4. PinotFunctionRegistry provides argument-type based lookup via the same method SqlValidator utilizes to look up routines (and look up operator overloads) 5. clean up multi-stage engine side accordingly --- .../common/function/FunctionRegistry.java | 145 ++++------- .../function/registry/PinotFunction.java | 29 +++ .../registry/PinotScalarFunction.java | 83 ++++++ .../sql}/PinotCalciteCatalogReader.java | 6 +- .../function/sql/PinotFunctionRegistry.java | 237 ++++++++++++++++++ .../function/sql/PinotOperatorTable.java | 101 ++++++++ .../function}/sql/PinotSqlAggFunction.java | 6 +- .../sql/PinotSqlTransformFunction.java | 5 +- .../calcite/jdbc/CalciteSchemaBuilder.java | 6 +- .../PinotAggregateExchangeNodeInsertRule.java | 2 +- .../{fun => }/PinotSqlCoalesceFunction.java | 5 +- .../calcite/sql/fun/PinotOperatorTable.java | 171 ------------- .../sql/util/PinotSqlStdOperatorTable.java | 98 ++++++++ .../apache/pinot/query/QueryEnvironment.java | 6 +- 14 files changed, 620 insertions(+), 280 deletions(-) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java rename {pinot-query-planner/src/main/java/org/apache/calcite/prepare => pinot-common/src/main/java/org/apache/pinot/common/function/sql}/PinotCalciteCatalogReader.java (98%) create mode 100644 
pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java rename {pinot-query-planner/src/main/java/org/apache/calcite => pinot-common/src/main/java/org/apache/pinot/common/function}/sql/PinotSqlAggFunction.java (91%) rename {pinot-query-planner/src/main/java/org/apache/calcite => pinot-common/src/main/java/org/apache/pinot/common/function}/sql/PinotSqlTransformFunction.java (89%) rename pinot-query-planner/src/main/java/org/apache/calcite/sql/{fun => }/PinotSqlCoalesceFunction.java (90%) delete mode 100644 pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java create mode 100644 pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index f02007c4d7b4..e5ee9899d463 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -23,7 +23,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -31,11 +30,10 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.impl.ReflectiveFunctionBase; -import org.apache.calcite.util.NameMultimap; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.registry.PinotScalarFunction; +import 
org.apache.pinot.common.function.sql.PinotFunctionRegistry; import org.apache.pinot.spi.annotations.ScalarFunction; import org.apache.pinot.spi.utils.PinotReflectionUtils; import org.slf4j.Logger; @@ -46,15 +44,13 @@ * Registry for scalar functions. */ public class FunctionRegistry { + public static final boolean CASE_SENSITIVITY = false; private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class); - // TODO: remove both when FunctionOperatorTable is in used. private static final Map> FUNCTION_INFO_MAP = new HashMap<>(); - private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); private FunctionRegistry() { } - /** * Registers the scalar functions via reflection. * NOTE: In order to plugin methods using reflection, the methods should be inside a class that includes ".function." @@ -78,8 +74,8 @@ private FunctionRegistry() { FunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); } } - LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(), - FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs); + LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_INFO_MAP.size(), + FUNCTION_INFO_MAP.keySet(), System.currentTimeMillis() - startTimeMs); } /** @@ -90,50 +86,29 @@ private FunctionRegistry() { public static void init() { } - /** - * Registers a method with the name of the method. 
- */ @VisibleForTesting public static void registerFunction(Method method, boolean nullableParameters) { registerFunction(method, Collections.singleton(method.getName()), nullableParameters); } - private static void registerFunction(Method method, Set alias, boolean nullableParameters) { - if (method.getAnnotation(Deprecated.class) == null) { - for (String name : alias) { - registerFunctionInfoMap(name, method, nullableParameters); - registerCalciteNamedFunctionMap(name, method, nullableParameters); - } - } - } - - private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { - FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); - String canonicalName = canonicalize(functionName); - Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>()); - FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); - Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), - "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); - } - - private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) { - FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters)); - } - - public static Map> getRegisteredCalciteFunctionMap() { - return FUNCTION_MAP.map(); + @VisibleForTesting + public static Set getRegisteredCalciteFunctionNames() { + return PinotFunctionRegistry.getFunctionMap().map().keySet(); } - public static Set getRegisteredCalciteFunctionNames() { - return FUNCTION_MAP.map().keySet(); + /** + * Returns the full list of all registered ScalarFunction to Calcite. 
+ */ + public static Map> getRegisteredCalciteFunctionMap() { + return PinotFunctionRegistry.getFunctionMap().map(); } /** * Returns {@code true} if the given function name is registered, {@code false} otherwise. */ public static boolean containsFunction(String functionName) { - // TODO: remove fallback to FUNCTION_INFO_MAP - return FUNCTION_MAP.containsKey(canonicalize(functionName), false) + // TODO: remove deprecated FUNCTION_INFO_MAP + return PinotFunctionRegistry.getFunctionMap().containsKey(functionName, CASE_SENSITIVITY) || FUNCTION_INFO_MAP.containsKey(canonicalize(functionName)); } @@ -143,40 +118,54 @@ public static boolean containsFunction(String functionName) { * methods are already registered. */ @Nullable - public static FunctionInfo getFunctionInfo(String functionName, int numParameters) { - // TODO: remove fallback to FUNCTION_INFO_MAP + public static FunctionInfo getFunctionInfo(String functionName, int numParams) { try { - return getFunctionInfoFromCalciteNamedMap(functionName, numParameters); + return getFunctionInfoFromCalciteNamedMap(functionName, numParams); } catch (IllegalArgumentException iae) { - return getFunctionInfoFromFunctionInfoMap(functionName, numParameters); + // TODO: remove deprecated FUNCTION_INFO_MAP + return getFunctionInfoFromFunctionInfoMap(functionName, numParams); } } + // TODO: remove deprecated FUNCTION_INFO_MAP + private static void registerFunction(Method method, Set alias, boolean nullableParameters) { + if (method.getAnnotation(Deprecated.class) == null) { + for (String name : alias) { + registerFunctionInfoMap(name, method, nullableParameters); + } + } + } + + private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { + FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); + String canonicalName = canonicalize(functionName); + Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new 
HashMap<>()); + FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); + Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), + "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); + } + @Nullable - private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParameters) { + private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParams) { Map functionInfoMap = FUNCTION_INFO_MAP.get(canonicalize(functionName)); - return functionInfoMap != null ? functionInfoMap.get(numParameters) : null; + return functionInfoMap != null ? functionInfoMap.get(numParams) : null; } @Nullable - private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParameters) { - List candidates = findByNumParameters(FUNCTION_MAP.range(functionName, false), numParameters); - if (candidates.size() <= 1) { - return candidates.size() == 1 ? candidates.get(0).getFunctionInfo() : null; + private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParams) { + List candidates = PinotFunctionRegistry.getFunctionMap() + .range(functionName, CASE_SENSITIVITY).stream() + .filter(e -> e.getValue() instanceof PinotScalarFunction && e.getValue().getParameters().size() == numParams) + .map(e -> (PinotScalarFunction) e.getValue()).collect(Collectors.toList()); + if (candidates.size() == 1) { + return candidates.get(0).getFunctionInfo(); } else { throw new IllegalArgumentException( - "Unable to lookup function: " + functionName + " by parameter count: " + numParameters - + " Found multiple candidates. Try to use argument types to resolve the correct one!"); + "Unable to lookup function: " + functionName + " by parameter count: " + numParams + " Found " + + candidates.size() + " candidates. 
Try to use argument types to resolve the correct one!"); } } - private static List findByNumParameters( - Collection> scalarFunctionList, int numParameters) { - return scalarFunctionList == null ? Collections.emptyList() - : scalarFunctionList.stream().filter(e -> e.getValue().getParameters().size() == numParameters) - .map(Map.Entry::getValue).collect(Collectors.toList()); - } - private static String canonicalize(String functionName) { return StringUtils.remove(functionName, '_').toLowerCase(); } @@ -207,40 +196,4 @@ public static double vectorSimilarity(float[] vector1, float[] vector2) { throw new UnsupportedOperationException("Placeholder scalar function, should not reach here"); } } - - /** - * Pinot specific implementation of the {@link org.apache.calcite.schema.ScalarFunction}. - * - * @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}" - */ - public static class PinotScalarFunction extends ReflectiveFunctionBase - implements org.apache.calcite.schema.ScalarFunction { - private final FunctionInfo _functionInfo; - private final String _name; - private final Method _method; - - public PinotScalarFunction(String name, Method method, boolean isNullableParameter) { - super(method); - _name = name; - _method = method; - _functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter); - } - - @Override - public RelDataType getReturnType(RelDataTypeFactory typeFactory) { - return typeFactory.createJavaType(method.getReturnType()); - } - - public String getName() { - return _name; - } - - public Method getMethod() { - return _method; - } - - public FunctionInfo getFunctionInfo() { - return _functionInfo; - } - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java new file mode 100644 index 000000000000..f0e756513739 --- /dev/null +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.registry; + +import org.apache.calcite.schema.Function; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlReturnTypeInference; + + +public interface PinotFunction extends Function { + SqlOperandTypeChecker getOperandTypeChecker(); + SqlReturnTypeInference getReturnTypeInference(); +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java new file mode 100644 index 000000000000..c1708aab4203 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.registry; + +import java.lang.reflect.Method; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScalarFunction; +import org.apache.calcite.schema.impl.ReflectiveFunctionBase; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.pinot.common.function.FunctionInfo; + + +/** + * Pinot specific implementation of the {@link ScalarFunction}. 
+ * + * @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}" + */ +public class PinotScalarFunction extends ReflectiveFunctionBase implements PinotFunction, ScalarFunction { + private final FunctionInfo _functionInfo; + private final String _name; + private final Method _method; + private final SqlOperandTypeChecker _sqlOperandTypeChecker; + private final SqlReturnTypeInference _sqlReturnTypeInference; + + public PinotScalarFunction(String name, Method method, boolean isNullableParameter) { + this(name, method, isNullableParameter, null, null); + } + + public PinotScalarFunction(String name, Method method, boolean isNullableParameter, + SqlOperandTypeChecker sqlOperandTypeChecker, SqlReturnTypeInference sqlReturnTypeInference) { + super(method); + _name = name; + _method = method; + _functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter); + _sqlOperandTypeChecker = sqlOperandTypeChecker; + _sqlReturnTypeInference = sqlReturnTypeInference; + } + + @Override + public RelDataType getReturnType(RelDataTypeFactory typeFactory) { + return typeFactory.createJavaType(method.getReturnType()); + } + + public String getName() { + return _name; + } + + public Method getMethod() { + return _method; + } + + public FunctionInfo getFunctionInfo() { + return _functionInfo; + } + + @Override + public SqlOperandTypeChecker getOperandTypeChecker() { + return _sqlOperandTypeChecker; + } + + @Override + public SqlReturnTypeInference getReturnTypeInference() { + return _sqlReturnTypeInference; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java similarity index 98% rename from pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java index 
84c71be601f0..672689dff5c5 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.calcite.prepare; +package org.apache.pinot.common.function.sql; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -35,6 +35,8 @@ import org.apache.calcite.linq4j.function.Hints; import org.apache.calcite.model.ModelHandler; import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; @@ -310,7 +312,7 @@ public static SqlOperatorTable operatorTable(String... classNames) { } /** Converts a function to a {@link org.apache.calcite.sql.SqlOperator}. */ - private static SqlOperator toOp(SqlIdentifier name, + public static SqlOperator toOp(SqlIdentifier name, final org.apache.calcite.schema.Function function) { final Function> argTypesFactory = typeFactory -> function.getParameters() diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java new file mode 100644 index 000000000000..776a95927255 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.sql; + +import com.google.common.annotations.VisibleForTesting; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Function; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlNameMatchers; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.apache.calcite.util.NameMultimap; +import org.apache.pinot.common.function.TransformFunctionType; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.registry.PinotScalarFunction; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import 
org.apache.pinot.spi.annotations.ScalarFunction; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Registry for scalar functions. + */ +public class PinotFunctionRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotFunctionRegistry.class); + private static final NameMultimap OPERATOR_MAP = new NameMultimap<>(); + private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); + + private PinotFunctionRegistry() { + } + + /** + * Registers the scalar functions via reflection. + * NOTE: In order to plugin methods using reflection, the methods should be inside a class that includes ".function." + * in its class path. This convention can significantly reduce the time of class scanning. + */ + static { + // REGISTER FUNCTIONS + long startTimeMs = System.currentTimeMillis(); + Set methods = PinotReflectionUtils.getMethodsThroughReflection(".*\\.function\\..*", ScalarFunction.class); + for (Method method : methods) { + if (!Modifier.isPublic(method.getModifiers())) { + continue; + } + ScalarFunction scalarFunction = method.getAnnotation(ScalarFunction.class); + if (scalarFunction.enabled()) { + // Parse annotated function names and alias + Set scalarFunctionNames = Arrays.stream(scalarFunction.names()).collect(Collectors.toSet()); + if (scalarFunctionNames.size() == 0) { + scalarFunctionNames.add(method.getName()); + } + boolean nullableParameters = scalarFunction.nullableParameters(); + PinotFunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); + } + } + LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(), + FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs); + + // REGISTER OPERATORS + // Walk through all the Pinot aggregation types and + // 1. register those that are supported in multistage in addition to calcite standard opt table. + // 2. 
register special handling that differs from calcite standard. + for (AggregationFunctionType aggregationFunctionType : AggregationFunctionType.values()) { + if (aggregationFunctionType.getSqlKind() != null) { + // 1. Register the aggregation function with Calcite + registerAggregateFunction(aggregationFunctionType.getName(), aggregationFunctionType); + // 2. Register the aggregation function with Calcite on all alternative names + List alternativeFunctionNames = aggregationFunctionType.getAlternativeNames(); + for (String alternativeFunctionName : alternativeFunctionNames) { + registerAggregateFunction(alternativeFunctionName, aggregationFunctionType); + } + } + } + + // Walk through all the Pinot transform types and + // 1. register those that are supported in multistage in addition to calcite standard opt table. + // 2. register special handling that differs from calcite standard. + for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) { + if (transformFunctionType.getSqlKind() != null) { + // 1. Register the transform function with Calcite + registerTransformFunction(transformFunctionType.getName(), transformFunctionType); + // 2. 
Register the transform function with Calcite on all alternative names + List alternativeFunctionNames = transformFunctionType.getAlternativeNames(); + for (String alternativeFunctionName : alternativeFunctionNames) { + registerTransformFunction(alternativeFunctionName, transformFunctionType); + } + } + } + } + + public static void init() { + } + + @VisibleForTesting + public static void registerFunction(Method method, boolean nullableParameters) { + registerFunction(method, Collections.singleton(method.getName()), nullableParameters); + } + + public static NameMultimap getFunctionMap() { + return FUNCTION_MAP; + } + + public static NameMultimap getOperatorMap() { + return OPERATOR_MAP; + } + + @Nullable + public static PinotScalarFunction getScalarFunction(SqlOperatorTable operatorTable, RelDataTypeFactory typeFactory, + String functionName, List argTypes) { + List relArgTypes = convertArgumentTypes(typeFactory, argTypes); + SqlOperator sqlOperator = SqlUtil.lookupRoutine(operatorTable, typeFactory, + new SqlIdentifier(functionName, SqlParserPos.QUOTED_ZERO), relArgTypes, null, null, SqlSyntax.FUNCTION, + SqlKind.OTHER_FUNCTION, SqlNameMatchers.withCaseSensitive(false), true); + if (sqlOperator instanceof SqlUserDefinedFunction) { + Function function = ((SqlUserDefinedFunction) sqlOperator).getFunction(); + if (function instanceof PinotScalarFunction) { + return (PinotScalarFunction) function; + } + } + return null; + } + + private static void registerFunction(Method method, Set alias, boolean nullableParameters) { + if (method.getAnnotation(Deprecated.class) == null) { + for (String name : alias) { + registerCalciteNamedFunctionMap(name, method, nullableParameters); + } + } + } + + private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) { + FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters)); + } + + private static List convertArgumentTypes(RelDataTypeFactory typeFactory, + List 
argTypes) { + return argTypes.stream().map(type -> toRelType(typeFactory, type)).collect(Collectors.toList()); + } + + private static RelDataType toRelType(RelDataTypeFactory typeFactory, DataSchema.ColumnDataType dataType) { + switch (dataType) { + case INT: + return typeFactory.createSqlType(SqlTypeName.INTEGER); + case LONG: + return typeFactory.createSqlType(SqlTypeName.BIGINT); + case FLOAT: + return typeFactory.createSqlType(SqlTypeName.REAL); + case DOUBLE: + return typeFactory.createSqlType(SqlTypeName.DOUBLE); + case BIG_DECIMAL: + return typeFactory.createSqlType(SqlTypeName.DECIMAL); + case BOOLEAN: + return typeFactory.createSqlType(SqlTypeName.BOOLEAN); + case TIMESTAMP: + return typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + case JSON: + case STRING: + return typeFactory.createSqlType(SqlTypeName.VARCHAR); + case BYTES: + return typeFactory.createSqlType(SqlTypeName.VARBINARY); + case INT_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.INTEGER), -1); + case LONG_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.BIGINT), -1); + case FLOAT_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.REAL), -1); + case DOUBLE_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.DOUBLE), -1); + case BOOLEAN_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.BOOLEAN), -1); + case TIMESTAMP_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.TIMESTAMP), -1); + case STRING_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.VARCHAR), -1); + case BYTES_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.VARBINARY), -1); + case UNKNOWN: + case OBJECT: + default: + return typeFactory.createSqlType(SqlTypeName.ANY); + } + } + + private static void registerAggregateFunction(String functionName, AggregationFunctionType 
functionType) { + if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { + PinotSqlAggFunction sqlAggFunction = new PinotSqlAggFunction(functionName.toUpperCase(Locale.ROOT), null, + functionType.getSqlKind(), functionType.getReturnTypeInference(), null, + functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); + OPERATOR_MAP.put(functionName.toUpperCase(Locale.ROOT), sqlAggFunction); + } + } + + private static void registerTransformFunction(String functionName, TransformFunctionType functionType) { + if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { + PinotSqlTransformFunction sqlTransformFunction = + new PinotSqlTransformFunction(functionName.toUpperCase(Locale.ROOT), + functionType.getSqlKind(), functionType.getReturnTypeInference(), null, + functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); + OPERATOR_MAP.put(functionName.toUpperCase(Locale.ROOT), sqlTransformFunction); + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java new file mode 100644 index 000000000000..ca4513a5ba73 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.sql; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.validate.SqlNameMatcher; +import org.apache.pinot.common.function.FunctionRegistry; +import org.checkerframework.checker.nullness.qual.Nullable; + + +/** + * Temporary implementation of all dynamic arg/return type inference operators. + * TODO: merge this with {@link PinotCalciteCatalogReader} once we support + * 1. Return/Inference configuration in @ScalarFunction + * 2. 
Allow @ScalarFunction registry towards class (with multiple impl) + */ +public class PinotOperatorTable implements SqlOperatorTable { + private static final PinotOperatorTable INSTANCE = new PinotOperatorTable(); + + public static synchronized PinotOperatorTable instance() { + return INSTANCE; + } + + @Override public void lookupOperatorOverloads(SqlIdentifier opName, + @Nullable SqlFunctionCategory category, SqlSyntax syntax, + List operatorList, SqlNameMatcher nameMatcher) { + String simpleName = opName.getSimple(); + final Collection list = + lookUpOperators(simpleName); + if (list.isEmpty()) { + return; + } + for (SqlOperator op : list) { + if (op.getSyntax() == syntax) { + operatorList.add(op); + } else if (syntax == SqlSyntax.FUNCTION + && op instanceof SqlFunction) { + // this special case is needed for operators like CAST, + // which are treated as functions but have special syntax + operatorList.add(op); + } + } + + // REVIEW jvs 1-Jan-2005: why is this extra lookup required? + // Shouldn't it be covered by search above? + switch (syntax) { + case BINARY: + case PREFIX: + case POSTFIX: + for (SqlOperator extra + : lookUpOperators(simpleName)) { + // REVIEW: should only search operators added during this method? + if (extra != null && !operatorList.contains(extra)) { + operatorList.add(extra); + } + } + break; + default: + break; + } + } + + /** + * Look up operators based on case-sensitiveness. 
+ */ + private Collection lookUpOperators(String name) { + return PinotFunctionRegistry.getOperatorMap().range(name, FunctionRegistry.CASE_SENSITIVITY).stream() + .map(Map.Entry::getValue).collect(Collectors.toSet()); + } + + @Override + public List getOperatorList() { + return PinotFunctionRegistry.getOperatorMap().map().values().stream().flatMap(List::stream) + .collect(Collectors.toList()); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java similarity index 91% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java index 0d4146f4317d..bc8f55474e00 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java @@ -16,8 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.calcite.sql; +package org.apache.pinot.common.function.sql; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java similarity index 89% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java index 827c9f37337b..c2e8ce8130c1 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java @@ -16,8 +16,11 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.calcite.sql; +package org.apache.pinot.common.function.sql; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java index efe8b56a07c0..adadcd6992b6 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java @@ -23,7 +23,8 @@ import org.apache.calcite.schema.Function; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; -import org.apache.pinot.common.function.FunctionRegistry; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.sql.PinotFunctionRegistry; /** @@ -54,8 +55,7 @@ private CalciteSchemaBuilder() { public static CalciteSchema asRootSchema(Schema root) { CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, "", root); SchemaPlus schemaPlus = rootSchema.plus(); - for (Map.Entry> e - : FunctionRegistry.getRegisteredCalciteFunctionMap().entrySet()) { + for (Map.Entry> e : PinotFunctionRegistry.getFunctionMap().map().entrySet()) { for (Function f : e.getValue()) { schemaPlus.add(e.getKey(), f); } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java index df904123d201..68e331eb5d8b 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java +++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java @@ -40,7 +40,6 @@ import org.apache.calcite.rel.logical.PinotLogicalExchange; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.PinotSqlAggFunction; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.OperandTypes; @@ -53,6 +52,7 @@ import org.apache.calcite.util.mapping.Mapping; import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; +import org.apache.pinot.common.function.sql.PinotSqlAggFunction; import org.apache.pinot.query.planner.plannode.AggregateNode.AggType; import org.apache.pinot.segment.spi.AggregationFunctionType; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java similarity index 90% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java rename to pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java index 92ef85857f9a..2b2ee32f083f 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.calcite.sql.fun; +package org.apache.calcite.sql; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlCoalesceFunction; import org.apache.calcite.sql.validate.SqlValidator; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java deleted file mode 100644 index 3617a7c06270..000000000000 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.calcite.sql.fun; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import org.apache.calcite.sql.PinotSqlAggFunction; -import org.apache.calcite.sql.PinotSqlTransformFunction; -import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.validate.SqlNameMatchers; -import org.apache.calcite.util.Util; -import org.apache.pinot.common.function.TransformFunctionType; -import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; - - -/** - * {@link PinotOperatorTable} defines the {@link SqlOperator} overrides on top of the {@link SqlStdOperatorTable}. - * - *

The main purpose of this Pinot specific SQL operator table is to - *

    - *
  • Ensure that any specific SQL validation rules can apply with Pinot override entirely over Calcite's.
  • - *
  • Ability to create customer operators that are not function and cannot use - * {@link org.apache.calcite.prepare.Prepare.CatalogReader} to override
  • - *
  • Still maintain minimum customization and benefit from Calcite's original operator table setting.
  • - *
- */ -@SuppressWarnings("unused") // unused fields are accessed by reflection -public class PinotOperatorTable extends SqlStdOperatorTable { - - private static @MonotonicNonNull PinotOperatorTable _instance; - - // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around - // supplier instance. this should replace all lazy init static objects in the codebase - public static synchronized PinotOperatorTable instance() { - if (_instance == null) { - // Creates and initializes the standard operator table. - // Uses two-phase construction, because we can't initialize the - // table until the constructor of the sub-class has completed. - _instance = new PinotOperatorTable(); - _instance.initNoDuplicate(); - } - return _instance; - } - - /** - * Initialize without duplicate, e.g. when 2 duplicate operator is linked with the same op - * {@link org.apache.calcite.sql.SqlKind} it causes problem. - * - *

This is a direct copy of the {@link org.apache.calcite.sql.util.ReflectiveSqlOperatorTable} and can be hard to - * debug, suggest changing to a non-dynamic registration. Dynamic function support should happen via catalog. - * - * This also registers aggregation functions defined in {@link org.apache.pinot.segment.spi.AggregationFunctionType} - * which are multistage enabled. - */ - public final void initNoDuplicate() { - // Pinot supports native COALESCE function, thus no need to create CASE WHEN conversion. - register(new PinotSqlCoalesceFunction()); - // Ensure ArrayValueConstructor is registered before ArrayQueryConstructor - register(ARRAY_VALUE_CONSTRUCTOR); - - // TODO: reflection based registration is not ideal, we should use a static list of operators and register them - // Use reflection to register the expressions stored in public fields. - for (Field field : getClass().getFields()) { - try { - if (SqlFunction.class.isAssignableFrom(field.getType())) { - SqlFunction op = (SqlFunction) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } else if ( - SqlOperator.class.isAssignableFrom(field.getType())) { - SqlOperator op = (SqlOperator) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } - } catch (IllegalArgumentException | IllegalAccessException e) { - throw Util.throwAsRuntime(Util.causeOrSelf(e)); - } - } - - // Walk through all the Pinot aggregation types and - // 1. register those that are supported in multistage in addition to calcite standard opt table. - // 2. register special handling that differs from calcite standard. - for (AggregationFunctionType aggregationFunctionType : AggregationFunctionType.values()) { - if (aggregationFunctionType.getSqlKind() != null) { - // 1. Register the aggregation function with Calcite - registerAggregateFunction(aggregationFunctionType.getName(), aggregationFunctionType); - // 2. 
Register the aggregation function with Calcite on all alternative names - List alternativeFunctionNames = aggregationFunctionType.getAlternativeNames(); - for (String alternativeFunctionName : alternativeFunctionNames) { - registerAggregateFunction(alternativeFunctionName, aggregationFunctionType); - } - } - } - - // Walk through all the Pinot transform types and - // 1. register those that are supported in multistage in addition to calcite standard opt table. - // 2. register special handling that differs from calcite standard. - for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) { - if (transformFunctionType.getSqlKind() != null) { - // 1. Register the transform function with Calcite - registerTransformFunction(transformFunctionType.getName(), transformFunctionType); - // 2. Register the transform function with Calcite on all alternative names - List alternativeFunctionNames = transformFunctionType.getAlternativeNames(); - for (String alternativeFunctionName : alternativeFunctionNames) { - registerTransformFunction(alternativeFunctionName, transformFunctionType); - } - } - } - } - - private void registerAggregateFunction(String functionName, AggregationFunctionType functionType) { - // register function behavior that's different from Calcite - if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { - PinotSqlAggFunction sqlAggFunction = new PinotSqlAggFunction(functionName.toUpperCase(Locale.ROOT), null, - functionType.getSqlKind(), functionType.getReturnTypeInference(), null, - functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); - if (notRegistered(sqlAggFunction)) { - register(sqlAggFunction); - } - } - } - - private void registerTransformFunction(String functionName, TransformFunctionType functionType) { - // register function behavior that's different from Calcite - if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != 
null) { - PinotSqlTransformFunction sqlTransformFunction = - new PinotSqlTransformFunction(functionName.toUpperCase(Locale.ROOT), - functionType.getSqlKind(), functionType.getReturnTypeInference(), null, - functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); - if (notRegistered(sqlTransformFunction)) { - register(sqlTransformFunction); - } - } - } - - private boolean notRegistered(SqlFunction op) { - List operatorList = new ArrayList<>(); - lookupOperatorOverloads(op.getNameAsId(), op.getFunctionType(), op.getSyntax(), operatorList, - SqlNameMatchers.withCaseSensitive(false)); - return operatorList.size() == 0; - } - - private boolean notRegistered(SqlOperator op) { - List operatorList = new ArrayList<>(); - lookupOperatorOverloads(op.getNameAsId(), null, op.getSyntax(), operatorList, - SqlNameMatchers.withCaseSensitive(false)); - return operatorList.size() == 0; - } -} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java new file mode 100644 index 000000000000..e3f6d8a72214 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.calcite.sql.util; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.sql.PinotSqlCoalesceFunction; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlNameMatchers; +import org.apache.calcite.util.Util; +import org.apache.pinot.common.function.FunctionRegistry; + + +public class PinotSqlStdOperatorTable extends SqlStdOperatorTable { + private static PinotSqlStdOperatorTable _instance; + + // TODO: use a memoized supplier for lazy init; this should replace all lazy init static objects in the codebase + public static synchronized PinotSqlStdOperatorTable instance() { + if (_instance == null) { + // Creates and initializes the standard operator table. + // Uses two-phase construction, because we can't initialize the + // table until the constructor of the sub-class has completed. + _instance = new PinotSqlStdOperatorTable(); + _instance.initNoDuplicate(); + } + return _instance; + } + + /** + * Initialize without duplicate, e.g. when 2 duplicate operator is linked with the same op + * {@link org.apache.calcite.sql.SqlKind} it causes problem. + * + *

This is a direct copy of the {@link org.apache.calcite.sql.util.ReflectiveSqlOperatorTable} and can be hard to + * debug, suggest changing to a non-dynamic registration. Dynamic function support should happen via catalog. + * + * Pinot aggregation and transform functions are not registered here; they are served by + * {@link org.apache.pinot.common.function.sql.PinotOperatorTable}. + */ + public final void initNoDuplicate() { + // Pinot supports native COALESCE function, thus no need to create CASE WHEN conversion. + register(new PinotSqlCoalesceFunction()); + // Ensure ArrayValueConstructor is registered before ArrayQueryConstructor + register(ARRAY_VALUE_CONSTRUCTOR); + + // TODO: reflection based registration is not ideal, we should use a static list of operators and register them + // Use reflection to register the expressions stored in public fields. + for (Field field : getClass().getFields()) { + try { + if (SqlFunction.class.isAssignableFrom(field.getType())) { + SqlFunction op = (SqlFunction) field.get(this); + if (op != null && notRegistered(op)) { + register(op); + } + } else if (SqlOperator.class.isAssignableFrom(field.getType())) { + SqlOperator op = (SqlOperator) field.get(this); + if (op != null && notRegistered(op)) { + register(op); + } + } + } catch (IllegalArgumentException | IllegalAccessException e) { + throw Util.throwAsRuntime(Util.causeOrSelf(e)); + } + } + } + + private boolean notRegistered(SqlFunction op) { + List operatorList = new ArrayList<>(); + lookupOperatorOverloads(op.getNameAsId(), op.getFunctionType(), op.getSyntax(), operatorList, + SqlNameMatchers.withCaseSensitive(FunctionRegistry.CASE_SENSITIVITY)); + return operatorList.size() == 0; + } + + private boolean notRegistered(SqlOperator op) { + List operatorList = new ArrayList<>(); + lookupOperatorOverloads(op.getNameAsId(), null, op.getSyntax(), operatorList, + SqlNameMatchers.withCaseSensitive(FunctionRegistry.CASE_SENSITIVITY)); + return operatorList.size() == 0; + } +} diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 769d6a607fc2..c835eb36cd93 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -35,7 +35,6 @@ import org.apache.calcite.plan.hep.HepMatchOrder; import org.apache.calcite.plan.hep.HepProgram; import org.apache.calcite.plan.hep.HepProgramBuilder; -import org.apache.calcite.prepare.PinotCalciteCatalogReader; import org.apache.calcite.prepare.Prepare; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -52,8 +51,8 @@ import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.fun.PinotOperatorTable; import org.apache.calcite.sql.util.PinotChainedSqlOperatorTable; +import org.apache.calcite.sql.util.PinotSqlStdOperatorTable; import org.apache.calcite.sql2rel.PinotConvertletTable; import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; @@ -61,6 +60,8 @@ import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.RelBuilder; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.function.sql.PinotCalciteCatalogReader; +import org.apache.pinot.common.function.sql.PinotOperatorTable; import org.apache.pinot.query.context.PlannerContext; import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.planner.QueryPlan; @@ -113,6 +114,7 @@ public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, Worke _config = Frameworks.newConfigBuilder().traitDefs() .operatorTable(new PinotChainedSqlOperatorTable(Arrays.asList( + PinotSqlStdOperatorTable.instance(), PinotOperatorTable.instance(), _catalogReader))) 
.defaultSchema(_rootSchema.plus())