Skip to content

Commit

Permalink
create new function registry mechansim
Browse files Browse the repository at this point in the history
1. FunctionRegistry keeps the old FUNCTION_INFO_MAP only
2. moved Calcite Catalog-based schema.Function registry to its own package; along with a SqlOperator based PinotOperatorTable
3. both CatalogReader and OperatorTable utilizes ground truth function from PinotFunctionRegistry --> will be default once deprecate FunctionRegistry
4. PinotFunctionRegistry provides argument-type based lookup via the same method SqlValidator utilize to lookup routine (and lookup operator overload)
5. clean up multi-stage engine side accordingly
  • Loading branch information
Rong Rong committed Jan 4, 2024
1 parent 7a9c87c commit 89c8335
Show file tree
Hide file tree
Showing 14 changed files with 620 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
import org.apache.calcite.util.NameMultimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.function.registry.PinotFunction;
import org.apache.pinot.common.function.registry.PinotScalarFunction;
import org.apache.pinot.common.function.sql.PinotFunctionRegistry;
import org.apache.pinot.spi.annotations.ScalarFunction;
import org.apache.pinot.spi.utils.PinotReflectionUtils;
import org.slf4j.Logger;
Expand All @@ -46,15 +44,13 @@
* Registry for scalar functions.
*/
public class FunctionRegistry {
public static final boolean CASE_SENSITIVITY = false;
private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRegistry.class);
// TODO: remove both when FunctionOperatorTable is in used.
private static final Map<String, Map<Integer, FunctionInfo>> FUNCTION_INFO_MAP = new HashMap<>();
private static final NameMultimap<PinotScalarFunction> FUNCTION_MAP = new NameMultimap<>();

private FunctionRegistry() {
}


/**
* Registers the scalar functions via reflection.
* NOTE: In order to plugin methods using reflection, the methods should be inside a class that includes ".function."
Expand All @@ -78,8 +74,8 @@ private FunctionRegistry() {
FunctionRegistry.registerFunction(method, scalarFunctionNames, nullableParameters);
}
}
LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_MAP.map().size(),
FUNCTION_MAP.map().keySet(), System.currentTimeMillis() - startTimeMs);
LOGGER.info("Initialized FunctionRegistry with {} functions: {} in {}ms", FUNCTION_INFO_MAP.size(),
FUNCTION_INFO_MAP.keySet(), System.currentTimeMillis() - startTimeMs);
}

/**
Expand All @@ -90,50 +86,29 @@ private FunctionRegistry() {
public static void init() {
}

/**
* Registers a method with the name of the method.
*/
@VisibleForTesting
public static void registerFunction(Method method, boolean nullableParameters) {
registerFunction(method, Collections.singleton(method.getName()), nullableParameters);
}

private static void registerFunction(Method method, Set<String> alias, boolean nullableParameters) {
if (method.getAnnotation(Deprecated.class) == null) {
for (String name : alias) {
registerFunctionInfoMap(name, method, nullableParameters);
registerCalciteNamedFunctionMap(name, method, nullableParameters);
}
}
}

private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) {
FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters);
String canonicalName = canonicalize(functionName);
Map<Integer, FunctionInfo> functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>());
FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo);
Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(),
"Function: %s with %s parameters is already registered", functionName, method.getParameterCount());
}

private static void registerCalciteNamedFunctionMap(String name, Method method, boolean nullableParameters) {
FUNCTION_MAP.put(name, new PinotScalarFunction(name, method, nullableParameters));
}

