From 4f28814f12e8765418f8d17bf47a06e2f7db7ba5 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Fri, 4 Aug 2023 18:06:41 -0700 Subject: [PATCH] No reflection for function registery in v2 --- .../calcite/sql/fun/PinotOperatorTable.java | 250 +++++++++++++++--- 1 file changed, 211 insertions(+), 39 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java index 2ee178419ffc..aef949246faf 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java @@ -18,19 +18,21 @@ */ package org.apache.calcite.sql.fun; -import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; import org.apache.calcite.sql.PinotSqlAggFunction; import org.apache.calcite.sql.PinotSqlTransformFunction; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.validate.SqlNameMatchers; -import org.apache.calcite.util.Util; import org.apache.pinot.common.function.TransformFunctionType; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -46,11 +48,10 @@ */ @SuppressWarnings("unused") // unused fields are accessed by reflection public class PinotOperatorTable extends SqlStdOperatorTable { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotOperatorTable.class); private static @MonotonicNonNull PinotOperatorTable _instance; - public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction(); - // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around // supplier instance. this should replace all lazy init static objects in the codebase public static synchronized PinotOperatorTable instance() { @@ -59,53 +60,46 @@ public static synchronized PinotOperatorTable instance() { // Uses two-phase construction, because we can't initialize the // table until the constructor of the sub-class has completed. _instance = new PinotOperatorTable(); - _instance.initNoDuplicate(); + _instance.initialize(); } return _instance; } /** - * Initialize without duplicate, e.g. when 2 duplicate operator is linked with the same op - * {@link org.apache.calcite.sql.SqlKind} it causes problem. - * - *

