From 64fe0da076366c5d7b12d4932d4b6b13acbc1001 Mon Sep 17 00:00:00 2001 From: Licho Date: Fri, 24 May 2024 20:59:32 +0800 Subject: [PATCH] feat: udf select form (#3449) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: sunlichao11 Signed-off-by: Zzm0809 <934230207@qq.com> Co-authored-by: sunlichao11 Co-authored-by: Zzm0809 <934230207@qq.com> Co-authored-by: Zzm0809 Co-authored-by: ufoe Co-authored-by: XiuhongTang Co-authored-by: 唐修红 Co-authored-by: ZackYoung Co-authored-by: zackyoungh --- .../org/dinky/controller/JarController.java | 4 +- .../org/dinky/controller/UDFController.java | 20 +++ .../main/java/org/dinky/data/dto/TaskDTO.java | 4 +- .../main/java/org/dinky/data/model/Task.java | 3 + .../dinky/data/model/ext/TaskExtConfig.java | 27 +++ .../dinky/data/model/ext/TaskUdfConfig.java | 2 +- .../dinky/data/model/ext/TaskUdfRefer.java | 41 +++++ .../org/dinky/data/model/udf/UDFManage.java | 3 + .../java/org/dinky/data/vo/CascaderVO.java | 12 +- .../java/org/dinky/data/vo/UDFManageVO.java | 1 + .../main/java/org/dinky/init/SystemInit.java | 1 - .../java/org/dinky/service/TaskService.java | 7 - .../java/org/dinky/service/UDFService.java | 14 ++ .../dinky/service/impl/FlinkServiceImpl.java | 5 +- .../dinky/service/impl/TaskServiceImpl.java | 28 +-- .../dinky/service/impl/UDFServiceImpl.java | 70 +++++++- .../main/java/org/dinky/utils/UDFUtils.java | 15 ++ .../src/main/resources/db/migration/README.md | 33 +++- .../db/migration/h2/R1.1.0__release.sql | 34 ---- .../db/migration/h2/V1.1.0__release.sql | 3 +- .../db/migration/mysql/R1.1.0__release.sql | 5 + .../db/migration/mysql/V1.1.0__release.sql | 23 ++- .../db/migration/pgsql/R1.1.0__release.sql | 4 +- .../db/migration/pgsql/V1.1.0__release.sql | 17 +- .../org/dinky/cdc/doris/DorisSinkBuilder.java | 13 +- .../org/dinky/cdc/doris/DorisSinkOptions.java | 6 +- .../java/org/dinky/explainer/Explainer.java | 12 +- .../main/java/org/dinky/job/JobConfig.java | 7 + .../main/java/org/dinky/trans/Operations.java | 29 +++ .../components/Flink/OptionsSelect/index.tsx | 1 + .../src/components/Flink/UdfSelect/index.tsx | 46 +++++ dinky-web/src/locales/en-US/pages.ts | 9 + dinky-web/src/locales/zh-CN/pages.ts | 9 + .../StudioEditor/constants.tsx | 4 +- .../RightContainer/JobConfig/index.tsx | 165 ++++++++++++++++-- .../RightContainer/JobConfig/service.tsx | 7 +- dinky-web/src/pages/DataStudio/model.ts | 26 ++- .../components/ConfigurationModal/index.tsx | 2 +- .../components/ResourceOverView/index.tsx | 13 +- .../UDF/components/UDFRegister/index.tsx | 6 + dinky-web/src/services/endpoints.tsx | 1 + dinky-web/src/types/RegCenter/data.d.ts | 1 + dinky-web/src/types/Studio/data.d.ts | 6 + script/sql/dinky-mysql.sql | 3 +- script/sql/dinky-pg.sql | 4 +- .../upgrade/1.1.0_schema/mysql/dinky_ddl.sql | 27 +-- .../upgrade/1.1.0_schema/mysql/dinky_dml.sql | 30 +--- .../1.1.0_schema/postgre/dinky_ddl.sql | 26 ++- .../1.1.0_schema/postgre/dinky_dml.sql | 30 +--- 49 files changed, 680 insertions(+), 179 deletions(-) create mode 100644 dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfRefer.java delete mode 100644 dinky-admin/src/main/resources/db/migration/h2/R1.1.0__release.sql create mode 100644 dinky-web/src/components/Flink/UdfSelect/index.tsx diff --git a/dinky-admin/src/main/java/org/dinky/controller/JarController.java b/dinky-admin/src/main/java/org/dinky/controller/JarController.java index eee5788962..0a41af36d7 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/JarController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/JarController.java @@ -61,11 +61,11 @@ public class JarController { @PostMapping("/udf/generateJar") @ApiOperation("Generate jar") public Result>> generateJar() { - List allUDF = taskService.getAllUDF(); + List allUDF = taskService.getReleaseUDF(); List udfCodes = allUDF.stream() .map(task -> UDF.builder() .code(task.getStatement()) - .className(task.getSavePointPath()) + .className(task.getConfigJson().getUdfConfig().getClassName()) .functionLanguage( FunctionLanguage.valueOf(task.getDialect().toUpperCase())) .build()) diff --git a/dinky-admin/src/main/java/org/dinky/controller/UDFController.java b/dinky-admin/src/main/java/org/dinky/controller/UDFController.java index 6925a897a4..452c90f6ba 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/UDFController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/UDFController.java @@ -23,10 +23,15 @@ import org.dinky.data.model.Resources; import org.dinky.data.model.udf.UDFManage; import org.dinky.data.result.Result; +import org.dinky.data.vo.CascaderVO; import org.dinky.data.vo.UDFManageVO; +import org.dinky.function.data.model.UDF; +import org.dinky.service.TaskService; import org.dinky.service.UDFService; +import org.dinky.utils.UDFUtils; import java.util.List; +import java.util.stream.Collectors; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; @@ -36,6 +41,7 @@ import cn.dev33.satoken.annotation.SaCheckLogin; import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -50,6 +56,7 @@ @RequiredArgsConstructor public class UDFController { private final UDFService udfService; + private final TaskService taskService; /** * update udf name by id @@ -94,4 +101,17 @@ public Result addOrUpdateByResourceId(@RequestBody CommonDTO udfService.addOrUpdateByResourceId(dto.getData()); return Result.succeed(); } + + /** + * get all udf and convert its to cascader + * @return {@link Result} of {@link List} of {@link CascaderVO} + */ + @GetMapping("/getAllUdfs") + @ApiOperation("Get All UDFs") + public Result> getAllUdfsToCascader() { + // get all UDFs of dynamic UDFs(user defined UDFs in the task) + List userDefinedReleaseUdfs = + taskService.getReleaseUDF().stream().map(UDFUtils::taskToUDF).collect(Collectors.toList()); + return Result.succeed(udfService.getAllUdfsToCascader(userDefinedReleaseUdfs)); + } } diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java index a74dbd674a..31454442d3 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java @@ -41,7 +41,6 @@ /** * StudioExecuteDTO - * */ @Getter @Setter @@ -237,10 +236,11 @@ public JobConfig getJobConfig() { Map parsedConfig = this.configJson == null ? new HashMap<>(0) : this.configJson.getCustomConfigMaps(); - + Map udfRefers = this.configJson == null ? new HashMap<>(0) : this.configJson.getUdfReferMaps(); JobConfig jobConfig = new JobConfig(); BeanUtil.copyProperties(this, jobConfig); jobConfig.setConfigJson(parsedConfig); + jobConfig.setUdfRefer(udfRefers); jobConfig.setTaskId(id); jobConfig.setJobName(name); diff --git a/dinky-admin/src/main/java/org/dinky/data/model/Task.java b/dinky-admin/src/main/java/org/dinky/data/model/Task.java index d22dd461b6..22ff4aedc2 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/Task.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/Task.java @@ -161,6 +161,9 @@ public class Task extends SuperEntity { notes = "ID of the version associated with the task") private Integer versionId; + @ApiModelProperty(value = "Enabled", dataType = "Boolean", example = "true", notes = "Whether the task is enabled") + private Boolean enabled; + @ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task") private String statement; diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java index 2c713bd4fd..df437da715 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java @@ -22,6 +22,8 @@ import org.dinky.assertion.Asserts; import org.dinky.data.ext.ConfigItem; +import org.apache.commons.lang3.StringUtils; + import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -49,6 +51,12 @@ public class TaskExtConfig implements Serializable { notes = "UDF (User-Defined Function) configuration for the task") private TaskUdfConfig udfConfig; + @ApiModelProperty( + value = "UDF Refer", + dataType = "TaskUdfRefer", + notes = "UDF (User-Defined Function) reference for the task") + private List udfRefer; + @ApiModelProperty( value = "Custom Config", dataType = "List", @@ -80,6 +88,25 @@ public Map getCustomConfigMaps() { : new HashMap<>(); } + // udfRefer-value的所有key-value + @JsonIgnore + public Map getUdfReferMaps() { + return Asserts.isNotNullCollection(udfRefer) + ? udfRefer.stream() + .filter(item -> item.getClassName() != null) + .map(t -> { + if (StringUtils.isEmpty(t.getName())) { + String name = t.getClassName() + .substring(t.getClassName().lastIndexOf(".") + 1); + name = name.substring(0, 1).toLowerCase() + name.substring(1); + t.setName(name); + } + return t; + }) + .collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName)) + : new HashMap<>(); + } + // 是否包含某个key public boolean containsKey(String key) { return customConfig.stream().anyMatch(item -> item.getKey().equals(key)); diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfConfig.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfConfig.java index 99958c79ef..cda3a22ddb 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfConfig.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfConfig.java @@ -33,7 +33,7 @@ @AllArgsConstructor @NoArgsConstructor public class TaskUdfConfig implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -5981544561742928810L; @ApiModelProperty(value = "Template ID", dataType = "Integer", example = "1", notes = "ID of the UDF template") private Integer templateId; diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfRefer.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfRefer.java new file mode 100644 index 0000000000..1817cfdcda --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskUdfRefer.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.model.ext; + +import java.io.Serializable; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@ApiModel(value = "TaskUdfRefer", description = "UDF (User-Defined Function) refer for Task") +@AllArgsConstructor +@NoArgsConstructor +public class TaskUdfRefer implements Serializable { + + @ApiModelProperty(value = "function name", dataType = "String", example = "add", notes = "Nmae of the UDF function") + private String name; + + @ApiModelProperty(value = "Class Name", dataType = "String", notes = "Name of the UDF class") + private String className; +} diff --git a/dinky-admin/src/main/java/org/dinky/data/model/udf/UDFManage.java b/dinky-admin/src/main/java/org/dinky/data/model/udf/UDFManage.java index 92d9b38ea9..9935725551 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/udf/UDFManage.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/udf/UDFManage.java @@ -43,6 +43,9 @@ public class UDFManage extends SuperEntity { @ApiModelProperty(value = "Class Name", dataType = "String", notes = "Class Name") private String className; + @ApiModelProperty(value = "Language", dataType = "String", notes = "Language") + private String language; + @ApiModelProperty(value = "Task Id", dataType = "Integer", notes = "Task Id") private Integer taskId; diff --git a/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java b/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java index 8298ab1f2b..e7a7c36704 100644 --- a/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java +++ b/dinky-admin/src/main/java/org/dinky/data/vo/CascaderVO.java @@ -55,9 +55,19 @@ public CascaderVO(String label) { this.value = label; } + public CascaderVO(String label, String value) { + this.label = label; + this.value = value; + } + public CascaderVO(String label, List children) { this.label = label; - this.value = label; + this.children = children; + } + + public CascaderVO(String label, String value, List children) { + this.label = label; + this.value = value; this.children = children; } } diff --git a/dinky-admin/src/main/java/org/dinky/data/vo/UDFManageVO.java b/dinky-admin/src/main/java/org/dinky/data/vo/UDFManageVO.java index 4ef22a55a5..17e7f63352 100644 --- a/dinky-admin/src/main/java/org/dinky/data/vo/UDFManageVO.java +++ b/dinky-admin/src/main/java/org/dinky/data/vo/UDFManageVO.java @@ -30,6 +30,7 @@ public class UDFManageVO implements Serializable { private String name; private Boolean enabled; private String className; + private String language; private Integer taskId; private Integer resourcesId; /** diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index e525cebf21..02464ea435 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -93,7 +93,6 @@ public class SystemInit implements ApplicationRunner { private final TenantService tenantService; private final GitProjectService gitProjectService; private final ScheduleThreadPool schedule; - private static Project project; @Override diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index 3974ef9897..81e52200d8 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ b/dinky-admin/src/main/java/org/dinky/service/TaskService.java @@ -190,13 +190,6 @@ public interface TaskService extends ISuperService { */ Task initDefaultFlinkSQLEnv(Integer tenantId); - /** - * Get a list of all user-defined functions (UDFs) in the system. - * - * @return A list of {@link Task} objects representing the UDFs. - */ - List getAllUDF(); - /** * Get a list of all release user-defined functions (UDFs) in the system. * @return A list of {@link Task} objects representing the release UDFs. diff --git a/dinky-admin/src/main/java/org/dinky/service/UDFService.java b/dinky-admin/src/main/java/org/dinky/service/UDFService.java index 7054d8e57c..34e1bf91f8 100644 --- a/dinky-admin/src/main/java/org/dinky/service/UDFService.java +++ b/dinky-admin/src/main/java/org/dinky/service/UDFService.java @@ -21,7 +21,9 @@ import org.dinky.data.model.Resources; import org.dinky.data.model.udf.UDFManage; +import org.dinky.data.vo.CascaderVO; import org.dinky.data.vo.UDFManageVO; +import org.dinky.function.data.model.UDF; import java.util.List; @@ -55,4 +57,16 @@ public interface UDFService extends IService { */ @Transactional(rollbackFor = Exception.class) void addOrUpdateByResourceId(List resourceIds); + + /** + * get udf from udfManage + * @return List + */ + List getUDFFromUdfManage(); + + /** + * get all udf to cascader list + * @return List + */ + List getAllUdfsToCascader(List userDefinedReleaseUdfs); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java index 8a84f0be2c..54bc6e1c69 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/FlinkServiceImpl.java @@ -49,14 +49,15 @@ public List loadConfigOptions() { List flinkConfigOptions = FlinkConfigOptionsUtils.loadOptionsByClassName(name); String binlogGroup = FlinkConfigOptionsUtils.parsedBinlogGroup(name); List child = flinkConfigOptions.stream() - .map(conf -> new CascaderVO(conf.getKey())) + .map(conf -> new CascaderVO(conf.getKey(), conf.getKey())) .collect(Collectors.toList()); CascaderVO cascaderVO = new CascaderVO(binlogGroup, child); dataList.add(cascaderVO); } List voList = documentService.lambdaQuery().eq(Document::getType, "FLINK_OPTIONS").list().stream() - .map(d -> new CascaderVO(d.getName().replace("set ", ""))) + .map(d -> new CascaderVO( + d.getName().replace("set ", ""), d.getName().replace("set ", ""))) .collect(Collectors.toList()); CascaderVO cascaderVO = new CascaderVO(); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 72f8df14d7..dc97c91848 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -84,9 +84,11 @@ import org.dinky.service.SavepointsService; import org.dinky.service.TaskService; import org.dinky.service.TaskVersionService; +import org.dinky.service.UDFService; import org.dinky.service.UDFTemplateService; import org.dinky.service.UserService; import org.dinky.service.catalogue.CatalogueService; +import org.dinky.service.resource.ResourcesService; import org.dinky.service.task.BaseTask; import org.dinky.utils.FragmentVariableUtils; import org.dinky.utils.JsonUtils; @@ -109,6 +111,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import javax.annotation.Resource; @@ -156,6 +159,8 @@ public class TaskServiceImpl extends SuperServiceImpl implemen private final DataSourceProperties dsProperties; private final UserService userService; private final ApplicationContext applicationContext; + private final UDFService udfService; + private final ResourcesService resourcesService; @Resource @Lazy @@ -716,21 +721,20 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() { return baseMapper.getJobStreamingOrBatchModelOverview(); } - @Override - public List getAllUDF() { - return list(new QueryWrapper() - .in("dialect", Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue()) - .eq("enabled", 1) - .isNotNull("save_point_path")); - } - @Override public List getReleaseUDF() { return list(new LambdaQueryWrapper() - .in(Task::getDialect, Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue()) - .eq(Task::getEnabled, 1) - .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()) - .isNotNull(Task::getSavePointPath)); + .in( + Task::getDialect, + Dialect.JAVA.getValue(), + Dialect.SCALA.getValue(), + Dialect.PYTHON.getValue()) + .eq(Task::getEnabled, 1) + .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())) + .stream() + .filter(task -> Asserts.isNotNullString( + task.getConfigJson().getUdfConfig().getClassName())) + .collect(Collectors.toList()); } @Override diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/UDFServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/UDFServiceImpl.java index 6b3d6ad7e4..4819893d68 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/UDFServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/UDFServiceImpl.java @@ -22,14 +22,20 @@ import org.dinky.config.Dialect; import org.dinky.data.model.Resources; import org.dinky.data.model.udf.UDFManage; +import org.dinky.data.vo.CascaderVO; import org.dinky.data.vo.UDFManageVO; +import org.dinky.function.data.model.UDF; import org.dinky.mapper.UDFManageMapper; import org.dinky.service.UDFService; import org.dinky.service.resource.ResourcesService; +import org.dinky.trans.Operations; import org.dinky.utils.UDFUtils; +import org.apache.flink.table.catalog.FunctionLanguage; + import java.io.File; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -76,8 +82,10 @@ public List selectAll() { String fileName = x.getFileName(); if ("jar".equals(FileUtil.getSuffix(fileName))) { x.setDialect(Dialect.JAVA.getValue()); + x.setLanguage(Dialect.JAVA.getValue()); } else { x.setDialect(Dialect.PYTHON.getValue()); + x.setLanguage(Dialect.JAVA.getValue()); } }) .collect(Collectors.toList()); @@ -118,6 +126,7 @@ public void addOrUpdateByResourceId(List resourceIds) { return classes.stream().map(clazz -> { UDFManage udfManage = UDFManage.builder() .className(clazz.getName()) + .language(FunctionLanguage.JAVA.name()) .resourcesId(x.getId()) .build(); udfManage.setName(StrUtil.toUnderlineCase(getSimpleClassName(clazz.getName()))); @@ -130,12 +139,13 @@ public void addOrUpdateByResourceId(List resourceIds) { UDFManage udfManage = UDFManage.builder() .className(className) .resourcesId(x.getId()) + .language(FunctionLanguage.PYTHON.name()) .build(); udfManage.setName(StrUtil.toUnderlineCase(getSimpleClassName(className))); return udfManage; }); } else { - log.error("Unsupported file type: {}", suffix); + log.error("Unsupported file type to add UDFManage, extension: {}", suffix); } return Stream.of(); }) @@ -144,6 +154,64 @@ public void addOrUpdateByResourceId(List resourceIds) { } } + /** + * @return + */ + @Override + public List getUDFFromUdfManage() { + // 1. get all resources + List resourcesList = resourcesService.list(); + // 2. get all udf from udf manage and then filter the udf by resources id in resources list + List collect = this.list().stream() + .filter(udf -> resourcesList.stream() + .anyMatch(resources -> resources.getId().equals(udf.getResourcesId()))) + .collect(Collectors.toList()); + // 去重 根据 className 去重 || distinct by className + return collect.stream() + .collect(Collectors.toMap(UDFManage::getClassName, udf -> udf, (a, b) -> a)) + .values() + .stream() + .collect(Collectors.toList()); + } + + /** + * get all udf to cascader list + * + * @return List + */ + @Override + public List getAllUdfsToCascader(List userDefinedReleaseUdfs) { + // Get all UDFs of static UDFs and dynamic UDFs + List staticUdfs = Operations.getCustomStaticUdfs(); + + // get all UDFs of UDFManage table + List udfManageDynamic = getUDFFromUdfManage().stream() + .map(UDFUtils::resourceUdfManageToUDF) + .collect(Collectors.toList()); + + CascaderVO staticUdfCascaderVO = new CascaderVO( + "Flink Static UDF", + staticUdfs.stream() + .map(udf -> new CascaderVO(udf.getClassName(), udf.getClassName())) + .collect(Collectors.toList())); + CascaderVO userDefinedUdfCascaderVO = new CascaderVO( + "User Defined Release UDF", + userDefinedReleaseUdfs.stream() + .map(udf -> new CascaderVO(udf.getClassName(), udf.getClassName())) + .collect(Collectors.toList())); + CascaderVO udfManageDynamicCascaderVO = new CascaderVO( + "From UDF Manage", + udfManageDynamic.stream() + .map(udf -> new CascaderVO(udf.getClassName(), udf.getClassName())) + .collect(Collectors.toList())); + + List result = new LinkedList<>(); + result.add(staticUdfCascaderVO); + result.add(udfManageDynamicCascaderVO); + result.add(userDefinedUdfCascaderVO); + return result; + } + private static String getSimpleClassName(String className) { final List packages = StrUtil.split(className, CharUtil.DOT); if (null == packages || packages.size() < 2) { diff --git a/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java b/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java index 1e771df27e..5345f81968 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java +++ b/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java @@ -22,6 +22,7 @@ import org.dinky.assertion.Asserts; import org.dinky.data.exception.BusException; import org.dinky.data.model.Task; +import org.dinky.data.model.udf.UDFManage; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; @@ -41,4 +42,18 @@ public static UDF taskToUDF(Task task) { throw new BusException("udf `class` config is null,please check your udf task config"); } } + + public static UDF resourceUdfManageToUDF(UDFManage udfManage) { + if (Asserts.isNotNull(udfManage)) { + return UDF.builder() + .name(udfManage.getName()) + .className(udfManage.getClassName()) + .functionLanguage( + FunctionLanguage.valueOf(udfManage.getLanguage().toUpperCase())) + .build(); + } else { + throw new BusException( + "udf `class` config is null, Please check if the resource file to which this udf belongs exists"); + } + } } diff --git a/dinky-admin/src/main/resources/db/migration/README.md b/dinky-admin/src/main/resources/db/migration/README.md index 41b83d7f4e..6adbbd1904 100644 --- a/dinky-admin/src/main/resources/db/migration/README.md +++ b/dinky-admin/src/main/resources/db/migration/README.md @@ -20,6 +20,23 @@ - V{版本号}__{描述}.sql 中间是**两个下划线**,固定规则,不符合规则将无法执行 - 每个版本只能有一个 V{版本号}__{描述}.sql 文件,否则将无法执行, 不管是 DDL 还是 DML 统一放在一个文件中 + + +**升级脚本注意事项:** +- 如果你需要对某一个表添加字段,请不要使用`alter table add column`语句,使用如下语句: + - MySQL: `CALL add_column_if_not_exists('tableName', 'columnName', 'dataType', 'defaultValue', 'comment');` + - eg: `CALL add_column_if_not_exists('user', 'age', 'int', '0', 'age');` + - PostgresSQL: `SELECT add_column_if_not_exists('model_name', 'table_name', 'column_name', 'data_type', 'default_value', 'comment');` + - eg: `SELECT add_column_if_not_exists('public', 'user', 'age', 'int', '0', 'age');` + + +**其他注意事项:** +- 在你贡献代码时,如若涉及到了变更表结构,请添加回滚脚本,虽然 FlyWay 会有事务回滚操作,回滚脚本不会被 FlyWay 自动自行,但是为了本地调试测试时能方便进行回滚,所以添加回滚脚本 +- 由于数据库类型不同,可能存在差异,请根据实际需求进行迭代增加脚本内容 +- H2 数据库脚本需要按照规范进行正常的版本迭代(方便版本管理),但是 H2 数据库脚本不需要添加回滚脚本,因为 H2 数据库是内存数据库(默认程序启动时配置为内存模式,未持久化),每次启动都会重新创建,所以不需要回滚脚本 + +--- + # English ## Pre requirements @@ -37,4 +54,18 @@ **Attention:** - V{version number}__{description}.SQL has two underscores in the middle, which are fixed rules. If they do not comply with the rules, they cannot be executed -- Each version can only have one V{version number}__{description}.sql file, otherwise it will not be executed, whether it is DDL or DML, it will be placed in one file \ No newline at end of file +- Each version can only have one V{version number}__{description}.sql file, otherwise it will not be executed, whether it is DDL or DML, it will be placed in one file + + +**Upgrade script considerations:** +- If you need to add fields to a table, do not use the 'alter table add column' statement. Instead, use the following statement: + - MySQL: `CALL add_column_if_not_exists('tableName', 'columnName', 'dataType', 'defaultValue', 'comment');` + - eg: `CALL add_column_if_not_exists('user', 'age', 'int', '0', 'age');` + - PostgresSQL: `SELECT add_column_if_not_exists('model_name', 'table_name', 'column_name', 'data_type', 'default_value', 'comment');` + - eg: `SELECT add_column_if_not_exists('public', 'user', 'age', 'int', '0', 'age');` + + +**Other precautions:** +- When you contribute code, if it involves changing the table structure, please add a rollback script. Although FlyWay may have transaction rollback operations, the rollback script will not be automatically rolled back by FlyWay. However, in order to facilitate rollback during local debugging and testing, add a rollback script +- Due to different database types, there may be differences. Please iterate and add script content according to actual needs +- The H2 database script needs to perform normal version iteration according to the specifications (for easy version management), but the H2 database script does not need to add a rollback script because the H2 database is an in memory database (configured in memory mode by default when the program starts, not persistent), and will be recreated every time it starts, so there is no need to add a rollback script \ No newline at end of file diff --git a/dinky-admin/src/main/resources/db/migration/h2/R1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/h2/R1.1.0__release.sql deleted file mode 100644 index 7ab19639cd..0000000000 --- a/dinky-admin/src/main/resources/db/migration/h2/R1.1.0__release.sql +++ /dev/null @@ -1,34 +0,0 @@ --- note: Rolling back SQL statements is only necessary to perform a rollback operation in the event of an automatic upgrade failure. The following SQL statements need to be manually executed - -update dinky_sys_menu set `path`='/settings/alertrule', - `component`='./SettingCenter/AlertRule', - `perms`='settings:alertrule', - `parent_id`=6 -where `id` = 116; - -update dinky_sys_menu set `path`='/settings/alertrule/add', - `perms`='settings:alertrule:add' -where `id` = 117; -update dinky_sys_menu set `path`='/settings/alertrule/delete', - `perms`='settings:alertrule:delete' -where `id` = 118; -update dinky_sys_menu set `path`='/settings/alertrule/edit', - `perms`='settings:alertrule:edit' -where `id` = 119; - -ALTER TABLE dinky_task DROP COLUMN `first_level_owner`; -ALTER TABLE dinky_task DROP COLUMN `second_level_owners`; - - -ALTER TABLE dinky_udf_manage ALTER COLUMN class_name SET DATA TYPE VARCHAR(50); - -ALTER TABLE dinky_history ALTER COLUMN statement SET DATA TYPE text ; - -ALTER TABLE dinky_task ALTER COLUMN statement SET DATA TYPE text ; - -ALTER TABLE dinky_task_version ALTER COLUMN statement SET DATA TYPE text ; - --- Delete the 1.1.0 record in the _dinky_flyway_schema_history table -DELETE FROM `_dinky_flyway_schema_history` WHERE version = '1.1.0'; - -ALTER TABLE dinky_resources ALTER COLUMN `file_name` SET DATA TYPE VARCHAR(64); \ No newline at end of file diff --git a/dinky-admin/src/main/resources/db/migration/h2/V1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/h2/V1.1.0__release.sql index e8fe73712b..aeadfe6eb9 100644 --- a/dinky-admin/src/main/resources/db/migration/h2/V1.1.0__release.sql +++ b/dinky-admin/src/main/resources/db/migration/h2/V1.1.0__release.sql @@ -43,4 +43,5 @@ ALTER TABLE dinky_task ALTER COLUMN statement SET DATA TYPE LONGVARCHAR ; ALTER TABLE dinky_task_version ALTER COLUMN statement SET DATA TYPE LONGVARCHAR ; -ALTER TABLE dinky_resources ALTER COLUMN `file_name` SET DATA TYPE TEXT; \ No newline at end of file +ALTER TABLE dinky_resources ALTER COLUMN `file_name` SET DATA TYPE TEXT; +alter table dinky_udf_manage add column `language` VARCHAR(10) DEFAULT null comment 'udf language' ; diff --git a/dinky-admin/src/main/resources/db/migration/mysql/R1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/mysql/R1.1.0__release.sql index 855333fbbc..ec4ffde3f9 100644 --- a/dinky-admin/src/main/resources/db/migration/mysql/R1.1.0__release.sql +++ b/dinky-admin/src/main/resources/db/migration/mysql/R1.1.0__release.sql @@ -31,6 +31,11 @@ ALTER TABLE dinky_task_version CHANGE COLUMN `statement` `statement` longtext DE # Delete the 1.1.0 record in the _dinky_flyway_schema_history table DELETE FROM `_dinky_flyway_schema_history` WHERE version = '1.1.0'; +ALTER TABLE dinky_udf_manage DROP COLUMN `language`; + + +ALTER TABLE dinky_resources CHANGE COLUMN `file_name` `file_name` varchar(64) DEFAULT NULL COMMENT 'file name'; + ALTER TABLE dinky_resources CHANGE COLUMN `file_name` `file_name` varchar(64) DEFAULT NULL COMMENT 'file name'; diff --git a/dinky-admin/src/main/resources/db/migration/mysql/V1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/mysql/V1.1.0__release.sql index d2500083c6..6aa668ed74 100644 --- a/dinky-admin/src/main/resources/db/migration/mysql/V1.1.0__release.sql +++ b/dinky-admin/src/main/resources/db/migration/mysql/V1.1.0__release.sql @@ -5,9 +5,17 @@ SET FOREIGN_KEY_CHECKS = 0; -- 创建存储过程 用于添加表字段时判断字段是否存在, 如果字段不存在则添加字段, 如果字段存在则不执行任何操作,避免添加重复字段时抛出异常,从而终止Flyway执行, 在 Flyway 执行时, 如果你需要增加字段,必须使用该存储过程 -- Create a stored procedure to determine whether a field exists when adding table fields. If the field does not exist, add it. If the field exists, do not perform any operations to avoid throwing exceptions when adding duplicate fields. When executing in Flyway, if you need to add a field, you must use this stored procedure +-- Parameter Description: +-- tableName: Table name +-- columnName: Field name +-- columnDefinitionType: Field type +-- columnDefinitionDefaultValue Value: Field default value +-- columnDefinitionComment: Field comment +-- afterColumnName: Field position, default value is empty. If it is not empty, it means adding a field after the afterColumnName field + DELIMITER $$ DROP PROCEDURE IF EXISTS add_column_if_not_exists$$ -CREATE PROCEDURE if not exists add_column_if_not_exists(IN tableName VARCHAR(64), IN columnName VARCHAR(64), IN columnDefinitionType VARCHAR(64), IN columnDefinitionDefaultValue VARCHAR(128), IN columnDefinitionComment VARCHAR(255)) +CREATE PROCEDURE if not exists add_column_if_not_exists(IN tableName VARCHAR(64), IN columnName VARCHAR(64), IN columnDefinitionType VARCHAR(64), IN columnDefinitionDefaultValue VARCHAR(128), IN columnDefinitionComment VARCHAR(255), in afterColumnName VARCHAR(64)) BEGIN IF NOT EXISTS ( SELECT * @@ -16,7 +24,12 @@ BEGIN AND table_name = tableName AND column_name = columnName ) THEN - SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', columnName, ' ', columnDefinitionType , ' DEFAULT ', columnDefinitionDefaultValue, ' COMMENT ', columnDefinitionComment); + -- 判断 afterColumnName 入参是否 有值, 如果有值则拼接 afterColumnName 和 columnName 之间的关系 + IF (afterColumnName IS NOT NULL OR afterColumnName <> '') THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', columnName, ' ', columnDefinitionType, ' DEFAULT ', columnDefinitionDefaultValue, ' COMMENT ', columnDefinitionComment, ' AFTER ', afterColumnName); + ELSE + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', columnName, ' ', columnDefinitionType, ' DEFAULT ', columnDefinitionDefaultValue, ' COMMENT ', columnDefinitionComment); + END IF; PREPARE stmt FROM @sql; EXECUTE stmt; END IF; @@ -53,8 +66,8 @@ where `id` = 119; ALTER TABLE dinky_udf_manage CHANGE COLUMN class_name class_name VARCHAR(100) null DEFAULT null COMMENT 'Complete class name'; -CALL add_column_if_not_exists('dinky_task', 'first_level_owner', 'int', 'NULL', 'primary responsible person id'); -CALL add_column_if_not_exists('dinky_task', 'second_level_owners', 'varchar(128)', 'NULL', 'list of secondary responsible persons ids'); +CALL add_column_if_not_exists('dinky_task', 'first_level_owner', 'int', 'NULL', 'primary responsible person id' ,''); +CALL add_column_if_not_exists('dinky_task', 'second_level_owners', 'varchar(128)', 'NULL', 'list of secondary responsible persons ids' , ''); update dinky_task set first_level_owner = creator; @@ -70,6 +83,6 @@ ALTER TABLE dinky_task_version CHANGE COLUMN `statement` `statement` mediumtext ALTER TABLE dinky_resources CHANGE COLUMN `file_name` `file_name` text DEFAULT NULL COMMENT 'file name'; - +CALL add_column_if_not_exists('dinky_udf_manage', 'language', 'varchar(10)', 'NULL', 'udf language' , 'class_name'); SET FOREIGN_KEY_CHECKS = 1; diff --git a/dinky-admin/src/main/resources/db/migration/pgsql/R1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/pgsql/R1.1.0__release.sql index 6080f2823f..0da1a45b36 100644 --- a/dinky-admin/src/main/resources/db/migration/pgsql/R1.1.0__release.sql +++ b/dinky-admin/src/main/resources/db/migration/pgsql/R1.1.0__release.sql @@ -25,4 +25,6 @@ DELETE FROM public."_dinky_flyway_schema_history" WHERE version = '1.1.0'; UPDATE public.dinky_user SET "password" = '21232f297a57a5a743894a0e4a801fc3' WHERE "id" =1 and "password"= 'f4b3a484ee745b98d64cd69c429b2aa2'; -ALTER TABLE public.dinky_resources ALTER COLUMN file_name TYPE varchar(64); \ No newline at end of file +ALTER TABLE public.dinky_resources ALTER COLUMN file_name TYPE varchar(64); + +alter table public.dinky_udf_manage drop column "language"; diff --git a/dinky-admin/src/main/resources/db/migration/pgsql/V1.1.0__release.sql b/dinky-admin/src/main/resources/db/migration/pgsql/V1.1.0__release.sql index c8233154cf..8f47b01eea 100644 --- a/dinky-admin/src/main/resources/db/migration/pgsql/V1.1.0__release.sql +++ b/dinky-admin/src/main/resources/db/migration/pgsql/V1.1.0__release.sql @@ -118,4 +118,19 @@ update public.dinky_task set "first_level_owner" = "creator"; UPDATE public.dinky_user SET "password" = 'f4b3a484ee745b98d64cd69c429b2aa2' WHERE "id" =1 and "password"= '21232f297a57a5a743894a0e4a801fc3'; -ALTER TABLE public.dinky_resources ALTER COLUMN file_name TYPE TEXT; \ No newline at end of file +ALTER TABLE public.dinky_resources ALTER COLUMN file_name TYPE TEXT; + +SELECT add_column_if_not_exists('public','dinky_udf_manage', 'language', 'varchar(10)', 'null', 'udf language'); + +UPDATE + dinky_udf_manage duml +SET + "language" = + CASE + WHEN r.file_name LIKE '%.zip' OR r.file_name LIKE '%.py' THEN 'python' + WHEN r.file_name LIKE '%.jar' THEN 'java' + ELSE 'unknown' + END + FROM dinky_resources r +WHERE + duml.resources_id = r.id; diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java index 93bbd89d6f..d384e1f69d 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java @@ -19,6 +19,12 @@ package org.dinky.cdc.doris; +import org.dinky.assertion.Asserts; +import org.dinky.cdc.AbstractSinkBuilder; +import org.dinky.cdc.SinkBuilder; +import org.dinky.data.model.FlinkCDCConfig; +import org.dinky.data.model.Table; + import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -30,11 +36,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.TypeConversions; -import org.dinky.assertion.Asserts; -import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.SinkBuilder; -import org.dinky.data.model.FlinkCDCConfig; -import org.dinky.data.model.Table; import java.io.Serializable; import java.util.ArrayList; @@ -183,7 +184,7 @@ public void addSink( DorisSink.Builder builder = DorisSink.builder(); builder.setDorisReadOptions(readOptionBuilder.build()) .setDorisExecutionOptions(executionBuilder.build()) - .setSerializer( RowDataSerializer.builder() + .setSerializer(RowDataSerializer.builder() .setFieldNames(columnNames) .setType("json") .enableDelete(true) diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java index 383e1d3b65..dd6cb61553 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java @@ -119,11 +119,11 @@ public class DorisSinkOptions { .defaultValue(1) .withDescription("In the 2pc scenario, the number of retries after the commit phase fails."); - public static final ConfigOption SINK_USE_NEW_SCHEMA_CHANGE = ConfigOptions.key("sink.use-new-schema-change") + public static final ConfigOption SINK_USE_NEW_SCHEMA_CHANGE = ConfigOptions.key( + "sink.use-new-schema-change") .booleanType() .defaultValue(false) .withDescription( "supports table column name, column type, default, comment synchronization, supports multi-column changes, " - +"and supports column name rename. Need to be enabled by configuring use-new-schema-change."); - + + "and supports column name rename. Need to be enabled by configuring use-new-schema-change."); } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index e0f9f9fd95..4880289439 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -58,8 +58,10 @@ import java.net.URL; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; @@ -114,7 +116,15 @@ public JobParam pretreatStatements(String[] statements) { List statementList = new ArrayList<>(); List udfList = new ArrayList<>(); StrBuilder parsedSql = new StrBuilder(); - for (String item : statements) { + + List statementsWithUdf = Arrays.stream(statements).collect(Collectors.toList()); + Optional.ofNullable(jobManager.getConfig().getUdfRefer()) + .ifPresent(t -> t.forEach((key, value) -> { + String sql = String.format("create temporary function %s as '%s'", value, key); + statementsWithUdf.add(0, sql); + })); + + for (String item : statementsWithUdf) { String statement = executor.pretreatStatement(item); parsedSql.append(statement).append(";\n"); if (statement.isEmpty()) { diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index f5fd35a499..da480bb125 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -91,6 +91,13 @@ public class JobConfig { notes = "JSON configuration") private Map configJson; + @ApiModelProperty( + value = "UDF configuration", + dataType = "Map", + example = "{\"udf1\": \"value1\", \"udf2\": \"value2\"}", + notes = "UDF (User-Defined Function) configuration") + private Map udfRefer; + @ApiModelProperty( value = "Flag indicating whether to use the result", dataType = "boolean", diff --git a/dinky-core/src/main/java/org/dinky/trans/Operations.java b/dinky-core/src/main/java/org/dinky/trans/Operations.java index a65a344520..f4984f416e 100644 --- a/dinky-core/src/main/java/org/dinky/trans/Operations.java +++ b/dinky-core/src/main/java/org/dinky/trans/Operations.java @@ -19,15 +19,24 @@ package org.dinky.trans; +import org.dinky.function.data.model.UDF; import org.dinky.parser.SqlType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.functions.UserDefinedFunction; + import java.lang.reflect.InvocationTargetException; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.reflections.Reflections; import org.reflections.scanners.Scanners; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; import lombok.extern.slf4j.Slf4j; @@ -45,6 +54,7 @@ private Operations() {} private static final Operation[] ALL_OPERATIONS = getAllOperations(); + private static final List JAVA_STATIC_UDF_LIST = getCustomStaticUdfs(); /** * get all {@link Operation} children ordinary class, * @@ -95,4 +105,23 @@ public static Operation buildOperation(String statement) { .map(p -> p.create(statement)) .orElse(null); } + + public static List getCustomStaticUdfs() { + if (CollectionUtils.isNotEmpty(JAVA_STATIC_UDF_LIST)) { + return JAVA_STATIC_UDF_LIST; + } + + Reflections reflections = + new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); + Set> operations = + reflections.get(Scanners.SubTypes.of(UserDefinedFunction.class).asClass()); + return operations.stream() + .filter(operation -> + !operation.isInterface() && !operation.getName().startsWith("org.apache")) + .map(operation -> UDF.builder() + .className(operation.getName()) + .functionLanguage(FunctionLanguage.JAVA) + .build()) + .collect(Collectors.toList()); + } } diff --git a/dinky-web/src/components/Flink/OptionsSelect/index.tsx b/dinky-web/src/components/Flink/OptionsSelect/index.tsx index 1dc648e2a5..5e7213d79a 100644 --- a/dinky-web/src/components/Flink/OptionsSelect/index.tsx +++ b/dinky-web/src/components/Flink/OptionsSelect/index.tsx @@ -21,6 +21,7 @@ import { l } from '@/utils/intl'; import { ProFormSelect } from '@ant-design/pro-components'; import { ProFormSelectProps } from '@ant-design/pro-form/es/components/Select'; import { Divider, Typography } from 'antd'; +import React from "react"; const { Link } = Typography; diff --git a/dinky-web/src/components/Flink/UdfSelect/index.tsx b/dinky-web/src/components/Flink/UdfSelect/index.tsx new file mode 100644 index 0000000000..e7cf6263b9 --- /dev/null +++ b/dinky-web/src/components/Flink/UdfSelect/index.tsx @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import {ProFormSelect} from '@ant-design/pro-components'; +import {ProFormSelectProps} from '@ant-design/pro-form/es/components/Select'; +import {Divider} from 'antd'; +import React from "react"; + +export type FlinkUdfOptionsProps = ProFormSelectProps & {}; + +const FlinkUdfOptionsSelect = (props: FlinkUdfOptionsProps) => { + + const renderTemplateDropDown = (item: any) => { + return ( + <> + + {item} + + ); + }; + + return ( + renderTemplateDropDown(item), ...props.fieldProps}} + /> + ); +}; + +export default FlinkUdfOptionsSelect; diff --git a/dinky-web/src/locales/en-US/pages.ts b/dinky-web/src/locales/en-US/pages.ts index 29c97fce68..409cb71723 100644 --- a/dinky-web/src/locales/en-US/pages.ts +++ b/dinky-web/src/locales/en-US/pages.ts @@ -449,6 +449,12 @@ export default { 'pages.datastudio.label.jobConfig.addConfig': 'Add Config item', 'pages.datastudio.label.jobConfig.addConfig.params': 'parameters', 'pages.datastudio.label.jobConfig.addConfig.value': 'value', + 'pages.datastudio.label.udf': 'Udf Item', + 'pages.datastudio.label.udf.tip': 'Inject UDF item, Automatically add statement `create temporary function {functionName} as {className}` at the beginning of the SQL statement', + 'pages.datastudio.label.udf.duplicate.tip': 'The class [{className}] selected this time already exists and duplicate injection is not allowed. Please reselect or cancel injection (delete and change line).', + 'pages.datastudio.label.udf.injectUdf': 'Inject UDF item', + 'pages.datastudio.label.udf.name': 'function name', + 'pages.datastudio.label.udf.className': 'class name', 'pages.datastudio.label.jobConfig.alertGroup': 'Alarm Group', 'pages.datastudio.label.jobConfig.alertGroup.tip': 'Select alert group', 'pages.datastudio.label.jobConfig.batchmode': 'Batch Mode', @@ -475,6 +481,8 @@ export default { 'pages.datastudio.label.jobConfig.other': 'Other Config', 'pages.datastudio.label.jobConfig.other.tip': 'Other Config items will be applied to the execution environment, such as pipeline.name', + 'pages.datastudio.label.jobConfig.udf': 'UDF injected', + 'pages.datastudio.label.jobConfig.udf.tip': 'Automatically inject UDF', 'pages.datastudio.label.jobConfig.parallelism': 'Parallelism', 'pages.datastudio.label.jobConfig.parallelism.tip': 'Set the parallelism of Flink tasks, the minimum value is 1', @@ -998,6 +1006,7 @@ export default { 'Support for a single or bulk upload. Strictly prohibited from uploading company data or\n other banned files.', 'rc.resource.filelist': 'File list', 'rc.resource.sync': 'Sync remote files', + 'rc.resource.sync.confirm': 'Please note that this operation will delete all records in the database and will affect running jobs as well as corresponding resource files referenced in UDF management, resulting in job failure. And UDF cannot be used in UDF management Please operate with caution!! Please confirm if you want to continue?', 'rc.resource.copy_to_add_custom_jar': 'Copy as ADD CUSTOMJAR syntax', 'rc.resource.copy_to_add_jar': 'Copy as ADD JAR syntax', 'rc.resource.copy_to_add_file': 'Copy as ADD FILE syntax', diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts index 5d72e5de2a..feea66b0f8 100644 --- a/dinky-web/src/locales/zh-CN/pages.ts +++ b/dinky-web/src/locales/zh-CN/pages.ts @@ -428,6 +428,12 @@ export default { 'pages.datastudio.label.jobConfig.addConfig': '添加配置项', 'pages.datastudio.label.jobConfig.addConfig.params': '参数', 'pages.datastudio.label.jobConfig.addConfig.value': '值', + 'pages.datastudio.label.udf': '注入UDF算子', + 'pages.datastudio.label.udf.tip': '注入UDF算子, 自动在所有语句前注入`create temporary function {functionName} as {className}` 语句', + 'pages.datastudio.label.udf.duplicate.tip': '此次选择的类[{className}]已经存在,不允许重复注入,请重新选择,或者取消注入(删除改行即可)。', + 'pages.datastudio.label.udf.injectUdf': '注入UDF', + 'pages.datastudio.label.udf.name': '函数名称', + 'pages.datastudio.label.udf.className': '类名', 'pages.datastudio.label.jobConfig.alertGroup': '告警组', 'pages.datastudio.label.jobConfig.alertGroup.tip': '选择告警组', 'pages.datastudio.label.jobConfig.batchmode': '批模式', @@ -450,6 +456,8 @@ export default { 'pages.datastudio.label.jobConfig.fragment.tip': '【增强特性】 开启FlinkSql全局变量', 'pages.datastudio.label.jobConfig.other': '其他配置', 'pages.datastudio.label.jobConfig.other.tip': '其他配置项,将被应用于执行环境,如 pipeline.name', + 'pages.datastudio.label.jobConfig.udf': 'UDF注入', + 'pages.datastudio.label.jobConfig.udf.tip': '自动注入UDF算子', 'pages.datastudio.label.jobConfig.parallelism': '任务并行度', 'pages.datastudio.label.jobConfig.parallelism.tip': '设置Flink任务的并行度,最小为 1', 'pages.datastudio.label.jobConfig.savePointStrategy': 'Savepoint策略', @@ -954,6 +962,7 @@ export default { 'rc.resource.upload.tip2': '支持单个或批量上传。严禁上传公司数据或其他禁止上传的文件。', 'rc.resource.filelist': '文件列表', 'rc.resource.sync': '同步目录结构', + 'rc.resource.sync.confirm': '请注意: 该操作会删除数据库内的所有记录,且会关系到运行中的作业,以及UDF管理中引用的对应资源文件.从而导致作业运行失败。以及在 UDF管理中的 UDF 无法被使用. 请谨慎操作!! 请确认是否继续? ', 'rc.resource.copy_to_add_custom_jar': '复制为 ADD CUSTOMJAR 语法', 'rc.resource.copy_to_add_jar': '复制为 ADD JAR 语法', 'rc.resource.copy_to_add_file': '复制为 ADD FILE 语法', diff --git a/dinky-web/src/pages/DataStudio/MiddleContainer/StudioEditor/constants.tsx b/dinky-web/src/pages/DataStudio/MiddleContainer/StudioEditor/constants.tsx index 983e4204ea..61844d2378 100644 --- a/dinky-web/src/pages/DataStudio/MiddleContainer/StudioEditor/constants.tsx +++ b/dinky-web/src/pages/DataStudio/MiddleContainer/StudioEditor/constants.tsx @@ -21,8 +21,8 @@ import { l } from '@/utils/intl'; export const PARAM_DIFF_TABLE_COL = [ { title: l('pages.datastudio.sql.configItem'), key: 'key', dataIndex: 'key' }, - { title: l('pages.datastudio.sql.cacheConfigItem'), key: 'cache', dataIndex: 'cache' }, - { title: l('pages.datastudio.sql.serverConfigItem'), key: 'server', dataIndex: 'server' } + { title: l('pages.datastudio.sql.serverConfigItem'), key: 'server', dataIndex: 'server' }, + { title: l('pages.datastudio.sql.cacheConfigItem'), key: 'cache', dataIndex: 'cache' } ]; export const DIFF_EDITOR_PARAMS = { diff --git a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx index 98df5e70a0..2443c399e0 100644 --- a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx +++ b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx @@ -18,7 +18,7 @@ */ import FlinkOptionsSelect from '@/components/Flink/OptionsSelect'; -import { SAVE_POINT_TYPE } from '@/pages/DataStudio/constants'; +import {SAVE_POINT_TYPE} from '@/pages/DataStudio/constants'; import { getCurrentData, getCurrentTab, @@ -52,22 +52,26 @@ import { ProFormText } from '@ant-design/pro-components'; import { useModel } from '@umijs/max'; -import { Alert, Space } from 'antd'; +import {Alert, Input, Space} from 'antd'; import { useForm } from 'antd/es/form/Form'; import { debounce } from 'lodash'; -import { useEffect, useState } from 'react'; +import React, {useEffect, useState} from 'react'; import { connect } from 'umi'; +import FlinkUdfOptionsSelect from "@/components/Flink/UdfSelect"; +import {TaskUdfRefer} from "@/types/Studio/data"; +import {ErrorMessageAsync} from "@/utils/messages"; const JobConfig = (props: any) => { const { sessionCluster, clusterConfiguration, dispatch, - tabs: { panes, activeKey }, + tabs: {panes, activeKey}, env, group, rightContainer, flinkConfigOptions, + flinkUdfOptions, taskOwnerLockingStrategy } = props; @@ -77,6 +81,10 @@ const JobConfig = (props: any) => { const [selectRunMode, setSelectRunMode] = useState(current?.type ?? RUN_MODE.LOCAL); + const [currentSelectUdfIndexMap, setCurrentSelectUdfIndexMap] = useState>( + new Map(current?.configJson?.udfRefer?.map((item: TaskUdfRefer, index: number) => [index, item]) ?? []) + ); + const { initialState, setInitialState } = useModel('@@initialState'); const isLockTask = lockTask( @@ -93,8 +101,11 @@ const JobConfig = (props: any) => { dispatch({ type: ALERT_MODEL_ASYNC.queryAlertGroup }); + dispatch({ + type: STUDIO_MODEL_ASYNC.queryFlinkUdfOptions + }); setSelectRunMode(current?.type ?? RUN_MODE.LOCAL); - form.setFieldsValue({ ...current, type: current?.type }); + form.setFieldsValue({...current, type: current?.type}); }, [current]); const onValuesChange = (change: { [key in string]: any }, all: any) => { @@ -119,10 +130,87 @@ const JobConfig = (props: any) => { pane.isModified = true; dispatch({ type: STUDIO_MODEL.saveTabs, - payload: { ...props.tabs } + payload: {...props.tabs} + }); + }; + + + /** + * 处理 selectUdfIndexMap 的状态 | process the state of selectUdfIndexMap + * @param index + * @param className + * @param name + */ + function processSelectUdfMapState(index: number, className: string = '', name: string = '') { + setCurrentSelectUdfIndexMap(prevState => { + const newState = new Map(prevState); + newState.set(index, { + className: className, + name: name + }); + return newState; + }); + } + + + const handleClassChange = async (value: string, index: number) => { + // 检测 这个值是否已经存在 currentSelectUdfIndexMap 的 map 中 || check if the value already exists in the map of currentSelectUdfIndexMap + const values = currentSelectUdfIndexMap.values(); + for (const taskUdfRefer of values) { + if (taskUdfRefer?.className === value) { + await ErrorMessageAsync(l('pages.datastudio.label.udf.duplicate.tip', '', {className: value}), 3); + // clear the value of the form + form.setFieldsValue({ + 'configJson': { + 'udfRefer': { + [index]: { + className: '', + name: '' + } + } + } + }); + return; + } + } + const simpleClassName = value?.split('.')?.pop() ?? ''; + const lowerName = simpleClassName.charAt(0).toLowerCase() + simpleClassName.slice(1); + processSelectUdfMapState(index, value, lowerName); + form.setFieldsValue({ + 'configJson': { + 'udfRefer': { + [index]: { + className: value, + name: lowerName + } + } + } }); }; + + function handleNameChange(name: string, index: number) { + // 拿到 currentSelectUdfIndexMap[index].get(index) 的值 || get the value of currentSelectUdfIndexMap[index].get(index) + const currentSelectUdfIndexMapValue = currentSelectUdfIndexMap.get(index); + + + // 如果 name 和 currentSelectUdfIndexMapValue?.name 相等 则不做任何操作 || if name and currentSelectUdfIndexMapValue?.name are equal, do nothing + if (currentSelectUdfIndexMapValue?.name && name !== currentSelectUdfIndexMapValue?.name) { + // 更新 currentSelectUdfIndexMap 的值 + processSelectUdfMapState(index, currentSelectUdfIndexMapValue?.className, name); + } + form.setFieldsValue({ + 'configJson': { + 'udfRefer': { + [index]: { + className: currentSelectUdfIndexMapValue?.className ?? '', + name: name + } + } + } + }); + } + return (
{(current?.step === JOB_LIFE_CYCLE.PUBLISH || isLockTask) && ( @@ -148,7 +236,7 @@ const JobConfig = (props: any) => { alertGroupId: -1 }} className={'data-studio-form'} - style={{ paddingInline: '15px', overflow: 'scroll', marginTop: 5 }} + style={{paddingInline: '15px', overflow: 'scroll', marginTop: 5}} form={form} submitter={false} layout='vertical' @@ -160,7 +248,7 @@ const JobConfig = (props: any) => { name='type' label={l('global.table.execmode')} tooltip={l('pages.datastudio.label.jobConfig.execmode.tip')} - rules={[{ required: true, message: l('pages.datastudio.label.jobConfig.execmode.tip') }]} + rules={[{required: true, message: l('pages.datastudio.label.jobConfig.execmode.tip')}]} options={buildRunModelOptions()} fieldProps={{ onChange: (value: string) => { @@ -176,7 +264,7 @@ const JobConfig = (props: any) => { {isCanRenderClusterInstance(selectRunMode) && ( <> { tooltip={l('pages.datastudio.label.jobConfig.flinksql.env.tip1')} options={buildEnvOptions(env)} rules={[ - { required: true, message: l('pages.datastudio.label.jobConfig.flinksql.env.tip1') } + {required: true, message: l('pages.datastudio.label.jobConfig.flinksql.env.tip1')} ]} showSearch allowClear={false} @@ -253,7 +341,7 @@ const JobConfig = (props: any) => { valuePropName='checked' tooltip={{ title: l('pages.datastudio.label.jobConfig.fragment.tip'), - icon: + icon: }} {...SWITCH_OPTIONS()} /> @@ -263,7 +351,7 @@ const JobConfig = (props: any) => { valuePropName='checked' tooltip={{ title: l('pages.datastudio.label.jobConfig.batchmode.tip'), - icon: + icon: }} {...SWITCH_OPTIONS()} /> @@ -300,7 +388,7 @@ const JobConfig = (props: any) => { name={['configJson', 'customConfig']} copyIconProps={false} creatorButtonProps={{ - style: { width: '100%' }, + style: {width: '100%'}, creatorButtonText: l('pages.datastudio.label.jobConfig.addConfig') }} > @@ -323,6 +411,56 @@ const JobConfig = (props: any) => { + { + // 删除一项之后拿到 index 从 currentSelectUdfIndexMap 中删除对应的值 || get the value from currentSelectUdfIndexMap and delete it + setCurrentSelectUdfIndexMap(prevState => { + const newState = new Map(prevState); + newState.delete(index); + return newState; + }); + }} + creatorButtonProps={{ + style: {width: '100%'}, + creatorButtonText: l('pages.datastudio.label.udf.injectUdf') + }} + > + {( + _, index + ) => { + return ( + + + handleClassChange(value, index)} + /> + + handleNameChange(e.target.value, index)} + placeholder={l('pages.datastudio.label.udf.name')} + style={{width: calculatorWidth(rightContainer.width) - 80}} + /> + + + + ); + } + } +
); @@ -345,6 +483,7 @@ export default connect( env: Studio.env, group: Alert.group, flinkConfigOptions: Studio.flinkConfigOptions, + flinkUdfOptions: Studio.flinkUdfOptions, taskOwnerLockingStrategy: SysConfig.taskOwnerLockingStrategy }) )(JobConfig); diff --git a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/service.tsx b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/service.tsx index 07d3de4bb4..b004ea8092 100644 --- a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/service.tsx +++ b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/service.tsx @@ -34,6 +34,11 @@ export function getFlinkConfigs() { return queryDataByParams(API_CONSTANTS.FLINK_CONF_CONFIG_OPTIONS); } -export function querySuggessionData(params: any) { +export function getFlinkUdfOptions() { + return queryDataByParams(API_CONSTANTS.ALL_UDF_LIST); + +} + +export function querySuggestionData(params: any) { return getDataByParams(API_CONSTANTS.SUGGESTION_QUERY_ALL_SUGGESTIONS, params); } diff --git a/dinky-web/src/pages/DataStudio/model.ts b/dinky-web/src/pages/DataStudio/model.ts index 2f41ee6290..946a5acc74 100644 --- a/dinky-web/src/pages/DataStudio/model.ts +++ b/dinky-web/src/pages/DataStudio/model.ts @@ -28,9 +28,9 @@ import { import { getClusterConfigurationData, getEnvData, - getFlinkConfigs, + getFlinkConfigs, getFlinkUdfOptions, getSessionData, - querySuggessionData + querySuggestionData } from '@/pages/DataStudio/RightContainer/JobConfig/service'; import { QueryParams } from '@/pages/RegCenter/DataSource/components/DataSourceDetail/RightTagsRouter/data'; import { UserBaseInfo } from '@/types/AuthCenter/data.d'; @@ -302,6 +302,7 @@ export type StateType = { sessionCluster: Cluster.Instance[]; clusterConfiguration: Cluster.Config[]; flinkConfigOptions: DefaultOptionType[]; + flinkUdfOptions: DefaultOptionType[]; env: EnvType[]; tabs: TabsType; bottomContainerContent: BottomContainerContent; @@ -316,6 +317,7 @@ export type ModelType = { effects: { queryProject: Effect; queryFlinkConfigOptions: Effect; + queryFlinkUdfOptions: Effect; querySuggestions: Effect; queryEnv: Effect; queryDatabaseList: Effect; @@ -358,6 +360,7 @@ export type ModelType = { saveFooterValue: Reducer; updateJobRunningMsg: Reducer; saveFlinkConfigOptions: Reducer; + saveFlinkUdfOptions: Reducer; updateSuggestions: Reducer; saveTaskSortTypeData: Reducer; saveUserData: Reducer; @@ -419,6 +422,7 @@ const Model: ModelType = { sessionCluster: [], clusterConfiguration: [], flinkConfigOptions: [], + flinkUdfOptions: [], env: [], footContainer: { codePosition: [1, 1], @@ -459,8 +463,15 @@ const Model: ModelType = { payload: response }); }, + *queryFlinkUdfOptions({ payload }, { call, put }) { + const response: [] = yield call(getFlinkUdfOptions, payload); + yield put({ + type: 'saveFlinkUdfOptions', + payload: response + }); + }, *querySuggestions({ payload }, { call, put }) { - const response: SuggestionInfo[] = yield call(querySuggessionData, payload); + const response: SuggestionInfo[] = yield call(querySuggestionData, payload); yield put({ type: 'updateSuggestions', payload: response @@ -687,6 +698,15 @@ const Model: ModelType = { flinkConfigOptions: payload }; }, + /** + * udf options + */ + saveFlinkUdfOptions(state, { payload }) { + return { + ...state, + flinkUdfOptions: payload + }; + }, /** * 更新tabs activeKey */ diff --git a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/index.tsx b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/index.tsx index 5769c5ebb7..05766009de 100644 --- a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/index.tsx +++ b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/index.tsx @@ -60,7 +60,7 @@ const ConfigurationModal: React.FC = (props) useEffect(() => { if (visible) { dispatch({ - type: STUDIO_MODEL_ASYNC.queryFlinkConfigOptions + type:STUDIO_MODEL_ASYNC.queryFlinkConfigOptions }); } form.setFieldsValue(value); diff --git a/dinky-web/src/pages/RegCenter/Resource/components/ResourceOverView/index.tsx b/dinky-web/src/pages/RegCenter/Resource/components/ResourceOverView/index.tsx index 9a17de43f2..17c7d44d46 100644 --- a/dinky-web/src/pages/RegCenter/Resource/components/ResourceOverView/index.tsx +++ b/dinky-web/src/pages/RegCenter/Resource/components/ResourceOverView/index.tsx @@ -48,7 +48,7 @@ import { WarningOutlined } from '@ant-design/icons'; import { ProCard } from '@ant-design/pro-components'; import { history } from '@umijs/max'; import { useAsyncEffect } from 'ahooks'; -import { Button, Result } from 'antd'; +import {Button, Modal, Result} from 'antd'; import { MenuInfo } from 'rc-menu/es/interface'; import React, { useCallback, useEffect, useRef, useState } from 'react'; import { connect } from 'umi'; @@ -238,8 +238,15 @@ const ResourceOverView: React.FC = (props) => { }; const handleSync = async () => { - await handleGetOption(API_CONSTANTS.RESOURCE_SYNC_DATA, l('rc.resource.sync'), {}); - await refreshTree(); + Modal.confirm({ + title: l('rc.resource.sync'), + content: l('rc.resource.sync.confirm'), + onOk: async () => { + await handleGetOption(API_CONSTANTS.RESOURCE_SYNC_DATA, l('rc.resource.sync'), {}); + await refreshTree(); + } + }); + }; /** diff --git a/dinky-web/src/pages/RegCenter/UDF/components/UDFRegister/index.tsx b/dinky-web/src/pages/RegCenter/UDF/components/UDFRegister/index.tsx index 9c5f5ca2d6..7f795ec303 100644 --- a/dinky-web/src/pages/RegCenter/UDF/components/UDFRegister/index.tsx +++ b/dinky-web/src/pages/RegCenter/UDF/components/UDFRegister/index.tsx @@ -131,6 +131,12 @@ const UDFRegister: React.FC = (props) => { readonly: true, width: '15%' }, + { + title: l('rc.udf.register.language'), + dataIndex: 'language', + readonly: true, + width: '15%' + }, { title: l('global.table.updateTime'), dataIndex: 'updateTime', diff --git a/dinky-web/src/services/endpoints.tsx b/dinky-web/src/services/endpoints.tsx index 5be5795f09..cba31e76da 100644 --- a/dinky-web/src/services/endpoints.tsx +++ b/dinky-web/src/services/endpoints.tsx @@ -164,6 +164,7 @@ export enum API_CONSTANTS { UDF_RESOURCES_LIST = '/api/udf/udfResourcesList', UDF_ADD = '/api/udf/addOrUpdateByResourceId', UDF_UPDATE = '/api/udf/update', + ALL_UDF_LIST = '/api/udf/getAllUdfs', // ------------------------------------ udf template ------------------------------------ UDF_TEMPLATE = '/api/udf/template/list', diff --git a/dinky-web/src/types/RegCenter/data.d.ts b/dinky-web/src/types/RegCenter/data.d.ts index fa518607f7..38812c32f4 100644 --- a/dinky-web/src/types/RegCenter/data.d.ts +++ b/dinky-web/src/types/RegCenter/data.d.ts @@ -346,6 +346,7 @@ export interface UDFRegisterInfo { resourcesId: number; name: string; className: string; + language: string; enable: boolean; dialect: string; source: string; diff --git a/dinky-web/src/types/Studio/data.d.ts b/dinky-web/src/types/Studio/data.d.ts index 6d0240da37..b285eff55e 100644 --- a/dinky-web/src/types/Studio/data.d.ts +++ b/dinky-web/src/types/Studio/data.d.ts @@ -44,12 +44,18 @@ export type TaskUdfConfig = { className: string; }; +export type TaskUdfRefer = { + name: string; + className: string; +} + export type ConfigItem = { key: string; value: string; }; export type TaskExtConfig = { + udfRefer: List; udfConfig: TaskUdfConfig; customConfig: List>; }; diff --git a/script/sql/dinky-mysql.sql b/script/sql/dinky-mysql.sql index 87003fe050..021b77d2dd 100644 --- a/script/sql/dinky-mysql.sql +++ b/script/sql/dinky-mysql.sql @@ -2014,7 +2014,8 @@ drop table if exists `dinky_udf_manage`; CREATE TABLE `dinky_udf_manage` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(50) DEFAULT NULL COMMENT 'udf name', - `class_name` varchar(100) DEFAULT NULL COMMENT 'Complete class name', + `class_name` varchar(50) DEFAULT NULL COMMENT 'Complete class name', + `language` varchar(10) DEFAULT NULL COMMENT 'language', `task_id` int(11) DEFAULT NULL COMMENT 'task id', `resources_id` int(11) DEFAULT NULL COMMENT 'resources id', `enabled` tinyint(1) DEFAULT 1 COMMENT 'is enable', diff --git a/script/sql/dinky-pg.sql b/script/sql/dinky-pg.sql index a82774d039..0f91a6099c 100644 --- a/script/sql/dinky-pg.sql +++ b/script/sql/dinky-pg.sql @@ -4049,7 +4049,8 @@ CREATE TABLE dinky_udf_manage ( id SERIAL PRIMARY KEY NOT NULL, name VARCHAR(50), - class_name VARCHAR(100), + class_name VARCHAR(50), + language varchar(10) , task_id INT, resources_id INT, enabled BOOLEAN DEFAULT TRUE, @@ -4064,6 +4065,7 @@ CREATE INDEX name_resources_id_idx ON dinky_udf_manage (name, resources_id); COMMENT ON COLUMN dinky_udf_manage.id IS 'id'; COMMENT ON COLUMN dinky_udf_manage.name IS 'udf name'; COMMENT ON COLUMN dinky_udf_manage.class_name IS 'Complete class name'; +COMMENT ON COLUMN dinky_udf_manage.language IS 'udf language'; COMMENT ON COLUMN dinky_udf_manage.task_id IS 'task_id'; COMMENT ON COLUMN dinky_udf_manage.resources_id IS 'resources_id'; diff --git a/script/sql/upgrade/1.1.0_schema/mysql/dinky_ddl.sql b/script/sql/upgrade/1.1.0_schema/mysql/dinky_ddl.sql index 17ba8d604c..eb29ae771f 100644 --- a/script/sql/upgrade/1.1.0_schema/mysql/dinky_ddl.sql +++ b/script/sql/upgrade/1.1.0_schema/mysql/dinky_ddl.sql @@ -20,19 +20,20 @@ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; --- Increase class_name column's length from 50 to 100. -ALTER TABLE dinky_udf_manage CHANGE COLUMN class_name class_name VARCHAR(100) null DEFAULT null COMMENT 'Complete class name'; +CREATE TABLE `_dinky_flyway_schema_history` ( + `installed_rank` int NOT NULL, + `version` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `description` varchar(200) COLLATE utf8mb4_general_ci NOT NULL, + `type` varchar(20) COLLATE utf8mb4_general_ci NOT NULL, + `script` varchar(1000) COLLATE utf8mb4_general_ci NOT NULL, + `checksum` int DEFAULT NULL, + `installed_by` varchar(100) COLLATE utf8mb4_general_ci NOT NULL, + `installed_on` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `execution_time` int NOT NULL, + `success` tinyint(1) NOT NULL, + PRIMARY KEY (`installed_rank`), + KEY `_dinky_flyway_schema_history_s_idx` (`success`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; -ALTER TABLE dinky_task - add COLUMN `first_level_owner` int DEFAULT NULL comment 'primary responsible person id'; - -ALTER TABLE dinky_task - add COLUMN `second_level_owners` varchar(128) DEFAULT NULL comment 'list of secondary responsible persons ids'; - -ALTER TABLE dinky_history CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'statement set'; - -ALTER TABLE dinky_task CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'sql statement'; - -ALTER TABLE dinky_task_version CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'flink sql statement'; SET FOREIGN_KEY_CHECKS = 1; diff --git a/script/sql/upgrade/1.1.0_schema/mysql/dinky_dml.sql b/script/sql/upgrade/1.1.0_schema/mysql/dinky_dml.sql index 51a0ea8547..45c2a7e6b4 100644 --- a/script/sql/upgrade/1.1.0_schema/mysql/dinky_dml.sql +++ b/script/sql/upgrade/1.1.0_schema/mysql/dinky_dml.sql @@ -22,29 +22,9 @@ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; begin; +INSERT INTO `_dinky_flyway_schema_history` (`installed_rank`, `version`, `description`, `type`, `script`, `checksum`, + `installed_by`, `installed_on`, `execution_time`, `success`) +VALUES (1, '1.0.2', '<< Flyway Baseline >>', 'BASELINE', '<< Flyway Baseline >>', NULL, 'root', '2024-05-20 01:32:29', + 0, 1); -update dinky_sys_menu -set `path`='/registration/alert/rule', - `component`='./RegCenter/Alert/AlertRule', - `perms`='registration:alert:rule', - `parent_id`=12 -where `id` = 116; - -update dinky_sys_menu -set `path`='/registration/alert/rule/add', - `perms`='registration:alert:rule:add' -where `id` = 117; - -update dinky_sys_menu -set `path`='/registration/alert/rule/delete', - `perms`='registration:alert:rule:delete' -where `id` = 118; - -update dinky_sys_menu -set `path`='/registration/alert/rule/edit', - `perms`='registration:alert:rule:edit' -where `id` = 119; - -update dinky_task set first_level_owner = creator; - -commit ; \ No newline at end of file +commit ; diff --git a/script/sql/upgrade/1.1.0_schema/postgre/dinky_ddl.sql b/script/sql/upgrade/1.1.0_schema/postgre/dinky_ddl.sql index b3e88716ee..076f06e93a 100644 --- a/script/sql/upgrade/1.1.0_schema/postgre/dinky_ddl.sql +++ b/script/sql/upgrade/1.1.0_schema/postgre/dinky_ddl.sql @@ -17,13 +17,21 @@ * */ --- Increase class_name column's length from 50 to 100. -ALTER TABLE dinky_udf_manage ALTER COLUMN class_name TYPE VARCHAR(100); +CREATE TABLE "public"."_dinky_flyway_schema_history" ( + "installed_rank" int4 NOT NULL, + "version" varchar(50) COLLATE "pg_catalog"."default", + "description" varchar(200) COLLATE "pg_catalog"."default" NOT NULL, + "type" varchar(20) COLLATE "pg_catalog"."default" NOT NULL, + "script" varchar(1000) COLLATE "pg_catalog"."default" NOT NULL, + "checksum" int4, + "installed_by" varchar(100) COLLATE "pg_catalog"."default" NOT NULL, + "installed_on" timestamp(6) NOT NULL DEFAULT now(), + "execution_time" int4 NOT NULL, + "success" bool NOT NULL DEFAULT false, + CONSTRAINT "_dinky_flyway_schema_history_pk" PRIMARY KEY ("installed_rank") +) +; -COMMENT ON COLUMN dinky_udf_manage.class_name IS 'Complete class name'; - -alter table dinky_task add column first_level_owner int; -alter table dinky_task add column second_level_owners varchar(128); - -COMMENT ON COLUMN dinky_task.first_level_owner IS 'primary responsible person id'; -COMMENT ON COLUMN dinky_task.second_level_owners IS 'list of secondary responsible persons ids'; +CREATE INDEX "_dinky_flyway_schema_history_s_idx" ON "public"."_dinky_flyway_schema_history" USING btree ( + "success" "pg_catalog"."bool_ops" ASC NULLS LAST + ); \ No newline at end of file diff --git a/script/sql/upgrade/1.1.0_schema/postgre/dinky_dml.sql b/script/sql/upgrade/1.1.0_schema/postgre/dinky_dml.sql index 494bf2c11d..f58e7101b8 100644 --- a/script/sql/upgrade/1.1.0_schema/postgre/dinky_dml.sql +++ b/script/sql/upgrade/1.1.0_schema/postgre/dinky_dml.sql @@ -18,28 +18,8 @@ */ - -update dinky_sys_menu -set "path"='/registration/alert/rule', - "component"='./RegCenter/Alert/AlertRule', - "perms"='registration:alert:rule', - "parent_id"=12 -where "id" = 116; - -update dinky_sys_menu -set "path"='/registration/alert/rule/add', - "perms"='registration:alert:rule:add' -where "id" = 117; - -update dinky_sys_menu -set "path"='/registration/alert/rule/delete', - "perms"='registration:alert:rule:delete' -where "id" = 118; - -update dinky_sys_menu -set "path"='/registration/alert/rule/edit', - "perms"='registration:alert:rule:edit' -where "id" = 119; - -update dinky_task set "first_level_owner" = "creator"; - +INSERT INTO "public"."_dinky_flyway_schema_history" ("installed_rank", "version", "description", "type", "script", + "checksum", "installed_by", "installed_on", "execution_time", + "success") +VALUES (1, '1.0.2', '<< Flyway Baseline >>', 'BASELINE', '<< Flyway Baseline >>', NULL, 'null', + '2024-05-17 17:25:43.682212', 0, 't'); \ No newline at end of file