From 55abeed6585e37833ae22ea24b338677026dee78 Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Mon, 6 Nov 2023 21:36:16 +0800 Subject: [PATCH 1/2] fix_udf_python --- .../python/PythonScalarFunction.java | 164 ++++++++++++++++++ .../functions/python/PythonTableFunction.java | 163 +++++++++++++++++ .../python/PythonScalarFunction.java | 164 ++++++++++++++++++ .../functions/python/PythonTableFunction.java | 163 +++++++++++++++++ .../python/PythonScalarFunction.java | 164 ++++++++++++++++++ .../functions/python/PythonTableFunction.java | 163 +++++++++++++++++ .../java/org/dinky/function/util/UDFUtil.java | 4 +- .../src/main/resources/getPyFuncList.py | 67 +++++-- 8 files changed, 1037 insertions(+), 15 deletions(-) create mode 100644 dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java create mode 100644 dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java create mode 100644 dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java create mode 100644 dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java create mode 100644 dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java create mode 100644 dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java new file mode 100644 index 0000000000..190baa7fbd --- /dev/null +++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The wrapper of user defined python scalar function. + */ +@Internal +public class PythonScalarFunction extends ScalarFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedScalarFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedScalarFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public Object eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedScalarFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType(Class[] signature) { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java new file mode 100644 index 0000000000..61073d2f22 --- /dev/null +++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** The wrapper of user defined python table function. */ +@Internal +public class PythonTableFunction extends TableFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedTableFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedTableFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public void eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedTableFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType() { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return (TypeInformation) resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java new file mode 100644 index 0000000000..190baa7fbd --- /dev/null +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The wrapper of user defined python scalar function. + */ +@Internal +public class PythonScalarFunction extends ScalarFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedScalarFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedScalarFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public Object eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedScalarFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType(Class[] signature) { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java new file mode 100644 index 0000000000..61073d2f22 --- /dev/null +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** The wrapper of user defined python table function. */ +@Internal +public class PythonTableFunction extends TableFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedTableFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedTableFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public void eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedTableFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType() { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return (TypeInformation) resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java new file mode 100644 index 0000000000..190baa7fbd --- /dev/null +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The wrapper of user defined python scalar function. + */ +@Internal +public class PythonScalarFunction extends ScalarFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedScalarFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonScalarFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedScalarFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public Object eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedScalarFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType(Class[] signature) { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java new file mode 100644 index 0000000000..61073d2f22 --- /dev/null +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** The wrapper of user defined python table function. */ +@Internal +public class PythonTableFunction extends TableFunction implements PythonFunction { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] serializedTableFunction; + private final PythonFunctionKind pythonFunctionKind; + private final boolean deterministic; + private final PythonEnv pythonEnv; + private final boolean takesRowAsInput; + + private TypeInformation[] inputTypes; + private String[] inputTypesString; + private TypeInformation resultType; + private String resultTypeString; + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + TypeInformation[] inputTypes, + TypeInformation resultType, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypes = inputTypes; + this.resultType = resultType; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + String[] inputTypesString, + String resultTypeString, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); + this.inputTypesString = inputTypesString; + this.resultTypeString = resultTypeString; + } + + public PythonTableFunction( + String name, + byte[] serializedScalarFunction, + PythonFunctionKind pythonFunctionKind, + boolean deterministic, + boolean takesRowAsInput, + PythonEnv pythonEnv) { + this.name = name; + this.serializedTableFunction = serializedScalarFunction; + this.pythonFunctionKind = pythonFunctionKind; + this.deterministic = deterministic; + this.pythonEnv = pythonEnv; + this.takesRowAsInput = takesRowAsInput; + } + + public void eval(Object... args) { + throw new UnsupportedOperationException("This method is a placeholder and should not be called."); + } + + @Override + public byte[] getSerializedPythonFunction() { + return serializedTableFunction; + } + + @Override + public PythonEnv getPythonEnv() { + return pythonEnv; + } + + @Override + public PythonFunctionKind getPythonFunctionKind() { + return pythonFunctionKind; + } + + @Override + public boolean takesRowAsInput() { + return takesRowAsInput; + } + + @Override + public boolean isDeterministic() { + return deterministic; + } + + @Override + public TypeInformation[] getParameterTypes(Class[] signature) { + if (inputTypes != null) { + return inputTypes; + } else { + return super.getParameterTypes(signature); + } + } + + @Override + public TypeInformation getResultType() { + if (resultType == null && resultTypeString != null) { + throw new RuntimeException( + "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); + } + return (TypeInformation) resultType; + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + TypeInference.Builder builder = TypeInference.newBuilder(); + + if (inputTypes != null) { + final List argumentDataTypes = Stream.of(inputTypes) + .map(TypeConversions::fromLegacyInfoToDataType) + .collect(Collectors.toList()); + builder.typedArguments(argumentDataTypes); + } + + return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) + .build(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index ad084ebc11..e08f65f260 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -394,7 +394,7 @@ public static List getPythonUdfList(String pythonPath, String udfFile) { continue; } Configuration configuration = new Configuration(); - configuration.set(PythonOptions.PYTHON_FILES, udfFile + ".zip"); + configuration.set(PythonOptions.PYTHON_FILES, udfFile); configuration.set(PythonOptions.PYTHON_CLIENT_EXECUTABLE, pythonPath); configuration.set(PythonOptions.PYTHON_EXECUTABLE, pythonPath); try { @@ -413,7 +413,7 @@ private static List execPyAndGetUdfNameList(String pyPath, String pyFile String shell = StrUtil.join(" ", Arrays.asList(Opt.ofBlankAble(pyPath).orElse("python3"), pyFile, checkPyFile)); - return StrUtil.split(RuntimeUtil.execForStr(shell), ","); + return StrUtil.split(StrUtil.trim(RuntimeUtil.execForStr(shell)), ","); } catch (Exception e) { throw new DinkyException(e); } diff --git a/dinky-function/src/main/resources/getPyFuncList.py b/dinky-function/src/main/resources/getPyFuncList.py index 231610b6d7..af8020f85d 100644 --- a/dinky-function/src/main/resources/getPyFuncList.py +++ b/dinky-function/src/main/resources/getPyFuncList.py @@ -1,7 +1,12 @@ -# -*- coding: utf-8 -*- +import hashlib import importlib +import os import sys -import platform +import uuid +import zipfile +import appdirs +import shutil + import pyflink # import ast @@ -41,9 +46,6 @@ # except Exception as e: # pass - -import os - if len(sys.argv) < 2: raise Exception("Please enter the file path") @@ -52,23 +54,62 @@ udf_name_list = set() +def get_file_md5(path): + """ + 获取文件内容的MD5值 + :param path: 文件所在路径 + :return: + """ + with open(path, 'rb') as file: + data = file.read() + + diff_check = hashlib.md5() + diff_check.update(data) + md5_code = diff_check.hexdigest() + return md5_code + + def list_modules(root_dir): """返回给定目录下所有模块和子模块的名称""" modules = [] - for dirpath, _, filenames in os.walk(root_dir): - for filename in filenames: - if filename.endswith(".py"): - p_ = project_path.replace(os.sep, ".") - module_name = os.path.splitext(os.path.join(dirpath, filename))[0].replace(os.sep, ".").replace( - p_ + ".", "") - modules.append(module_name.replace(root_dir, "")) + if os.path.isdir(root_dir): + sys.path.append(project_path) + for dirpath, _, filenames in os.walk(root_dir): + for filename in filenames: + parse_file(dirpath, filename, modules, root_dir) + else: + file_dir = os.path.dirname(root_dir) + sys.path.append(file_dir) + parse_file(file_dir, root_dir, modules, file_dir) + if project_path.endswith(".py"): + sys.path.append(file_dir) + elif project_path.endswith(".zip"): + tmp_dir = appdirs.user_cache_dir() + file = zipfile.ZipFile(project_path) + unzip_file_path = os.path.normpath(tmp_dir + "/dinky/udf_parse/" + get_file_md5(project_path)) + if os.path.exists(unzip_file_path): + shutil.rmtree(unzip_file_path) + file.extractall(unzip_file_path) + sys.path.append(unzip_file_path) + for dirpath, _, filenames in os.walk(unzip_file_path): + for filename in filenames: + parse_file(dirpath, filename, modules, unzip_file_path) + file.close() return modules +def parse_file(dirpath, filename, modules, root_dir): + root_dir = os.path.normpath(root_dir) + if filename.endswith(".py"): + p_ = root_dir.replace(os.sep, ".") + module_name = os.path.splitext(os.path.join(dirpath, filename))[0].replace(os.sep, ".").replace( + p_ + ".", "") + modules.append(module_name.replace(root_dir, "")) + + if __name__ == '__main__': modules = list_modules(project_path) - sys.path.append(project_path) for module_name in modules: try: module = importlib.import_module(module_name) From f0bdba03ab059048a4efc9711a1e0745e3714202 Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Tue, 7 Nov 2023 14:02:03 +0800 Subject: [PATCH 2/2] fix_udf_python --- .../python/PythonScalarFunction.java | 164 ------------------ .../functions/python/PythonTableFunction.java | 163 ----------------- .../python/PythonScalarFunction.java | 164 ------------------ .../functions/python/PythonTableFunction.java | 163 ----------------- .../python/PythonScalarFunction.java | 164 ------------------ .../functions/python/PythonTableFunction.java | 163 ----------------- 6 files changed, 981 deletions(-) delete mode 100644 dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java delete mode 100644 dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java delete mode 100644 dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java delete mode 100644 dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java delete mode 100644 dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java delete mode 100644 dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java deleted file mode 100644 index 190baa7fbd..0000000000 --- a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * The wrapper of user defined python scalar function. - */ -@Internal -public class PythonScalarFunction extends ScalarFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedScalarFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedScalarFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public Object eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedScalarFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType(Class[] signature) { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -} diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java deleted file mode 100644 index 61073d2f22..0000000000 --- a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** The wrapper of user defined python table function. */ -@Internal -public class PythonTableFunction extends TableFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedTableFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedTableFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public void eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedTableFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType() { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return (TypeInformation) resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java deleted file mode 100644 index 190baa7fbd..0000000000 --- a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * The wrapper of user defined python scalar function. - */ -@Internal -public class PythonScalarFunction extends ScalarFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedScalarFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedScalarFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public Object eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedScalarFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType(Class[] signature) { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java deleted file mode 100644 index 61073d2f22..0000000000 --- a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** The wrapper of user defined python table function. */ -@Internal -public class PythonTableFunction extends TableFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedTableFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedTableFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public void eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedTableFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType() { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return (TypeInformation) resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java deleted file mode 100644 index 190baa7fbd..0000000000 --- a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * The wrapper of user defined python scalar function. - */ -@Internal -public class PythonScalarFunction extends ScalarFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedScalarFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonScalarFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedScalarFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public Object eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedScalarFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType(Class[] signature) { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java deleted file mode 100644 index 61073d2f22..0000000000 --- a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.table.functions.python; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** The wrapper of user defined python table function. */ -@Internal -public class PythonTableFunction extends TableFunction implements PythonFunction { - - private static final long serialVersionUID = 1L; - - private final String name; - private final byte[] serializedTableFunction; - private final PythonFunctionKind pythonFunctionKind; - private final boolean deterministic; - private final PythonEnv pythonEnv; - private final boolean takesRowAsInput; - - private TypeInformation[] inputTypes; - private String[] inputTypesString; - private TypeInformation resultType; - private String resultTypeString; - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - TypeInformation[] inputTypes, - TypeInformation resultType, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypes = inputTypes; - this.resultType = resultType; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - String[] inputTypesString, - String resultTypeString, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this(name, serializedScalarFunction, pythonFunctionKind, deterministic, takesRowAsInput, pythonEnv); - this.inputTypesString = inputTypesString; - this.resultTypeString = resultTypeString; - } - - public PythonTableFunction( - String name, - byte[] serializedScalarFunction, - PythonFunctionKind pythonFunctionKind, - boolean deterministic, - boolean takesRowAsInput, - PythonEnv pythonEnv) { - this.name = name; - this.serializedTableFunction = serializedScalarFunction; - this.pythonFunctionKind = pythonFunctionKind; - this.deterministic = deterministic; - this.pythonEnv = pythonEnv; - this.takesRowAsInput = takesRowAsInput; - } - - public void eval(Object... args) { - throw new UnsupportedOperationException("This method is a placeholder and should not be called."); - } - - @Override - public byte[] getSerializedPythonFunction() { - return serializedTableFunction; - } - - @Override - public PythonEnv getPythonEnv() { - return pythonEnv; - } - - @Override - public PythonFunctionKind getPythonFunctionKind() { - return pythonFunctionKind; - } - - @Override - public boolean takesRowAsInput() { - return takesRowAsInput; - } - - @Override - public boolean isDeterministic() { - return deterministic; - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - if (inputTypes != null) { - return inputTypes; - } else { - return super.getParameterTypes(signature); - } - } - - @Override - public TypeInformation getResultType() { - if (resultType == null && resultTypeString != null) { - throw new RuntimeException( - "String format result type is not supported in old type system. The `register_function` is deprecated, please Use `create_temporary_system_function` instead."); - } - return (TypeInformation) resultType; - } - - @Override - public TypeInference getTypeInference(DataTypeFactory typeFactory) { - TypeInference.Builder builder = TypeInference.newBuilder(); - - if (inputTypes != null) { - final List argumentDataTypes = Stream.of(inputTypes) - .map(TypeConversions::fromLegacyInfoToDataType) - .collect(Collectors.toList()); - builder.typedArguments(argumentDataTypes); - } - - return builder.outputTypeStrategy(TypeStrategies.explicit(TypeConversions.fromLegacyInfoToDataType(resultType))) - .build(); - } - - @Override - public String toString() { - return name; - } -}