Skip to content

Commit

Permalink
feat(sql): support use func with expr params in filter (#473)
Browse files Browse the repository at this point in the history
支持在filter里使用参数为表达式的函数
更新剩余TPC-H语句准备测试
  • Loading branch information
jzl18thu authored Oct 24, 2024
1 parent aff9e62 commit 7f0b139
Show file tree
Hide file tree
Showing 45 changed files with 3,568 additions and 491 deletions.
12 changes: 6 additions & 6 deletions antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ andExpression
;

predicate
: (KEY | path | functionName LR_BRACKET path RR_BRACKET) comparisonOperator constant
| constant comparisonOperator (KEY | path | functionName LR_BRACKET path RR_BRACKET)
: (KEY | path) comparisonOperator constant
| constant comparisonOperator (KEY | path)
| path comparisonOperator path
| path OPERATOR_NOT? stringLikeOperator regex = stringLiteral
| OPERATOR_NOT? LR_BRACKET orExpression RR_BRACKET
Expand All @@ -198,11 +198,11 @@ predicate

predicateWithSubquery
: OPERATOR_NOT? EXISTS subquery
| (path | constant | functionName LR_BRACKET path RR_BRACKET) OPERATOR_NOT? IN subquery
| (path | constant | functionName LR_BRACKET path RR_BRACKET) comparisonOperator quantifier subquery
| (path | constant | functionName LR_BRACKET path RR_BRACKET) comparisonOperator subquery
| subquery comparisonOperator (path | constant | functionName LR_BRACKET path RR_BRACKET)
| (path | constant | expression) OPERATOR_NOT? IN subquery
| (path | constant | expression) comparisonOperator quantifier subquery
| subquery comparisonOperator subquery
| (path | constant | expression) comparisonOperator subquery
| subquery comparisonOperator (path | constant | expression)
;

quantifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ private static void checkSubQueryHasFreeVariables(UnarySelectStatement selectSta
selectStatement.initFreeVariables();
List<String> freeVariables = selectStatement.getFreeVariables();
if (!freeVariables.isEmpty()) {
throw new RuntimeException("Unexpected paths' name: " + freeVariables + ".");
throw new RuntimeException(
String.format(
"Unexpected paths' name: %s, check if there exists missing prefix.", freeVariables));
}
}

Expand Down Expand Up @@ -733,7 +735,7 @@ private static Operator buildReorder(UnarySelectStatement selectStatement, Opera
if (selectStatement.isLastFirst()) {
root = new Reorder(new OperatorSource(root), Arrays.asList("path", "value"));
} else if (hasFuncWithArgs) {
root = new Reorder(new OperatorSource(root), Collections.singletonList("*"));
root = new Reorder(new OperatorSource(root), new ArrayList<>(Collections.singletonList("*")));
} else {
List<String> order = new ArrayList<>();
List<Boolean> isPyUDF = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,21 @@ private static Filter setTrue(Filter filter, Predicate<String> predicate) {
return new BoolFilter(true);
}
return filter;
case Expr:
ExprFilter exprFilter = (ExprFilter) filter;
List<String> pathAList = ExprUtils.getPathFromExpr(exprFilter.getExpressionA());
List<String> pathBList = ExprUtils.getPathFromExpr(exprFilter.getExpressionB());
boolean pathAHasStar = pathAList.stream().anyMatch(s -> s.contains("*"));
boolean pathBHasStar = pathBList.stream().anyMatch(s -> s.contains("*"));
if (Op.isOrOp(((ExprFilter) filter).getOp()) && (pathAHasStar || pathBHasStar)) {
return new BoolFilter(true);
}
boolean matchPathA = pathAList.stream().allMatch(predicate);
boolean matchPathB = pathBList.stream().allMatch(predicate);
if (!matchPathA || !matchPathB) {
return new BoolFilter(true);
}
return filter;
default:
return filter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.expr.*;
import cn.edu.tsinghua.iginx.engine.shared.function.Function;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionCall;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionParams;
import cn.edu.tsinghua.iginx.engine.shared.function.MappingType;
import cn.edu.tsinghua.iginx.engine.shared.function.RowMappingFunction;
Expand Down Expand Up @@ -130,14 +131,9 @@ private static Value calculateFuncExprNative(Row row, FuncExpression funcExpr)
funcExpr.getArgs(),
funcExpr.getKvargs(),
funcExpr.isDistinct());
Row ret;
try {
ret = rowMappingFunction.transform(row, params);
} catch (Exception e) {
throw new PhysicalTaskExecuteFailureException(
"encounter error when execute row mapping function " + rowMappingFunction.getIdentifier(),
e);
}
FunctionCall functionCall = new FunctionCall(rowMappingFunction, params);

Row ret = RowUtils.calRowTransform(row, Collections.singletonList(functionCall), false);
int retValueSize = ret.getValues().length;
if (retValueSize != 1) {
throw new InvalidOperatorParameterException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ public static List<Row> cacheFilterResult(List<Row> rows, Filter filter)
try {
return FilterUtils.validate(filter, row);
} catch (PhysicalException e) {
LOGGER.error("execute parallel filter error, cause by: ", e.getCause());
LOGGER.error("execute parallel filter error, cause by: ", e);
return false;
}
})
Expand All @@ -896,7 +896,7 @@ public static List<Row> cacheFilterResult(List<Row> rows, Filter filter)
try {
return FilterUtils.validate(filter, row);
} catch (PhysicalException e) {
LOGGER.error("execute sequence filter error, cause by: ", e.getCause());
LOGGER.error("execute sequence filter error, cause by: ", e);
return false;
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public String getColumnName() {
// 如果是小数,保留小数点后5位
if (value instanceof Double || value instanceof Float) {
return String.format("%.5f", value);
} else if (value instanceof byte[]) {
return "'" + new String((byte[]) value) + "'";
}
return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package cn.edu.tsinghua.iginx.engine.shared.expr;

import cn.edu.tsinghua.iginx.engine.shared.function.FunctionUtils;
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -98,6 +99,12 @@ public String getColumnName() {
for (Expression expression : expressions) {
columnName.append(expression.getColumnName()).append(", ");
}
for (Object arg : args) {
columnName.append(ValueUtils.toString(arg)).append(", ");
}
for (Map.Entry<String, Object> kvarg : kvargs.entrySet()) {
columnName.append(kvarg.getValue()).append(", ");
}
columnName.setLength(columnName.length() - 2);
columnName.append(")");
return columnName.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class FunctionUtils {
private static final String VALUE = "value";

private static final Set<String> sysRowToRowFunctionSet =
new HashSet<>(Collections.singletonList("ratio"));
new HashSet<>(Arrays.asList("ratio", "substring"));

private static final Set<String> sysSetToRowFunctionSet =
new HashSet<>(
Expand Down Expand Up @@ -165,6 +165,7 @@ public static String getFunctionName(Function function) {

static Map<String, Integer> expectedParamNumMap = new HashMap<>(); // 此Map用于存储function期望的参数个数

// TODO
static {
expectedParamNumMap.put("avg", 1);
expectedParamNumMap.put("sum", 1);
Expand All @@ -176,6 +177,7 @@ public static String getFunctionName(Function function) {
expectedParamNumMap.put("first", 1);
expectedParamNumMap.put("last", 1);
expectedParamNumMap.put("ratio", 2);
expectedParamNumMap.put("substring", 1);
}

public static int getExpectedParamNum(String identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cn.edu.tsinghua.iginx.engine.shared.function.system.Max;
import cn.edu.tsinghua.iginx.engine.shared.function.system.Min;
import cn.edu.tsinghua.iginx.engine.shared.function.system.Ratio;
import cn.edu.tsinghua.iginx.engine.shared.function.system.SubString;
import cn.edu.tsinghua.iginx.engine.shared.function.system.Sum;
import cn.edu.tsinghua.iginx.engine.shared.function.udf.python.PyUDAF;
import cn.edu.tsinghua.iginx.engine.shared.function.udf.python.PyUDF;
Expand Down Expand Up @@ -100,6 +101,7 @@ private void initSystemFunctions() {
registerFunction(Sum.getInstance());
registerFunction(ArithmeticExpr.getInstance());
registerFunction(Ratio.getInstance());
registerFunction(SubString.getInstance());
}

private void initBasicUDFFunctions() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
* [email protected]
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package cn.edu.tsinghua.iginx.engine.shared.function.system;

import cn.edu.tsinghua.iginx.engine.shared.data.Value;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionParams;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionType;
import cn.edu.tsinghua.iginx.engine.shared.function.MappingType;
import cn.edu.tsinghua.iginx.engine.shared.function.RowMappingFunction;
import cn.edu.tsinghua.iginx.thrift.DataType;
import java.util.Arrays;
import java.util.Collections;

/**
 * System row-mapping function {@code substring(path, start, length)}.
 *
 * <p>Reads the BINARY value stored under {@code path} in the incoming row and produces a
 * single-column row holding {@code length} bytes of it, beginning at the 1-based position {@code
 * start}. Positions and lengths are counted in bytes of the stored value, not in decoded
 * characters.
 */
public class SubString implements RowMappingFunction {

  public static final String SUB_STRING = "substring";

  private static final SubString INSTANCE = new SubString();

  private SubString() {}

  /** Returns the shared singleton instance of this function. */
  public static SubString getInstance() {
    return INSTANCE;
  }

  @Override
  public FunctionType getFunctionType() {
    return FunctionType.System;
  }

  @Override
  public MappingType getMappingType() {
    return MappingType.RowMapping;
  }

  @Override
  public String getIdentifier() {
    return SUB_STRING;
  }

  /**
   * Extracts a slice of the BINARY value found under the single path in {@code params}.
   *
   * @param row the input row the value is read from
   * @param params must carry exactly one path and exactly two {@code Long} args: the 1-based
   *     {@code start} position and the {@code length} of the slice
   * @return {@link Row#EMPTY_ROW} when the value is absent or null; otherwise a row with the same
   *     key and a single BINARY field named {@code substring(path, start, length)}. The slice is
   *     clamped to the end of the value, so a {@code start} past the end yields an empty value and
   *     an over-long {@code length} yields the remaining bytes.
   * @throws IllegalArgumentException if the number of paths/args is wrong, the value is not
   *     BINARY, an arg is not a {@code Long}, {@code start} is less than 1, or {@code length} is
   *     negative
   */
  @Override
  public Row transform(Row row, FunctionParams params) throws Exception {
    if (params.getPaths().size() != 1 || params.getArgs().size() != 2) {
      throw new IllegalArgumentException("Unexpected params for substring.");
    }

    String path = params.getPaths().get(0);
    Value valueA = row.getAsValue(path);
    if (valueA == null || valueA.isNull()) {
      return Row.EMPTY_ROW;
    }
    if (valueA.getDataType() != DataType.BINARY) {
      throw new IllegalArgumentException("Unexpected data type for substring function.");
    }

    Object startArg = params.getArgs().get(0);
    Object lengthArg = params.getArgs().get(1);
    if (!(startArg instanceof Long)) {
      throw new IllegalArgumentException("The 2nd arg 'start' for substring should be a number.");
    }
    if (!(lengthArg instanceof Long)) {
      throw new IllegalArgumentException("The 3rd arg 'length' for substring should be a number.");
    }

    long start = (Long) startArg;
    long length = (Long) lengthArg;
    if (start < 1) {
      throw new IllegalArgumentException("The 2nd arg 'start' for substring should be at least 1.");
    }
    if (length < 0) {
      throw new IllegalArgumentException(
          "The 3rd arg 'length' for substring should not be negative.");
    }

    byte[] original = valueA.getBinaryV();
    // Clamp in long arithmetic before narrowing so that huge arguments cannot overflow an int.
    int from = (int) Math.min(start - 1, (long) original.length);
    long remaining = (long) original.length - from;
    // copyOfRange takes an exclusive END index, not a length, and zero-pads past the array end;
    // convert (start, length) into a [from, to) range that stays inside the value.
    int to = length >= remaining ? original.length : from + (int) length;
    byte[] ret = Arrays.copyOfRange(original, from, to);

    Header newHeader =
        new Header(
            row.getHeader().getKey(),
            Collections.singletonList(
                new Field(
                    "substring(" + path + ", " + start + ", " + length + ")", DataType.BINARY)));
    return new Row(newHeader, row.getKey(), new Object[] {ret});
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public static String toString(Object value, DataType dataType) {
return "";
}

/**
 * Renders an arbitrary value as text.
 *
 * <p>A {@code byte[]} is decoded into a {@code String} (using the platform default charset, as
 * {@code new String(byte[])} does); any other object is rendered through its own {@code
 * toString()}.
 *
 * @param value the value to render; must not be null
 * @return the textual form of {@code value}
 */
public static String toString(Object value) {
  if (!(value instanceof byte[])) {
    return value.toString();
  }
  byte[] bytes = (byte[]) value;
  return new String(bytes);
}

public static int getHash(Value value, boolean needTypeCast) {
if (needTypeCast) {
value = ValueUtils.transformToDouble(value);
Expand Down
Loading

0 comments on commit 7f0b139

Please sign in to comment.