public static Map<String, List<PinotScalarFunction>> getRegisteredCalciteFunctionMap() {
return FUNCTION_MAP.map();
@VisibleForTesting
public static Set<String> getRegisteredCalciteFunctionNames() {
return PinotFunctionRegistry.getFunctionMap().map().keySet();
}

public static Set<String> getRegisteredCalciteFunctionNames() {
return FUNCTION_MAP.map().keySet();
/**
* Returns the full list of all registered ScalarFunction to Calcite.
*/
public static Map<String, List<PinotFunction>> getRegisteredCalciteFunctionMap() {
return PinotFunctionRegistry.getFunctionMap().map();
}

/**
* Returns {@code true} if the given function name is registered, {@code false} otherwise.
*/
public static boolean containsFunction(String functionName) {
// TODO: remove fallback to FUNCTION_INFO_MAP
return FUNCTION_MAP.containsKey(canonicalize(functionName), false)
// TODO: remove deprecated FUNCTION_INFO_MAP
return PinotFunctionRegistry.getFunctionMap().containsKey(functionName, CASE_SENSITIVITY)
|| FUNCTION_INFO_MAP.containsKey(canonicalize(functionName));
}

Expand All @@ -143,40 +118,54 @@ public static boolean containsFunction(String functionName) {
* methods are already registered.
*/
@Nullable
public static FunctionInfo getFunctionInfo(String functionName, int numParameters) {
// TODO: remove fallback to FUNCTION_INFO_MAP
public static FunctionInfo getFunctionInfo(String functionName, int numParams) {
try {
return getFunctionInfoFromCalciteNamedMap(functionName, numParameters);
return getFunctionInfoFromCalciteNamedMap(functionName, numParams);
} catch (IllegalArgumentException iae) {
return getFunctionInfoFromFunctionInfoMap(functionName, numParameters);
// TODO: remove deprecated FUNCTION_INFO_MAP
return getFunctionInfoFromFunctionInfoMap(functionName, numParams);
}
}

// TODO: remove deprecated FUNCTION_INFO_MAP
private static void registerFunction(Method method, Set<String> alias, boolean nullableParameters) {
if (method.getAnnotation(Deprecated.class) == null) {
for (String name : alias) {
registerFunctionInfoMap(name, method, nullableParameters);
}
}
}

private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) {
FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters);
String canonicalName = canonicalize(functionName);
Map<Integer, FunctionInfo> functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>());
FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo);
Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(),
"Function: %s with %s parameters is already registered", functionName, method.getParameterCount());
}

@Nullable
private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParameters) {
private static FunctionInfo getFunctionInfoFromFunctionInfoMap(String functionName, int numParams) {
Map<Integer, FunctionInfo> functionInfoMap = FUNCTION_INFO_MAP.get(canonicalize(functionName));
return functionInfoMap != null ? functionInfoMap.get(numParameters) : null;
return functionInfoMap != null ? functionInfoMap.get(numParams) : null;
}

@Nullable
private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParameters) {
List<PinotScalarFunction> candidates = findByNumParameters(FUNCTION_MAP.range(functionName, false), numParameters);
if (candidates.size() <= 1) {
return candidates.size() == 1 ? candidates.get(0).getFunctionInfo() : null;
private static FunctionInfo getFunctionInfoFromCalciteNamedMap(String functionName, int numParams) {
List<PinotScalarFunction> candidates = PinotFunctionRegistry.getFunctionMap()
.range(functionName, CASE_SENSITIVITY).stream()
.filter(e -> e.getValue() instanceof PinotScalarFunction && e.getValue().getParameters().size() == numParams)
.map(e -> (PinotScalarFunction) e.getValue()).collect(Collectors.toList());
if (candidates.size() == 1) {
return candidates.get(0).getFunctionInfo();
} else {
throw new IllegalArgumentException(
"Unable to lookup function: " + functionName + " by parameter count: " + numParameters
+ " Found multiple candidates. Try to use argument types to resolve the correct one!");
"Unable to lookup function: " + functionName + " by parameter count: " + numParams + " Found "
+ candidates.size() + " candidates. Try to use argument types to resolve the correct one!");
}
}

private static List<PinotScalarFunction> findByNumParameters(
Collection<Map.Entry<String, PinotScalarFunction>> scalarFunctionList, int numParameters) {
return scalarFunctionList == null ? Collections.emptyList()
: scalarFunctionList.stream().filter(e -> e.getValue().getParameters().size() == numParameters)
.map(Map.Entry::getValue).collect(Collectors.toList());
}

private static String canonicalize(String functionName) {
return StringUtils.remove(functionName, '_').toLowerCase();
}
Expand Down Expand Up @@ -207,40 +196,4 @@ public static double vectorSimilarity(float[] vector1, float[] vector2) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}
}

