Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Fix passing Array to stored procedures for interactive. #3436

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,30 @@ void put_argment(Encoder& encoder, const query::Argument& argment) {
case common::Value::kStr:
encoder.put_string(value.str());
break;
case common::Value::kStrArray:
encoder.put_int(value.str_array().item_size());
for (auto i = 0; i < value.str_array().item_size(); ++i) {
encoder.put_string(value.str_array().item(i));
}
break;
case common::Value::kF64Array:
encoder.put_int(value.f64_array().item_size());
for (auto i = 0; i < value.f64_array().item_size(); ++i) {
encoder.put_double(value.f64_array().item(i));
}
break;
case common::Value::kI32Array:
encoder.put_int(value.i32_array().item_size());
for (auto i = 0; i < value.i32_array().item_size(); ++i) {
encoder.put_int(value.i32_array().item(i));
}
break;
case common::Value::kI64Array:
encoder.put_int(value.i64_array().item_size());
for (auto i = 0; i < value.i64_array().item_size(); ++i) {
encoder.put_long(value.i64_array().item(i));
}
break;
default:
LOG(ERROR) << "Not recognizable param type" << static_cast<int>(item_case);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.alibaba.graphscope.common.ir.meta.procedure;

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.alibaba.graphscope.common.ir.type.GraphTypeFactoryImpl;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.*;
import org.yaml.snakeyaml.Yaml;

Expand All @@ -33,7 +34,11 @@
import java.util.stream.Collectors;

public class StoredProcedureMeta {
private static final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
private static final RelDataTypeFactory typeFactory =
new GraphTypeFactoryImpl(
new Configs(
ImmutableMap.of(
FrontendConfig.CALCITE_DEFAULT_CHARSET.getKey(), "UTF-8")));

private final String name;
private final RelDataType returnType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public List<InputStream> getStoredProcedures() throws IOException {
getProcedureNameWithInputStream(procedureDir);
for (String enableProcedure : enableProcedureList) {
InputStream enableInput = procedureInputMap.get(enableProcedure);
if (enableInput == null) {
logger.error("procedure not found {}", enableProcedure);
continue;
}
Preconditions.checkArgument(
enableInput != null,
"can not find procedure with name=%s under directory=%s, candidates are %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.alibaba.graphscope.common.ir.runtime;

import com.alibaba.graphscope.common.ir.runtime.proto.Utils;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.ir.runtime.proto.RexToProtoConverter;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.gaia.proto.Common;
import com.alibaba.graphscope.gaia.proto.StoredProcedure;
import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -31,12 +34,18 @@
public class ProcedurePhysicalBuilder extends PhysicalBuilder {
private final StoredProcedure.Query.Builder builder;

public ProcedurePhysicalBuilder(LogicalPlan logicalPlan) {
public ProcedurePhysicalBuilder(Configs configs, IrMeta irMeta, LogicalPlan logicalPlan) {
super(logicalPlan);
this.builder = StoredProcedure.Query.newBuilder();
RexCall procedureCall = (RexCall) logicalPlan.getProcedureCall();
setStoredProcedureName(procedureCall, builder);
setStoredProcedureArgs(procedureCall, builder);
setStoredProcedureArgs(
procedureCall,
builder,
new RexToProtoConverter(
true,
irMeta.getSchema().isColumnId(),
GraphPlanner.rexBuilderFactory.apply(configs)));
}

private void setStoredProcedureName(
Expand All @@ -46,14 +55,16 @@ private void setStoredProcedureName(
}

private void setStoredProcedureArgs(
RexCall procedureCall, StoredProcedure.Query.Builder builder) {
RexCall procedureCall,
StoredProcedure.Query.Builder builder,
RexToProtoConverter converter) {
List<RexNode> operands = procedureCall.getOperands();
for (int i = 0; i < operands.size(); ++i) {
builder.addArguments(
StoredProcedure.Argument.newBuilder()
// param name is omitted
.setParamInd(i)
.setValue(Utils.protoValue((RexLiteral) operands.get(i)))
.setValue(operands.get(i).accept(converter).getOperators(0).getConst())
.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.calcite.rex.*;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.ArraySqlType;
import org.apache.calcite.util.Sarg;

import java.util.List;
Expand Down Expand Up @@ -88,6 +89,20 @@ private OuterExpression.Expression visitCase(RexCall call) {
}

private OuterExpression.Expression visitArrayValueConstructor(RexCall call) {
// convert to literal if operands have the same primitive type
if (call.getOperands().stream().allMatch(k -> k instanceof RexLiteral)
&& call.getType() instanceof ArraySqlType) {
return OuterExpression.Expression.newBuilder()
.addOperators(
OuterExpression.ExprOpr.newBuilder()
.setConst(
Utils.protoValue(
call.getOperands(),
((ArraySqlType) call.getType())
.getComponentType()))
.build())
.build();
}
OuterExpression.VariableKeys.Builder varsBuilder =
OuterExpression.VariableKeys.newBuilder();
call.getOperands()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
Expand Down Expand Up @@ -116,6 +117,85 @@ public static final Common.Value protoValue(RexLiteral literal) {
}
}

public static final Common.Value protoValue(List<RexNode> literals, RelDataType literalType) {
switch (literalType.getSqlTypeName()) {
case INTEGER:
return Common.Value.newBuilder()
.setI32Array(
Common.I32Array.newBuilder()
.addAllItem(
literals.stream()
.map(
k ->
((Number)
((RexLiteral)
k)
.getValue())
.intValue())
.collect(Collectors.toList())))
.build();
case BIGINT:
return Common.Value.newBuilder()
.setI64Array(
Common.I64Array.newBuilder()
.addAllItem(
literals.stream()
.map(
k ->
((Number)
((RexLiteral)
k)
.getValue())
.longValue())
.collect(Collectors.toList())))
.build();
case FLOAT:
case DOUBLE:
return Common.Value.newBuilder()
.setF64Array(
Common.DoubleArray.newBuilder()
.addAllItem(
literals.stream()
.map(
k ->
((Number)
((RexLiteral)
k)
.getValue())
.doubleValue())
.collect(Collectors.toList())))
.build();
case CHAR:
return Common.Value.newBuilder()
.setStrArray(
Common.StringArray.newBuilder()
.addAllItem(
literals.stream()
.map(
k -> {
RexLiteral literal =
(RexLiteral) k;
if (literal.getValue()
instanceof NlsString) {
return ((NlsString)
literal
.getValue())
.getValue();
} else {
return (String)
literal.getValue();
}
})
.collect(Collectors.toList())))
.build();
default:
throw new UnsupportedOperationException(
"list of literal type "
+ literalType.getSqlTypeName()
+ " can not be converted to ir core array");
}
}

public static final OuterExpression.Property protoProperty(GraphProperty property) {
switch (property.getOpt()) {
case ID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public PhysicalPlan planPhysical(LogicalPlan logicalPlan) {
throw new RuntimeException(e);
}
} else {
return new ProcedurePhysicalBuilder(logicalPlan).build();
return new ProcedurePhysicalBuilder(graphConfig, irMeta, logicalPlan).build();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,19 @@ protected AnyValue parseEntry(IrResult.Entry entry, @Nullable RelDataType dataTy
switch (dataType.getSqlTypeName()) {
case MULTISET:
case ARRAY:
if (dataType instanceof ArbitraryArrayType) {
return parseCollection(
entry.getCollection(),
((ArbitraryArrayType) dataType).getComponentTypes());
} else {
return parseCollection(entry.getCollection(), dataType.getComponentType());
switch (entry.getInnerCase()) {
case COLLECTION:
if (dataType instanceof ArbitraryArrayType) {
return parseCollection(
entry.getCollection(),
((ArbitraryArrayType) dataType).getComponentTypes());
} else {
return parseCollection(
entry.getCollection(), dataType.getComponentType());
}
case ELEMENT:
default:
return parseElement(entry.getElement(), dataType.getComponentType());
}
case MAP:
if (dataType instanceof ArbitraryMapType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public void logical_plan_4_test() throws Exception {
LogicalPlan logicalPlan =
com.alibaba.graphscope.cypher.antlr4.Utils.evalLogicalPlan(
"Call ldbc_ic2(10l, 20120112l)");
try (PhysicalBuilder ffiBuilder = new ProcedurePhysicalBuilder(logicalPlan)) {
try (PhysicalBuilder ffiBuilder =
new ProcedurePhysicalBuilder(getMockGraphConfig(), Utils.schemaMeta, logicalPlan)) {
PhysicalPlan plan = ffiBuilder.build();
Assert.assertEquals(
FileUtils.readJsonFromResource("call_procedure.json"), plan.explain());
Expand Down
Loading