From f913f824f3ab30f49b019f5503a33f2ecb44e874 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 12 Dec 2023 11:45:25 -0800 Subject: [PATCH 1/4] initial commit to make FunctionRegistry use Calcite functions --- .../common/function/FunctionRegistry.java | 130 +++++++++++------- .../InbuiltFunctionEvaluatorTest.java | 2 +- .../calcite/jdbc/CalciteSchemaBuilder.java | 3 +- 3 files changed, 85 insertions(+), 50 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index 97fa972bee1..417e6e62e53 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -18,16 +18,20 @@ */ package org.apache.pinot.common.function; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; -import org.apache.calcite.schema.Function; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.impl.ReflectiveFunctionBase; import org.apache.calcite.util.NameMultimap; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.annotations.ScalarFunction; @@ -46,11 +50,7 @@ private FunctionRegistry() { private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class); - // TODO: consolidate the following 2 - // This FUNCTION_INFO_MAP is used by Pinot server to look up function by # of arguments - private static final Map> FUNCTION_INFO_MAP = new HashMap<>(); - // This FUNCTION_MAP is used by Calcite function catalog to look up function by function signature. - private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); + private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); /** * Registers the scalar functions via reflection. @@ -66,20 +66,15 @@ private FunctionRegistry() { } ScalarFunction scalarFunction = method.getAnnotation(ScalarFunction.class); if (scalarFunction.enabled()) { - // Annotated function names - String[] scalarFunctionNames = scalarFunction.names(); + // Parse annotated function names and alias + Set scalarFunctionNames = Arrays.stream(scalarFunction.names()).collect(Collectors.toSet()); + scalarFunctionNames.add(method.getName()); boolean nullableParameters = scalarFunction.nullableParameters(); - if (scalarFunctionNames.length > 0) { - for (String name : scalarFunctionNames) { - FunctionRegistry.registerFunction(name, method, nullableParameters, scalarFunction.isPlaceholder()); - } - } else { - FunctionRegistry.registerFunction(method, nullableParameters, scalarFunction.isPlaceholder()); - } + FunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); } } - LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_INFO_MAP.size(), - FUNCTION_INFO_MAP.keySet(), System.currentTimeMillis() - startTimeMs); + LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(), + FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs); } /** @@ -93,37 +88,21 @@ public static void init() { /** * Registers a method with the name of the method. */ - public static void registerFunction(Method method, boolean nullableParameters, boolean isPlaceholder) { - registerFunction(method.getName(), method, nullableParameters, isPlaceholder); + @VisibleForTesting + public static void registerFunction(Method method, boolean nullableParameters) { + registerFunction(method, Collections.singleton(method.getName()), nullableParameters); } - /** - * Registers a method with the given function name. - */ - public static void registerFunction(String functionName, Method method, boolean nullableParameters, - boolean isPlaceholder) { - if (!isPlaceholder) { - registerFunctionInfoMap(functionName, method, nullableParameters); - } - registerCalciteNamedFunctionMap(functionName, method, nullableParameters); - } - - private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { - FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); - String canonicalName = canonicalize(functionName); - Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>()); - FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); - Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), - "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); - } - - private static void registerCalciteNamedFunctionMap(String functionName, Method method, boolean nullableParameters) { + private static void registerFunction(Method method, Set alias, boolean nullableParameters) { if (method.getAnnotation(Deprecated.class) == null) { - FUNCTION_MAP.put(functionName, ScalarFunctionImpl.create(method)); +// String name = canonicalize(method.getName()); + for (String name : alias) { + FUNCTION_MAP.put(name, new PinotScalarFunction(name, alias, method, nullableParameters)); + } } } - public static Map> getRegisteredCalciteFunctionMap() { + public static Map> getRegisteredCalciteFunctionMap() { return FUNCTION_MAP.map(); } @@ -135,7 +114,7 @@ public static Set getRegisteredCalciteFunctionNames() { * Returns {@code true} if the given function name is registered, {@code false} otherwise. */ public static boolean containsFunction(String functionName) { - return FUNCTION_INFO_MAP.containsKey(canonicalize(functionName)); + return FUNCTION_MAP.containsKey(canonicalize(functionName), false); } /** @@ -145,8 +124,21 @@ public static boolean containsFunction(String functionName) { */ @Nullable public static FunctionInfo getFunctionInfo(String functionName, int numParameters) { - Map functionInfoMap = FUNCTION_INFO_MAP.get(canonicalize(functionName)); - return functionInfoMap != null ? functionInfoMap.get(numParameters) : null; + List candidates = findByNumParameters(FUNCTION_MAP.range(functionName, false), numParameters); + if (candidates.size() <= 1) { + return candidates.size() == 1 ? candidates.get(0).getFunctionInfo() : null; + } else { + throw new IllegalArgumentException( + "Unable to lookup function: " + functionName + " by parameter count: " + numParameters + + " Found multiple candidates. Try to use argument types to resolve the correct one!"); + } + } + + private static List findByNumParameters( + Collection> scalarFunctionList, int numParameters) { + return scalarFunctionList == null ? Collections.emptyList() + : scalarFunctionList.stream().filter(e -> e.getValue().getParameters().size() == numParameters) + .map(Map.Entry::getValue).collect(Collectors.toList()); } private static String canonicalize(String functionName) { @@ -179,4 +171,46 @@ public static double vectorSimilarity(float[] vector1, float[] vector2) { throw new UnsupportedOperationException("Placeholder scalar function, should not reach here"); } } + + /** + * Pinot specific implementation of the {@link org.apache.calcite.schema.ScalarFunction}. + * + * @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}" + */ + public static class PinotScalarFunction extends ReflectiveFunctionBase + implements org.apache.calcite.schema.ScalarFunction { + private final FunctionInfo _functionInfo; + private final String _name; + private final Set _alias; + private final Method _method; + + public PinotScalarFunction(String name, Set alias, Method method, boolean isNullableParameter) { + super(method); + _name = name; + _alias = alias; + _method = method; + _functionInfo = new FunctionInfo(method, method.getClass(), isNullableParameter); + } + + @Override + public RelDataType getReturnType(RelDataTypeFactory typeFactory) { + return typeFactory.createJavaType(method.getReturnType()); + } + + public String getName() { + return _name; + } + + public Set getAlias() { + return _alias; + } + + public Method getMethod() { + return _method; + } + + public FunctionInfo getFunctionInfo() { + return _functionInfo; + } + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java index 82be9bcf52c..d455faf2451 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java @@ -131,7 +131,7 @@ public void testStateSharedBetweenRowsForExecution() throws Exception { MyFunc myFunc = new MyFunc(); Method method = myFunc.getClass().getDeclaredMethod("appendToStringAndReturn", String.class); - FunctionRegistry.registerFunction(method, false, false); + FunctionRegistry.registerFunction(method, false); String expression = "appendToStringAndReturn('test ')"; InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(expression); assertTrue(evaluator.getArguments().isEmpty()); diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java index edb2d74bf07..efe8b56a07c 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java @@ -54,7 +54,8 @@ private CalciteSchemaBuilder() { public static CalciteSchema asRootSchema(Schema root) { CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, "", root); SchemaPlus schemaPlus = rootSchema.plus(); - for (Map.Entry> e : FunctionRegistry.getRegisteredCalciteFunctionMap().entrySet()) { + for (Map.Entry> e + : FunctionRegistry.getRegisteredCalciteFunctionMap().entrySet()) { for (Function f : e.getValue()) { schemaPlus.add(e.getKey(), f); } From 770dfd9fc89d61945d0aee0e966ee480869fb639 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Thu, 14 Dec 2023 06:55:10 -0800 Subject: [PATCH 2/4] adding back fallback option to current resolution --- .../common/function/FunctionRegistry.java | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index 417e6e62e53..f02007c4d7b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -19,11 +19,13 @@ package org.apache.pinot.common.function; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,15 +44,16 @@ /** * Registry for scalar functions. - *