/**
* Pinot specific implementation of the {@link org.apache.calcite.schema.ScalarFunction}.
*
* @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}"
*/
public static class PinotScalarFunction extends ReflectiveFunctionBase
implements org.apache.calcite.schema.ScalarFunction {
private final FunctionInfo _functionInfo;
private final String _name;
private final Method _method;

public PinotScalarFunction(String name, Method method, boolean isNullableParameter) {
super(method);
_name = name;
_method = method;
_functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter);
}

@Override
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
return typeFactory.createJavaType(method.getReturnType());
}

public String getName() {
return _name;
}

public Method getMethod() {
return _method;
}

public FunctionInfo getFunctionInfo() {
return _functionInfo;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.function.registry;

import org.apache.calcite.schema.Function;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlReturnTypeInference;


public interface PinotFunction extends Function {
SqlOperandTypeChecker getOperandTypeChecker();
SqlReturnTypeInference getReturnTypeInference();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.function.registry;

import java.lang.reflect.Method;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScalarFunction;
import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.pinot.common.function.FunctionInfo;


/**
* Pinot specific implementation of the {@link ScalarFunction}.
*
* @see "{@link org.apache.calcite.schema.impl.ScalarFunctionImpl}"
*/
public class PinotScalarFunction extends ReflectiveFunctionBase implements PinotFunction, ScalarFunction {
private final FunctionInfo _functionInfo;
private final String _name;
private final Method _method;
private final SqlOperandTypeChecker _sqlOperandTypeChecker;
private final SqlReturnTypeInference _sqlReturnTypeInference;

public PinotScalarFunction(String name, Method method, boolean isNullableParameter) {
this(name, method, isNullableParameter, null, null);
}

public PinotScalarFunction(String name, Method method, boolean isNullableParameter,
SqlOperandTypeChecker sqlOperandTypeChecker, SqlReturnTypeInference sqlReturnTypeInference) {
super(method);
_name = name;
_method = method;
_functionInfo = new FunctionInfo(method, method.getDeclaringClass(), isNullableParameter);
_sqlOperandTypeChecker = sqlOperandTypeChecker;
_sqlReturnTypeInference = sqlReturnTypeInference;
}

@Override
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
return typeFactory.createJavaType(method.getReturnType());
}

public String getName() {
return _name;
}

public Method getMethod() {
return _method;
}

public FunctionInfo getFunctionInfo() {
return _functionInfo;
}

@Override
public SqlOperandTypeChecker getOperandTypeChecker() {
return _sqlOperandTypeChecker;
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return _sqlReturnTypeInference;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.calcite.prepare;
package org.apache.pinot.common.function.sql;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -35,6 +35,8 @@
import org.apache.calcite.linq4j.function.Hints;
import org.apache.calcite.model.ModelHandler;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
Expand Down Expand Up @@ -310,7 +312,7 @@ public static SqlOperatorTable operatorTable(String... classNames) {
}

/** Converts a function to a {@link org.apache.calcite.sql.SqlOperator}. */
private static SqlOperator toOp(SqlIdentifier name,
public static SqlOperator toOp(SqlIdentifier name,
final org.apache.calcite.schema.Function function) {
final Function<RelDataTypeFactory, List<RelDataType>> argTypesFactory =
typeFactory -> function.getParameters()
Expand Down
Loading

0 comments on commit 89c8335

Please sign in to comment.