Skip to content

Commit

Permalink
refactor_udf_execute
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh committed Dec 6, 2024
1 parent a37b106 commit 1de21e7
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
5 changes: 2 additions & 3 deletions dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.dinky.function.compiler.FunctionPackage;
import org.dinky.function.data.model.UDF;
import org.dinky.function.util.UDFUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.FunctionLanguage;

public class UDFUtils extends UDFUtil {
Expand All @@ -41,7 +39,8 @@ public static UDF taskToUDF(Task task) {
.code(task.getStatement())
.functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
.build();
FunctionCompiler.getCompiler(udf, new Configuration(), task.getId());

FunctionCompiler.getCompiler(udf, task.getConfigJson().getCustomConfigMaps(), task.getId());
FunctionPackage.bale(udf, task.getId());
return udf;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.dinky.function.compiler;

import org.apache.flink.configuration.Configuration;
import org.dinky.function.data.model.UDF;
import org.dinky.function.exception.UDFCompilerException;

import org.apache.flink.configuration.ReadableConfig;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -50,6 +52,9 @@ public interface FunctionCompiler {
*/
boolean compiler(UDF udf, ReadableConfig conf, Integer taskId);

static boolean getCompiler(UDF udf, Map<String,String> conf, Integer taskId) {
return getCompiler(udf, Configuration.fromMap(conf),taskId);
}
/**
* 编译
*
Expand Down
10 changes: 0 additions & 10 deletions dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.FlinkUdfManifest;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.function.FunctionFactory;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.compiler.CustomStringScalaCompiler;
Expand All @@ -41,7 +40,6 @@

import org.apache.flink.client.python.PythonFunctionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.FunctionLanguage;
Expand Down Expand Up @@ -450,14 +448,6 @@ private static List<String> execPyAndGetUdfNameList(String pyPath, String pyFile
}
}

public static void addConfigurationClsAndJars(
CustomTableEnvironment customTableEnvironment, List<URL> jarList, List<URL> classpaths) {
customTableEnvironment.addConfiguration(
PipelineOptions.CLASSPATHS,
classpaths.stream().map(URL::toString).collect(Collectors.toList()));
customTableEnvironment.addConfiguration(
PipelineOptions.JARS, jarList.stream().map(URL::toString).collect(Collectors.toList()));
}

public static void writeManifest(
Integer taskId, List<URL> jarPaths, FlinkUdfPathContextHolder udfPathContextHolder) {
Expand Down

0 comments on commit 1de21e7

Please sign in to comment.