TODO: Merge FunctionRegistry and FunctionDefinitionRegistry to provide one single registry for all functions. */ public class FunctionRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class); + // TODO: remove both when FunctionOperatorTable is in used. + private static final Map> FUNCTION_INFO_MAP = new HashMap<>(); + private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); + private FunctionRegistry() { } - private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class); - - private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); /** * Registers the scalar functions via reflection. @@ -68,7 +71,9 @@ private FunctionRegistry() { if (scalarFunction.enabled()) { // Parse annotated function names and alias Set scalarFunctionNames = Arrays.stream(scalarFunction.names()).collect(Collectors.toSet()); - scalarFunctionNames.add(method.getName()); + if (scalarFunctionNames.size() == 0) { + scalarFunctionNames.add(method.getName()); + } boolean nullableParameters = scalarFunction.nullableParameters(); FunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); } @@ -95,13 +100,26 @@ public static void registerFunction(Method method, boolean nullableParameters) { private static void registerFunction(Method method, Set alias, boolean nullableParameters) { if (method.getAnnotation(Deprecated.class) == null) { -// String name = canonicalize(method.getName()); for (String name : alias) { - FUNCTION_MAP.put(name, new PinotScalarFunction(name, alias, method, nullableParameters)); + registerFunctionInfoMap(name, method, nullableParameters); + registerCalciteNamedFunctionMap(name, method, nullableParameters); } } } + private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { + FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); + String canonicalName = canonicalize(functionName); + Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>()); + FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); + Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), + "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); + } + + private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) { + FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters)); + } + public static Map> getRegisteredCalciteFunctionMap() { return FUNCTION_MAP.map(); } @@ -114,7 +132,9 @@ public static Set getRegisteredCalciteFunctionNames() { * Returns {@code true} if the given function name is registered, {@code false} otherwise. */ public static boolean containsFunction(String functionName) { - return FUNCTION_MAP.containsKey(canonicalize(functionName), false); + // TODO: remove fallback to FUNCTION_INFO_MAP + return FUNCTION_MAP.containsKey(canonicalize(functionName), false) + || FUNCTION_INFO_MAP.containsKey(canonicalize(functionName)); } /** @@ -124,6 +144,22 @@ public static boolean containsFunction(String functionName) { */ @Nullable public static FunctionInfo getFunctionInfo(String functionName, int numParameters) { + // TODO: remove fallback to FUNCTION_INFO_MAP + try { + return getFunctionInfoFromCalciteNamedMap(functionName, numParameters); + } catch (IllegalArgumentException iae) { + return getFunctionInfoFromFunctionInfoMap(functionName, numParameters); + } + } + + @Nullable + private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParameters) { + Map functionInfoMap = FUNCTION_INFO_MAP.get(canonicalize(functionName)); + return functionInfoMap != null ? functionInfoMap.get(numParameters) : null; + } + + @Nullable + private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParameters) { List candidates = findByNumParameters(FUNCTION_MAP.range(functionName, false), numParameters); if (candidates.size() <= 1) { return candidates.size() == 1 ? candidates.get(0).getFunctionInfo() : null; @@ -181,15 +217,13 @@ public static class PinotScalarFunction extends ReflectiveFunctionBase implements org.apache.calcite.schema.ScalarFunction { private final FunctionInfo _functionInfo; private final String _name; - private final Set _alias; private final Method _method; - public PinotScalarFunction(String name, Set alias, Method method, boolean isNullableParameter) { + public PinotScalarFunction(String name, Method method, boolean isNullableParameter) { super(method); _name = name; - _alias = alias; _method = method; - _functionInfo = new FunctionInfo(method, method.getClass(), isNullableParameter); + _functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter); } @Override @@ -201,10 +235,6 @@ public String getName() { return _name; } - public Set getAlias() { - return _alias; - } - public Method getMethod() { return _method; } From 7a9c87cdd6d79e23d994520cdba6ce89943f83cd Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Thu, 14 Dec 2023 08:59:53 -0800 Subject: [PATCH 3/4] fix tests --- .../common/function/scalar/StringFunctions.java | 2 +- .../function/InbuiltFunctionEvaluatorTest.java | 16 ---------------- .../PostAggregationFunctionTest.java | 2 +- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java index 5a49314943b..3af1405f6cd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java @@ -140,7 +140,7 @@ public static String substring(String input, int beginIndex, int length) { * @param seperator * @return The two input strings joined by the seperator */ - @ScalarFunction(names = "concat_ws") + @ScalarFunction(names = {"concatWS", "concat_ws"}) public static String concatws(String seperator, String input1, String input2) { return concat(input1, input2, seperator); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java index d455faf2451..5c6835e293f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluatorTest.java @@ -30,7 +30,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; public class InbuiltFunctionEvaluatorTest { @@ -158,21 +157,6 @@ public void testNullReturnedByInbuiltFunctionEvaluatorThatCannotTakeNull() { } } - @Test - public void testPlaceholderFunctionShouldNotBeRegistered() - throws Exception { - GenericRow row = new GenericRow(); - row.putValue("testColumn", "testValue"); - String expression = "text_match(testColumn, 'pattern')"; - try { - InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(expression); - evaluator.evaluate(row); - fail(); - } catch (Throwable t) { - assertTrue(t.getMessage().contains("text_match")); - } - } - public static class MyFunc { String _baseString = ""; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunctionTest.java index 0c7b0e3e52d..a69f537687a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunctionTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunctionTest.java @@ -56,7 +56,7 @@ public void testPostAggregationFunction() { assertEquals(function.invoke(new Object[]{"1234567890"}), "0987654321"); // ST_AsText - function = new PostAggregationFunction("ST_AsText", new ColumnDataType[]{ColumnDataType.BYTES}); + function = new PostAggregationFunction("ST_As_Text", new ColumnDataType[]{ColumnDataType.BYTES}); assertEquals(function.getResultType(), ColumnDataType.STRING); assertEquals(function.invoke( new Object[]{GeometrySerializer.serialize(GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 20)))}), From 89c83355baeca56001394219d8a2637ec2270759 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Mon, 18 Dec 2023 07:49:56 -0800 Subject: [PATCH 4/4] create new function registry mechansim 1. FunctionRegistry keeps the old FUNCTION_INFO_MAP only 2. moved Calcite Catalog-based schema.Function registry to its own package; along with a SqlOperator based PinotOperatorTable 3. both CatalogReader and OperatorTable utilizes ground truth function from PinotFunctionRegistry --> will be default once deprecate FunctionRegistry 4. PinotFunctionRegistry provides argument-type based lookup via the same method SqlValidator utilize to lookup routine (and lookup operator overload) 5. clean up multi-stage engine side accordingly --- .../common/function/FunctionRegistry.java | 145 ++++------- .../function/registry/PinotFunction.java | 29 +++ .../registry/PinotScalarFunction.java | 83 ++++++ .../sql}/PinotCalciteCatalogReader.java | 6 +- .../function/sql/PinotFunctionRegistry.java | 237 ++++++++++++++++++ .../function/sql/PinotOperatorTable.java | 101 ++++++++ .../function}/sql/PinotSqlAggFunction.java | 6 +- .../sql/PinotSqlTransformFunction.java | 5 +- .../calcite/jdbc/CalciteSchemaBuilder.java | 6 +- .../PinotAggregateExchangeNodeInsertRule.java | 2 +- .../{fun => }/PinotSqlCoalesceFunction.java | 5 +- .../calcite/sql/fun/PinotOperatorTable.java | 171 ------------- .../sql/util/PinotSqlStdOperatorTable.java | 98 ++++++++ .../apache/pinot/query/QueryEnvironment.java | 6 +- 14 files changed, 620 insertions(+), 280 deletions(-) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java rename {pinot-query-planner/src/main/java/org/apache/calcite/prepare => pinot-common/src/main/java/org/apache/pinot/common/function/sql}/PinotCalciteCatalogReader.java (98%) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java rename {pinot-query-planner/src/main/java/org/apache/calcite => pinot-common/src/main/java/org/apache/pinot/common/function}/sql/PinotSqlAggFunction.java (91%) rename {pinot-query-planner/src/main/java/org/apache/calcite => pinot-common/src/main/java/org/apache/pinot/common/function}/sql/PinotSqlTransformFunction.java (89%) rename pinot-query-planner/src/main/java/org/apache/calcite/sql/{fun => }/PinotSqlCoalesceFunction.java (90%) delete mode 100644 pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java create mode 100644 pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java index f02007c4d7b..e5ee9899d46 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionRegistry.java @@ -23,7 +23,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -31,11 +30,10 @@ import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.impl.ReflectiveFunctionBase; -import org.apache.calcite.util.NameMultimap; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.registry.PinotScalarFunction; +import org.apache.pinot.common.function.sql.PinotFunctionRegistry; import org.apache.pinot.spi.annotations.ScalarFunction; import org.apache.pinot.spi.utils.PinotReflectionUtils; import org.slf4j.Logger; @@ -46,15 +44,13 @@ * Registry for scalar functions. */ public class FunctionRegistry { + public static final boolean CASE_SENSITIVITY = false; private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class); - // TODO: remove both when FunctionOperatorTable is in used. private static final Map> FUNCTION_INFO_MAP = new HashMap<>(); - private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); private FunctionRegistry() { } - /** * Registers the scalar functions via reflection. * NOTE: In order to plugin methods using reflection, the methods should be inside a class that includes ".function." @@ -78,8 +74,8 @@ private FunctionRegistry() { FunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); } } - LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(), - FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs); + LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_INFO_MAP.size(), + FUNCTION_INFO_MAP.keySet(), System.currentTimeMillis() - startTimeMs); } /** @@ -90,50 +86,29 @@ private FunctionRegistry() { public static void init() { } - /** - * Registers a method with the name of the method. - */ @VisibleForTesting public static void registerFunction(Method method, boolean nullableParameters) { registerFunction(method, Collections.singleton(method.getName()), nullableParameters); } - private static void registerFunction(Method method, Set alias, boolean nullableParameters) { - if (method.getAnnotation(Deprecated.class) == null) { - for (String name : alias) { - registerFunctionInfoMap(name, method, nullableParameters); - registerCalciteNamedFunctionMap(name, method, nullableParameters); - } - } - } - - private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { - FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); - String canonicalName = canonicalize(functionName); - Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>()); - FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); - Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), - "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); - } - - private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) { - FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters)); - } - - public static Map> getRegisteredCalciteFunctionMap() { - return FUNCTION_MAP.map(); + @VisibleForTesting + public static Set getRegisteredCalciteFunctionNames() { + return PinotFunctionRegistry.getFunctionMap().map().keySet(); } - public static Set getRegisteredCalciteFunctionNames() { - return FUNCTION_MAP.map().keySet(); + /** + * Returns the full list of all registered ScalarFunction to Calcite. + */ + public static Map> getRegisteredCalciteFunctionMap() { + return PinotFunctionRegistry.getFunctionMap().map(); } /** * Returns {@code true} if the given function name is registered, {@code false} otherwise. */ public static boolean containsFunction(String functionName) { - // TODO: remove fallback to FUNCTION_INFO_MAP - return FUNCTION_MAP.containsKey(canonicalize(functionName), false) + // TODO: remove deprecated FUNCTION_INFO_MAP + return PinotFunctionRegistry.getFunctionMap().containsKey(functionName, CASE_SENSITIVITY) || FUNCTION_INFO_MAP.containsKey(canonicalize(functionName)); } @@ -143,40 +118,54 @@ public static boolean containsFunction(String functionName) { * methods are already registered. */ @Nullable - public static FunctionInfo getFunctionInfo(String functionName, int numParameters) { - // TODO: remove fallback to FUNCTION_INFO_MAP + public static FunctionInfo getFunctionInfo(String functionName, int numParams) { try { - return getFunctionInfoFromCalciteNamedMap(functionName, numParameters); + return getFunctionInfoFromCalciteNamedMap(functionName, numParams); } catch (IllegalArgumentException iae) { - return getFunctionInfoFromFunctionInfoMap(functionName, numParameters); + // TODO: remove deprecated FUNCTION_INFO_MAP + return getFunctionInfoFromFunctionInfoMap(functionName, numParams); } } + // TODO: remove deprecated FUNCTION_INFO_MAP + private static void registerFunction(Method method, Set alias, boolean nullableParameters) { + if (method.getAnnotation(Deprecated.class) == null) { + for (String name : alias) { + registerFunctionInfoMap(name, method, nullableParameters); + } + } + } + + private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) { + FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters); + String canonicalName = canonicalize(functionName); + Map functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>()); + FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo); + Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(), + "Function: %s with %s parameters is already registered", functionName, method.getParameterCount()); + } + @Nullable - private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParameters) { + private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParams) { Map functionInfoMap = FUNCTION_INFO_MAP.get(canonicalize(functionName)); - return functionInfoMap != null ? functionInfoMap.get(numParameters) : null; + return functionInfoMap != null ? functionInfoMap.get(numParams) : null; } @Nullable - private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParameters) { - List candidates = findByNumParameters(FUNCTION_MAP.range(functionName, false), numParameters); - if (candidates.size() <= 1) { - return candidates.size() == 1 ? candidates.get(0).getFunctionInfo() : null; + private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParams) { + List candidates = PinotFunctionRegistry.getFunctionMap() + .range(functionName, CASE_SENSITIVITY).stream() + .filter(e -> e.getValue() instanceof PinotScalarFunction && e.getValue().getParameters().size() == numParams) + .map(e -> (PinotScalarFunction) e.getValue()).collect(Collectors.toList()); + if (candidates.size() == 1) { + return candidates.get(0).getFunctionInfo(); } else { throw new IllegalArgumentException( - "Unable to lookup function: " + functionName + " by parameter count: " + numParameters - + " Found multiple candidates. Try to use argument types to resolve the correct one!"); + "Unable to lookup function: " + functionName + " by parameter count: " + numParams + " Found " + + candidates.size() + " candidates. Try to use argument types to resolve the correct one!"); } } - private static List findByNumParameters( - Collection> scalarFunctionList, int numParameters) { - return scalarFunctionList == null ? Collections.emptyList() - : scalarFunctionList.stream().filter(e -> e.getValue().getParameters().size() == numParameters) - .map(Map.Entry::getValue).collect(Collectors.toList()); - } - private static String canonicalize(String functionName) { return StringUtils.remove(functionName, '_').toLowerCase(); } @@ -207,40 +196,4 @@ public static double vectorSimilarity(float[] vector1, float[] vector2) { throw new UnsupportedOperationException("Placeholder scalar function, should not reach here"); } } - - /** - * Pinot specific implementation of the {@link org.apache.calcite.schema.ScalarFunction}. - * - * @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}" - */ - public static class PinotScalarFunction extends ReflectiveFunctionBase - implements org.apache.calcite.schema.ScalarFunction { - private final FunctionInfo _functionInfo; - private final String _name; - private final Method _method; - - public PinotScalarFunction(String name, Method method, boolean isNullableParameter) { - super(method); - _name = name; - _method = method; - _functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter); - } - - @Override - public RelDataType getReturnType(RelDataTypeFactory typeFactory) { - return typeFactory.createJavaType(method.getReturnType()); - } - - public String getName() { - return _name; - } - - public Method getMethod() { - return _method; - } - - public FunctionInfo getFunctionInfo() { - return _functionInfo; - } - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java new file mode 100644 index 00000000000..f0e75651373 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotFunction.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.registry; + +import org.apache.calcite.schema.Function; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlReturnTypeInference; + + +public interface PinotFunction extends Function { + SqlOperandTypeChecker getOperandTypeChecker(); + SqlReturnTypeInference getReturnTypeInference(); +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java new file mode 100644 index 00000000000..c1708aab420 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/registry/PinotScalarFunction.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.registry; + +import java.lang.reflect.Method; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScalarFunction; +import org.apache.calcite.schema.impl.ReflectiveFunctionBase; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.pinot.common.function.FunctionInfo; + + +/** + * Pinot specific implementation of the {@link ScalarFunction}. + * + * @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}" + */ +public class PinotScalarFunction extends ReflectiveFunctionBase implements PinotFunction, ScalarFunction { + private final FunctionInfo _functionInfo; + private final String _name; + private final Method _method; + private final SqlOperandTypeChecker _sqlOperandTypeChecker; + private final SqlReturnTypeInference _sqlReturnTypeInference; + + public PinotScalarFunction(String name, Method method, boolean isNullableParameter) { + this(name, method, isNullableParameter, null, null); + } + + public PinotScalarFunction(String name, Method method, boolean isNullableParameter, + SqlOperandTypeChecker sqlOperandTypeChecker, SqlReturnTypeInference sqlReturnTypeInference) { + super(method); + _name = name; + _method = method; + _functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter); + _sqlOperandTypeChecker = sqlOperandTypeChecker; + _sqlReturnTypeInference = sqlReturnTypeInference; + } + + @Override + public RelDataType getReturnType(RelDataTypeFactory typeFactory) { + return typeFactory.createJavaType(method.getReturnType()); + } + + public String getName() { + return _name; + } + + public Method getMethod() { + return _method; + } + + public FunctionInfo getFunctionInfo() { + return _functionInfo; + } + + @Override + public SqlOperandTypeChecker getOperandTypeChecker() { + return _sqlOperandTypeChecker; + } + + @Override + public SqlReturnTypeInference getReturnTypeInference() { + return _sqlReturnTypeInference; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java similarity index 98% rename from pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java index 84c71be601f..672689dff5c 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/prepare/PinotCalciteCatalogReader.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotCalciteCatalogReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.calcite.prepare; +package org.apache.pinot.common.function.sql; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -35,6 +35,8 @@ import org.apache.calcite.linq4j.function.Hints; import org.apache.calcite.model.ModelHandler; import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; @@ -310,7 +312,7 @@ public static SqlOperatorTable operatorTable(String... classNames) { } /** Converts a function to a {@link org.apache.calcite.sql.SqlOperator}. */ - private static SqlOperator toOp(SqlIdentifier name, + public static SqlOperator toOp(SqlIdentifier name, final org.apache.calcite.schema.Function function) { final Function> argTypesFactory = typeFactory -> function.getParameters() diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java new file mode 100644 index 00000000000..776a9592725 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotFunctionRegistry.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.sql; + +import com.google.common.annotations.VisibleForTesting; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Function; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlNameMatchers; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.apache.calcite.util.NameMultimap; +import org.apache.pinot.common.function.TransformFunctionType; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.registry.PinotScalarFunction; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.annotations.ScalarFunction; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Registry for scalar functions. + */ +public class PinotFunctionRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotFunctionRegistry.class); + private static final NameMultimap OPERATOR_MAP = new NameMultimap<>(); + private static final NameMultimap FUNCTION_MAP = new NameMultimap<>(); + + private PinotFunctionRegistry() { + } + + /** + * Registers the scalar functions via reflection. + * NOTE: In order to plugin methods using reflection, the methods should be inside a class that includes ".function." + * in its class path. This convention can significantly reduce the time of class scanning. + */ + static { + // REGISTER FUNCTIONS + long startTimeMs = System.currentTimeMillis(); + Set methods = PinotReflectionUtils.getMethodsThroughReflection(".*\\.function\\..*", ScalarFunction.class); + for (Method method : methods) { + if (!Modifier.isPublic(method.getModifiers())) { + continue; + } + ScalarFunction scalarFunction = method.getAnnotation(ScalarFunction.class); + if (scalarFunction.enabled()) { + // Parse annotated function names and alias + Set scalarFunctionNames = Arrays.stream(scalarFunction.names()).collect(Collectors.toSet()); + if (scalarFunctionNames.size() == 0) { + scalarFunctionNames.add(method.getName()); + } + boolean nullableParameters = scalarFunction.nullableParameters(); + PinotFunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters); + } + } + LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(), + FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs); + + // REGISTER OPERATORS + // Walk through all the Pinot aggregation types and + // 1. register those that are supported in multistage in addition to calcite standard opt table. + // 2. register special handling that differs from calcite standard. + for (AggregationFunctionType aggregationFunctionType : AggregationFunctionType.values()) { + if (aggregationFunctionType.getSqlKind() != null) { + // 1. Register the aggregation function with Calcite + registerAggregateFunction(aggregationFunctionType.getName(), aggregationFunctionType); + // 2. Register the aggregation function with Calcite on all alternative names + List alternativeFunctionNames = aggregationFunctionType.getAlternativeNames(); + for (String alternativeFunctionName : alternativeFunctionNames) { + registerAggregateFunction(alternativeFunctionName, aggregationFunctionType); + } + } + } + + // Walk through all the Pinot transform types and + // 1. register those that are supported in multistage in addition to calcite standard opt table. + // 2. register special handling that differs from calcite standard. + for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) { + if (transformFunctionType.getSqlKind() != null) { + // 1. Register the transform function with Calcite + registerTransformFunction(transformFunctionType.getName(), transformFunctionType); + // 2. Register the transform function with Calcite on all alternative names + List alternativeFunctionNames = transformFunctionType.getAlternativeNames(); + for (String alternativeFunctionName : alternativeFunctionNames) { + registerTransformFunction(alternativeFunctionName, transformFunctionType); + } + } + } + } + + public static void init() { + } + + @VisibleForTesting + public static void registerFunction(Method method, boolean nullableParameters) { + registerFunction(method, Collections.singleton(method.getName()), nullableParameters); + } + + public static NameMultimap getFunctionMap() { + return FUNCTION_MAP; + } + + public static NameMultimap getOperatorMap() { + return OPERATOR_MAP; + } + + @Nullable + public static PinotScalarFunction getScalarFunction(SqlOperatorTable operatorTable, RelDataTypeFactory typeFactory, + String functionName, List argTypes) { + List relArgTypes = convertArgumentTypes(typeFactory, argTypes); + SqlOperator sqlOperator = SqlUtil.lookupRoutine(operatorTable, typeFactory, + new SqlIdentifier(functionName, SqlParserPos.QUOTED_ZERO), relArgTypes, null, null, SqlSyntax.FUNCTION, + SqlKind.OTHER_FUNCTION, SqlNameMatchers.withCaseSensitive(false), true); + if (sqlOperator instanceof SqlUserDefinedFunction) { + Function function = ((SqlUserDefinedFunction) sqlOperator).getFunction(); + if (function instanceof PinotScalarFunction) { + return (PinotScalarFunction) function; + } + } + return null; + } + + private static void registerFunction(Method method, Set alias, boolean nullableParameters) { + if (method.getAnnotation(Deprecated.class) == null) { + for (String name : alias) { + registerCalciteNamedFunctionMap(name, method, nullableParameters); + } + } + } + + private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) { + FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters)); + } + + private static List convertArgumentTypes(RelDataTypeFactory typeFactory, + List argTypes) { + return argTypes.stream().map(type -> toRelType(typeFactory, type)).collect(Collectors.toList()); + } + + private static RelDataType toRelType(RelDataTypeFactory typeFactory, DataSchema.ColumnDataType dataType) { + switch (dataType) { + case INT: + return typeFactory.createSqlType(SqlTypeName.INTEGER); + case LONG: + return typeFactory.createSqlType(SqlTypeName.BIGINT); + case FLOAT: + return typeFactory.createSqlType(SqlTypeName.REAL); + case DOUBLE: + return typeFactory.createSqlType(SqlTypeName.DOUBLE); + case BIG_DECIMAL: + return typeFactory.createSqlType(SqlTypeName.DECIMAL); + case BOOLEAN: + return typeFactory.createSqlType(SqlTypeName.BOOLEAN); + case TIMESTAMP: + return typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + case JSON: + case STRING: + return typeFactory.createSqlType(SqlTypeName.VARCHAR); + case BYTES: + return typeFactory.createSqlType(SqlTypeName.VARBINARY); + case INT_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.INTEGER), -1); + case LONG_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.BIGINT), -1); + case FLOAT_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.REAL), -1); + case DOUBLE_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.DOUBLE), -1); + case BOOLEAN_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.BOOLEAN), -1); + case TIMESTAMP_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.TIMESTAMP), -1); + case STRING_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.VARCHAR), -1); + case BYTES_ARRAY: + return typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.VARBINARY), -1); + case UNKNOWN: + case OBJECT: + default: + return typeFactory.createSqlType(SqlTypeName.ANY); + } + } + + private static void registerAggregateFunction(String functionName, AggregationFunctionType functionType) { + if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { + PinotSqlAggFunction sqlAggFunction = new PinotSqlAggFunction(functionName.toUpperCase(Locale.ROOT), null, + functionType.getSqlKind(), functionType.getReturnTypeInference(), null, + functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); + OPERATOR_MAP.put(functionName.toUpperCase(Locale.ROOT), sqlAggFunction); + } + } + + private static void registerTransformFunction(String functionName, TransformFunctionType functionType) { + if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { + PinotSqlTransformFunction sqlTransformFunction = + new PinotSqlTransformFunction(functionName.toUpperCase(Locale.ROOT), + functionType.getSqlKind(), functionType.getReturnTypeInference(), null, + functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); + OPERATOR_MAP.put(functionName.toUpperCase(Locale.ROOT), sqlTransformFunction); + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java new file mode 100644 index 00000000000..ca4513a5ba7 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotOperatorTable.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.function.sql; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.validate.SqlNameMatcher; +import org.apache.pinot.common.function.FunctionRegistry; +import org.checkerframework.checker.nullness.qual.Nullable; + + +/** + * Temporary implementation of all dynamic arg/return type inference operators. + * TODO: merge this with {@link PinotCalciteCatalogReader} once we support + * 1. Return/Inference configuration in @ScalarFunction + * 2. Allow @ScalarFunction registry towards class (with multiple impl) + */ +public class PinotOperatorTable implements SqlOperatorTable { + private static final PinotOperatorTable INSTANCE = new PinotOperatorTable(); + + public static synchronized PinotOperatorTable instance() { + return INSTANCE; + } + + @Override public void lookupOperatorOverloads(SqlIdentifier opName, + @Nullable SqlFunctionCategory category, SqlSyntax syntax, + List operatorList, SqlNameMatcher nameMatcher) { + String simpleName = opName.getSimple(); + final Collection list = + lookUpOperators(simpleName); + if (list.isEmpty()) { + return; + } + for (SqlOperator op : list) { + if (op.getSyntax() == syntax) { + operatorList.add(op); + } else if (syntax == SqlSyntax.FUNCTION + && op instanceof SqlFunction) { + // this special case is needed for operators like CAST, + // which are treated as functions but have special syntax + operatorList.add(op); + } + } + + // REVIEW jvs 1-Jan-2005: why is this extra lookup required? + // Shouldn't it be covered by search above? + switch (syntax) { + case BINARY: + case PREFIX: + case POSTFIX: + for (SqlOperator extra + : lookUpOperators(simpleName)) { + // REVIEW: should only search operators added during this method? + if (extra != null && !operatorList.contains(extra)) { + operatorList.add(extra); + } + } + break; + default: + break; + } + } + + /** + * Look up operators based on case-sensitiveness. + */ + private Collection lookUpOperators(String name) { + return PinotFunctionRegistry.getOperatorMap().range(name, FunctionRegistry.CASE_SENSITIVITY).stream() + .map(Map.Entry::getValue).collect(Collectors.toSet()); + } + + @Override + public List getOperatorList() { + return PinotFunctionRegistry.getOperatorMap().map().values().stream().flatMap(List::stream) + .collect(Collectors.toList()); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java similarity index 91% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java index 0d4146f4317..bc8f55474e0 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlAggFunction.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlAggFunction.java @@ -16,8 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.calcite.sql; +package org.apache.pinot.common.function.sql; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java similarity index 89% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java rename to pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java index 827c9f37337..c2e8ce8130c 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlTransformFunction.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/sql/PinotSqlTransformFunction.java @@ -16,8 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.calcite.sql; +package org.apache.pinot.common.function.sql; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java index efe8b56a07c..adadcd6992b 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java @@ -23,7 +23,8 @@ import org.apache.calcite.schema.Function; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; -import org.apache.pinot.common.function.FunctionRegistry; +import org.apache.pinot.common.function.registry.PinotFunction; +import org.apache.pinot.common.function.sql.PinotFunctionRegistry; /** @@ -54,8 +55,7 @@ private CalciteSchemaBuilder() { public static CalciteSchema asRootSchema(Schema root) { CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, "", root); SchemaPlus schemaPlus = rootSchema.plus(); - for (Map.Entry> e - : FunctionRegistry.getRegisteredCalciteFunctionMap().entrySet()) { + for (Map.Entry> e : PinotFunctionRegistry.getFunctionMap().map().entrySet()) { for (Function f : e.getValue()) { schemaPlus.add(e.getKey(), f); } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java index df904123d20..68e331eb5d8 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java @@ -40,7 +40,6 @@ import org.apache.calcite.rel.logical.PinotLogicalExchange; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.PinotSqlAggFunction; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.OperandTypes; @@ -53,6 +52,7 @@ import org.apache.calcite.util.mapping.Mapping; import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; +import org.apache.pinot.common.function.sql.PinotSqlAggFunction; import org.apache.pinot.query.planner.plannode.AggregateNode.AggType; import org.apache.pinot.segment.spi.AggregationFunctionType; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java similarity index 90% rename from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java rename to pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java index 92ef85857f9..2b2ee32f083 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSqlCoalesceFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/PinotSqlCoalesceFunction.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.calcite.sql.fun; +package org.apache.calcite.sql; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlCoalesceFunction; import org.apache.calcite.sql.validate.SqlValidator; diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java deleted file mode 100644 index 3617a7c0627..00000000000 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.calcite.sql.fun; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import org.apache.calcite.sql.PinotSqlAggFunction; -import org.apache.calcite.sql.PinotSqlTransformFunction; -import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.validate.SqlNameMatchers; -import org.apache.calcite.util.Util; -import org.apache.pinot.common.function.TransformFunctionType; -import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; - - -/** - * {@link PinotOperatorTable} defines the {@link SqlOperator} overrides on top of the {@link SqlStdOperatorTable}. - * - *