This is a direct copy of the {@link org.apache.calcite.sql.util.ReflectiveSqlOperatorTable} and can be hard to - * debug, suggest changing to a non-dynamic registration. Dynamic function support should happen via catalog. + * Initialize sql functions and operators. + * All duplicates operators linked with the same {@link org.apache.calcite.sql.SqlKind} will cause failure and need + * to resolved manually by dev. * * This also registers aggregation functions defined in {@link org.apache.pinot.segment.spi.AggregationFunctionType} * which are multistage enabled. */ - public final void initNoDuplicate() { - // Use reflection to register the expressions stored in public fields. - for (Field field : getClass().getFields()) { - try { - if (SqlFunction.class.isAssignableFrom(field.getType())) { - SqlFunction op = (SqlFunction) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } else if ( - SqlOperator.class.isAssignableFrom(field.getType())) { - SqlOperator op = (SqlOperator) field.get(this); - if (op != null && notRegistered(op)) { - register(op); - } - } - } catch (IllegalArgumentException | IllegalAccessException e) { - throw Util.throwAsRuntime(Util.causeOrSelf(e)); - } - } + public final void initialize() { + // Register some hand-picked Calcite standard SQL functions first, the list is a subset of functions/operators + // from SqlStdOperatorTable. + // Note, if Pinot functions duplicate in this list, we need to manually comment the functions registered. + initCalciteStandardSql(); + + // Register Pinot specific SqlFunction or SqlOperator. + initPinotOverrideSql(); // Walk through all the Pinot aggregation types and // 1. register those that are supported in multistage in addition to calcite standard opt table. // 2. register special handling that differs from calcite standard. for (AggregationFunctionType aggregationFunctionType : AggregationFunctionType.values()) { if (aggregationFunctionType.getSqlKind() != null) { + Set registeredFunctionNames = new HashSet<>(); + String aggregationFunctionTypeName = aggregationFunctionType.getName(); // 1. Register the aggregation function with Calcite - registerAggregateFunction(aggregationFunctionType.getName(), aggregationFunctionType); + registerAggregateFunction(aggregationFunctionTypeName, aggregationFunctionType); + registeredFunctionNames.add(aggregationFunctionTypeName.toUpperCase()); // 2. Register the aggregation function with Calcite on all alternative names List alternativeFunctionNames = aggregationFunctionType.getAlternativeNames(); for (String alternativeFunctionName : alternativeFunctionNames) { + if (registeredFunctionNames.contains(alternativeFunctionName.toUpperCase())) { + continue; + } registerAggregateFunction(alternativeFunctionName, aggregationFunctionType); + registeredFunctionNames.add(alternativeFunctionName.toUpperCase()); } } } @@ -115,17 +109,196 @@ public final void initNoDuplicate() { // 2. register special handling that differs from calcite standard. for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) { if (transformFunctionType.getSqlKind() != null) { + Set registeredFunctionNames = new HashSet<>(); + String transformFunctionTypeName = transformFunctionType.getName(); // 1. Register the aggregation function with Calcite - registerTransformFunction(transformFunctionType.getName(), transformFunctionType); + registerTransformFunction(transformFunctionTypeName, transformFunctionType); + registeredFunctionNames.add(transformFunctionTypeName.toUpperCase()); + // 2. Register the aggregation function with Calcite on all alternative names List alternativeFunctionNames = transformFunctionType.getAlternativeNames(); for (String alternativeFunctionName : alternativeFunctionNames) { + if (registeredFunctionNames.contains(alternativeFunctionName.toUpperCase())) { + continue; + } registerTransformFunction(alternativeFunctionName, transformFunctionType); + registeredFunctionNames.add(alternativeFunctionName.toUpperCase()); } } } } + private void initPinotOverrideSql() { + register(new PinotSqlCoalesceFunction()); + } + + private void initCalciteStandardSql() { + register(UNION); + register(UNION_ALL); + register(EXCEPT); + register(EXCEPT_ALL); + register(INTERSECT); + register(INTERSECT_ALL); + register(AND); + register(AS); + register(FILTER); + register(CONCAT); + register(DIVIDE); + register(PERCENT_REMAINDER); + register(DOT); + register(EQUALS); + register(GREATER_THAN); + register(IS_DISTINCT_FROM); + register(IS_NOT_DISTINCT_FROM); + register(IS_DIFFERENT_FROM); + register(GREATER_THAN_OR_EQUAL); + register(IN); + register(NOT_IN); + register(LESS_THAN); + register(LESS_THAN_OR_EQUAL); + register(MINUS); + register(MULTIPLY); + register(NOT_EQUALS); + register(OR); + register(PLUS); + register(DESC); + register(NULLS_FIRST); + register(NULLS_LAST); + register(IS_NOT_NULL); + register(IS_NULL); + register(IS_NOT_TRUE); + register(IS_TRUE); + register(IS_NOT_FALSE); + register(IS_FALSE); + register(EXISTS); + register(NOT); + register(UNARY_MINUS); + register(UNARY_PLUS); + register(EXPLICIT_TABLE); + register(SUM); + register(COUNT); + register(MODE); + register(APPROX_COUNT_DISTINCT); + register(MIN); + register(MAX); + register(LAST_VALUE); + register(ANY_VALUE); + register(FIRST_VALUE); + register(NTH_VALUE); + register(LEAD); + register(SINGLE_VALUE); + register(AVG); + register(STDDEV_POP); + register(COVAR_POP); + register(COVAR_SAMP); + register(STDDEV); + register(VARIANCE); + register(BIT_AND); + register(BIT_OR); + register(BIT_XOR); + register(HISTOGRAM_AGG); + register(HISTOGRAM_MIN); + register(HISTOGRAM_MAX); + register(HISTOGRAM_FIRST_VALUE); + register(HISTOGRAM_LAST_VALUE); + register(SUM0); + register(DENSE_RANK); + register(PERCENT_RANK); + register(RANK); + register(ROW_NUMBER); + register(ROW); + register(ARRAY_VALUE_CONSTRUCTOR); + register(MAP_VALUE_CONSTRUCTOR); + register(UNNEST); + register(LATERAL); + register(CONTAINS); + register(EQUALS); + register(VALUES); + register(JSON_EXISTS); + register(JSON_VALUE); + register(JSON_QUERY); + register(JSON_DEPTH); + register(JSON_KEYS); + register(JSON_LENGTH); + register(JSON_PRETTY); + register(JSON_STORAGE_SIZE); + register(JSON_TYPE); + register(BETWEEN); + register(SYMMETRIC_BETWEEN); + register(NOT_BETWEEN); + register(SYMMETRIC_NOT_BETWEEN); + register(NOT_LIKE); + register(LIKE); + register(ESCAPE); + register(CASE); + register(OVER); + register(SUBSTRING); + register(REPLACE); + register(OVERLAY); + register(TRIM); + register(POSITION); + register(CHAR_LENGTH); + register(CHARACTER_LENGTH); + register(OCTET_LENGTH); + register(UPPER); + register(LOWER); + register(ASCII); + register(POWER); + register(SQRT); + register(MOD); + register(LN); + register(LOG10); + register(ABS); + register(ACOS); + register(ASIN); + register(ATAN); + register(ATAN2); + register(CBRT); + register(COS); + register(COT); + register(DEGREES); + register(EXP); + register(RADIANS); + register(ROUND); + register(SIGN); + register(SIN); + register(TAN); + register(TRUNCATE); + register(PI); + register(FIRST); + register(LAST); + register(NULLIF); + register(FLOOR); + register(CEIL); + register(LOCALTIME); + register(LOCALTIMESTAMP); + register(CURRENT_TIME); + register(CURRENT_TIMESTAMP); + register(CURRENT_DATE); + register(TIMESTAMP_ADD); + register(TIMESTAMP_DIFF); + register(CAST); + register(EXTRACT); + register(YEAR); + register(QUARTER); + register(MONTH); + register(WEEK); + register(DAYOFWEEK); + register(DAYOFMONTH); + register(DAYOFYEAR); + register(HOUR); + register(MINUTE); + register(SECOND); + register(LAST_DAY); + register(ELEMENT); + register(ITEM); + register(CARDINALITY); + register(INTERSECTION); + register(CURRENT_VALUE); + register(TUMBLE); + register(HOP); + } + private void registerAggregateFunction(String functionName, AggregationFunctionType functionType) { // register function behavior that's different from Calcite if (functionType.getOperandTypeChecker() != null && functionType.getReturnTypeInference() != null) { @@ -133,7 +306,10 @@ private void registerAggregateFunction(String functionName, AggregationFunctionT functionType.getSqlKind(), functionType.getReturnTypeInference(), null, functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); if (notRegistered(sqlAggFunction)) { + LOGGER.info("Registering Pinot Aggregation Function {}", functionName); register(sqlAggFunction); + } else { + LOGGER.error("Pinot Aggregation Function {} is already registered", functionName); } } } @@ -146,7 +322,10 @@ private void registerTransformFunction(String functionName, TransformFunctionTyp functionType.getSqlKind(), functionType.getReturnTypeInference(), null, functionType.getOperandTypeChecker(), functionType.getSqlFunctionCategory()); if (notRegistered(sqlTransformFunction)) { + LOGGER.info("Registering Pinot Transform Function {}", functionName); register(sqlTransformFunction); + } else { + LOGGER.error("Pinot Transform Function {} is already registered", functionName); } } } @@ -157,11 +336,4 @@ private boolean notRegistered(SqlFunction op) { SqlNameMatchers.withCaseSensitive(false)); return operatorList.size() == 0; } - - private boolean notRegistered(SqlOperator op) { - List operatorList = new ArrayList<>(); - lookupOperatorOverloads(op.getNameAsId(), null, op.getSyntax(), operatorList, - SqlNameMatchers.withCaseSensitive(false)); - return operatorList.size() == 0; - } }