The main purpose of this Pinot specific SQL operator table is to - *

    - *
  • Ensure that any specific SQL validation rules can apply with Pinot override entirely over Calcite's.
  • - *
  • Ability to create customer operators that are not function and cannot use - * {@link org.apache.calcite.prepare.Prepare.CatalogReader} to override
  • - *
  • Still maintain minimum customization and benefit from Calcite's original operator table setting.
  • - *
- */ -@SuppressWarnings("unused") // unused fields are accessed by reflection -public class PinotOperatorTable extends SqlStdOperatorTable { - - private static @MonotonicNonNull PinotOperatorTable _instance; - - // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around - // supplier instance. this should replace all lazy init static objects in the codebase - public static synchronized PinotOperatorTable instance() { - if (_instance == null) { - // Creates and initializes the standard operator table. - // Uses two-phase construction, because we can't initialize the - // table until the constructor of the sub-class has completed. - _instance = new PinotOperatorTable(); - _instance.initNoDuplicate(); - } - return _instance; - } - - /** - * Initialize without duplicate, e.g. when 2 duplicate operator is linked with the same op - * {@link org.apache.calcite.sql.SqlKind} it causes problem. - * - *

This is a direct copy of the {@link org.apache.calcite.sql.util.ReflectiveSqlOperatorTable} and can be hard to - * debug, suggest changing to a non-dynamic registration. Dynamic function support should happen via catalog. - * - * This also registers aggregation functions defined in {@link org.apache.pinot.segment.spi.AggregationFunctionType} - * which are multistage enabled. - */ - public final void initNoDuplicate() { - // Pinot supports native COALESCE function, thus no need to create CASE WHEN conversion. - register(new PinotSqlCoalesceFunction()); - // Ensure ArrayValueConstructor is registered before ArrayQueryConstructor - register(ARRAY_VALUE_CONSTRUCTOR); - - // TODO: reflection based registration is not ideal, we should use a static list of operators and register them - // Use reflection to register the expressions stored in public fields. - for (Field field : getClass().getFields()) { - try { - if (SqlFunction.class.isAssignableFrom(field.getType())) { - SqlFunction op = (SqlFunction) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } else if ( - SqlOperator.class.isAssignableFrom(field.getType())) { - SqlOperator op = (SqlOperator) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } - } catch (IllegalArgumentException | IllegalAccessException e) { - throw Util.throwAsRuntime(Util.causeOrSelf(e)); - } - } - - // Walk through all the Pinot aggregation types and - // 1. register those that are supported in multistage in addition to calcite standard opt table. - // 2. register special handling that differs from calcite standard. - for (AggregationFunctionType aggregationFunctionType : AggregationFunctionType.values()) { - if (aggregationFunctionType.getSqlKind() != null) { - // 1. Register the aggregation function with Calcite - registerAggregateFunction(aggregationFunctionType.getName(), aggregationFunctionType); - // 2. Register the aggregation function with Calcite on all alternative names - List alternativeFunctionNames = aggregationFunctionType.getAlternativeNames(); - for (String alternativeFunctionName : alternativeFunctionNames) { - registerAggregateFunction(alternativeFunctionName, aggregationFunctionType); - } - } - } - - // Walk through all the Pinot transform types and - // 1. register those that are supported in multistage in addition to calcite standard opt table. - // 2. register special handling that differs from calcite standard. - for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) { - if (transformFunctionType.getSqlKind() != null) { - // 1. Register the transform function with Calcite - registerTransformFunction(transformFunctionType.getName(), transformFunctionType); - // 2. Register the transform function with Calcite on all alternative names - List alternativeFunctionNames = transformFunctionType.getAlternativeNames(); - for (String alternativeFunctionName : alternativeFunctionNames) { - registerTransformFunction(alternativeFunctionName, transformFunctionType); - } - } - } - } - - private void registerAggregateFunction(String functionName, AggregationFunctionType functionType) { - // register function behavior that's different from Calcite - if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { - PinotSqlAggFunction sqlAggFunction = new PinotSqlAggFunction(functionName.toUpperCase(Locale.ROOT), null, - functionType.getSqlKind(), functionType.getReturnTypeInference(), null, - functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); - if (notRegistered(sqlAggFunction)) { - register(sqlAggFunction); - } - } - } - - private void registerTransformFunction(String functionName, TransformFunctionType functionType) { - // register function behavior that's different from Calcite - if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { - PinotSqlTransformFunction sqlTransformFunction = - new PinotSqlTransformFunction(functionName.toUpperCase(Locale.ROOT), - functionType.getSqlKind(), functionType.getReturnTypeInference(), null, - functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); - if (notRegistered(sqlTransformFunction)) { - register(sqlTransformFunction); - } - } - } - - private boolean notRegistered(SqlFunction op) { - List operatorList = new ArrayList<>(); - lookupOperatorOverloads(op.getNameAsId(), op.getFunctionType(), op.getSyntax(), operatorList, - SqlNameMatchers.withCaseSensitive(false)); - return operatorList.size() == 0; - } - - private boolean notRegistered(SqlOperator op) { - List operatorList = new ArrayList<>(); - lookupOperatorOverloads(op.getNameAsId(), null, op.getSyntax(), operatorList, - SqlNameMatchers.withCaseSensitive(false)); - return operatorList.size() == 0; - } -} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java new file mode 100644 index 00000000000..e3f6d8a7221 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/util/PinotSqlStdOperatorTable.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.calcite.sql.util; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.sql.PinotSqlCoalesceFunction; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlNameMatchers; +import org.apache.calcite.util.Util; +import org.apache.pinot.common.function.FunctionRegistry; + + +public class PinotSqlStdOperatorTable extends SqlStdOperatorTable { + private static PinotSqlStdOperatorTable _instance; + + // supplier instance. this should replace all lazy init static objects in the codebase + public static synchronized PinotSqlStdOperatorTable instance() { + if (_instance == null) { + // Creates and initializes the standard operator table. + // Uses two-phase construction, because we can't initialize the + // table until the constructor of the sub-class has completed. + _instance = new PinotSqlStdOperatorTable(); + _instance.initNoDuplicate(); + } + return _instance; + } + + /** + * Initialize without duplicate, e.g. when 2 duplicate operator is linked with the same op + * {@link org.apache.calcite.sql.SqlKind} it causes problem. + * + *

This is a direct copy of the {@link org.apache.calcite.sql.util.ReflectiveSqlOperatorTable} and can be hard to + * debug, suggest changing to a non-dynamic registration. Dynamic function support should happen via catalog. + * + * This also registers aggregation functions defined in {@link org.apache.pinot.segment.spi.AggregationFunctionType} + * which are multistage enabled. + */ + public final void initNoDuplicate() { + // Pinot supports native COALESCE function, thus no need to create CASE WHEN conversion. + register(new PinotSqlCoalesceFunction()); + // Ensure ArrayValueConstructor is registered before ArrayQueryConstructor + register(ARRAY_VALUE_CONSTRUCTOR); + + // TODO: reflection based registration is not ideal, we should use a static list of operators and register them + // Use reflection to register the expressions stored in public fields. + for (Field field : getClass().getFields()) { + try { + if (SqlFunction.class.isAssignableFrom(field.getType())) { + SqlFunction op = (SqlFunction) field.get(this); + if (op != null && notRegistered(op)) { + register(op); + } + } else if (SqlOperator.class.isAssignableFrom(field.getType())) { + SqlOperator op = (SqlOperator) field.get(this); + if (op != null && notRegistered(op)) { + register(op); + } + } + } catch (IllegalArgumentException | IllegalAccessException e) { + throw Util.throwAsRuntime(Util.causeOrSelf(e)); + } + } + } + + private boolean notRegistered(SqlFunction op) { + List operatorList = new ArrayList<>(); + lookupOperatorOverloads(op.getNameAsId(), op.getFunctionType(), op.getSyntax(), operatorList, + SqlNameMatchers.withCaseSensitive(FunctionRegistry.CASE_SENSITIVITY)); + return operatorList.size() == 0; + } + + private boolean notRegistered(SqlOperator op) { + List operatorList = new ArrayList<>(); + lookupOperatorOverloads(op.getNameAsId(), null, op.getSyntax(), operatorList, + SqlNameMatchers.withCaseSensitive(FunctionRegistry.CASE_SENSITIVITY)); + return operatorList.size() == 0; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 769d6a607fc..c835eb36cd9 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -35,7 +35,6 @@ import org.apache.calcite.plan.hep.HepMatchOrder; import org.apache.calcite.plan.hep.HepProgram; import org.apache.calcite.plan.hep.HepProgramBuilder; -import org.apache.calcite.prepare.PinotCalciteCatalogReader; import org.apache.calcite.prepare.Prepare; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -52,8 +51,8 @@ import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.fun.PinotOperatorTable; import org.apache.calcite.sql.util.PinotChainedSqlOperatorTable; +import org.apache.calcite.sql.util.PinotSqlStdOperatorTable; import org.apache.calcite.sql2rel.PinotConvertletTable; import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; @@ -61,6 +60,8 @@ import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.RelBuilder; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.function.sql.PinotCalciteCatalogReader; +import org.apache.pinot.common.function.sql.PinotOperatorTable; import org.apache.pinot.query.context.PlannerContext; import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.planner.QueryPlan; @@ -113,6 +114,7 @@ public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, Worke _config = Frameworks.newConfigBuilder().traitDefs() .operatorTable(new PinotChainedSqlOperatorTable(Arrays.asList( + PinotSqlStdOperatorTable.instance(), PinotOperatorTable.instance(), _catalogReader))) .defaultSchema(_rootSchema.plus())