diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index b7d8149805..bcc8d223c7 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -19,7 +19,9 @@ package org.dinky.controller; +import org.dinky.DinkyVersion; import org.dinky.data.annotations.Log; +import org.dinky.data.dto.APISavePointTaskDTO; import org.dinky.data.dto.TaskDTO; import org.dinky.data.dto.TaskSubmitDto; import org.dinky.data.enums.BusinessType; @@ -65,18 +67,10 @@ public class APIController { private final TaskService taskService; private final JobInstanceService jobInstanceService; - // Interface compatible with DolphinScheduler - @GetMapping("/submitTask") - @ApiOperation("Submit Task") - public Result submitTask(@RequestParam Integer id) throws Exception { - taskService.initTenantByTaskId(id); - JobResult jobResult = - taskService.submitTask(TaskSubmitDto.builder().id(id).build()); - if (jobResult.isSuccess()) { - return Result.succeed(jobResult, Status.EXECUTE_SUCCESS); - } else { - return Result.failed(jobResult, jobResult.getError()); - } + @GetMapping("/version") + @ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number") + public Result getVersionInfo() { + return Result.succeed(DinkyVersion.getVersion(), "Get success"); } @PostMapping("/submitTask") @@ -92,6 +86,15 @@ public Result submitTask(@RequestBody TaskSubmitDto submitDto) throws } } + @PostMapping("/savepointTask") + public Result savepointTask(@RequestBody APISavePointTaskDTO apiSavePointTaskDTO) { + return Result.succeed( + taskService.savepointTaskJob( + taskService.getTaskInfoById(apiSavePointTaskDTO.getTaskId()), + SavePointType.get(apiSavePointTaskDTO.getType())), + Status.EXECUTE_SUCCESS); + } + @GetMapping("/cancel") // @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER) @ApiOperation("Cancel Flink Job") diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java index b5eea02c44..d5e4798fe8 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java @@ -28,10 +28,25 @@ @Data @Builder public class TaskSubmitDto { + public TaskSubmitDto() {} + + public TaskSubmitDto(Integer id, Boolean isOnline, String savePointPath, Map variables) { + this.id = id; + this.isOnline = isOnline; + this.savePointPath = savePointPath; + this.variables = variables; + } @ApiModelProperty(value = "ID", dataType = "Integer", example = "6", notes = "The identifier of the execution") private Integer id; + @ApiModelProperty( + value = "Is online", + dataType = "Boolean", + example = "true", + notes = "Online dinky task, and only one job is allowed to execute") + private Boolean isOnline; + @ApiModelProperty( value = "Save Point Path", dataType = "String", diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index 5ea552f8c7..73f2c0c5d7 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -202,9 +202,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) { project = projectClient.createDinkyProject(); } } catch (Exception e) { - log.error("Error in DolphinScheduler: ", e); - log.error( - "get or create DolphinScheduler project failed, please check the config of DolphinScheduler!"); + log.warn("Get or create DolphinScheduler project failed, please check the config of DolphinScheduler!"); } } } diff --git a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java index 9adefd4340..c90391361f 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java +++ b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java @@ -74,28 +74,42 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.URLUtil; import cn.hutool.json.JSONUtil; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j +@Data public class PaimonUtil { - private static final Cache, Schema> SCHEMA_CACHE = CacheUtil.newLRUCache(100); - private static final CatalogContext CONTEXT = - CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon")))); - private static final Catalog CATALOG = CatalogFactory.createCatalog(CONTEXT); - static { + private static PaimonUtil instance; + + private final Cache, Schema> schemaCache; + private final CatalogContext context; + private final Catalog catalog; + + public PaimonUtil() { + schemaCache = CacheUtil.newLRUCache(100); + context = CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon")))); + catalog = CatalogFactory.createCatalog(context); try { - CATALOG.createDatabase(DINKY_DB, true); + catalog.createDatabase(DINKY_DB, true); } catch (Catalog.DatabaseAlreadyExistException e) { throw new RuntimeException(e); } } + public static synchronized PaimonUtil getInstance() { + if (instance == null) { + instance = new PaimonUtil(); + } + return instance; + } + public static void dropTable(String table) { Identifier identifier = Identifier.create(DINKY_DB, table); - if (CATALOG.tableExists(identifier)) { + if (getInstance().getCatalog().tableExists(identifier)) { try { - CATALOG.dropTable(identifier, true); + getInstance().getCatalog().dropTable(identifier, true); } catch (Exception e) { throw new RuntimeException(e); } @@ -122,6 +136,8 @@ public static void write(String table, List dataList, Class clazz) { String fieldName = StrUtil.toCamelCase(dataField.name()); Object fieldValue = ReflectUtil.getFieldValue(t, fieldName); try { + // TODO BinaryWriter.write已被废弃,后续可以考虑改成这种方式 + // BinaryWriter.createValueSetter(type).setValue(writer, i, fieldValue); if (type.getTypeRoot() == DataTypeRoot.VARCHAR) { BinaryWriter.write( writer, i, BinaryString.fromString(JSONUtil.toJsonStr(fieldValue)), type, null); @@ -169,10 +185,10 @@ public static List batchReadTable( ReadBuilder readBuilder; try { - if (!CATALOG.tableExists(identifier)) { + if (!getInstance().getCatalog().tableExists(identifier)) { return dataList; } - readBuilder = CATALOG.getTable(identifier).newReadBuilder(); + readBuilder = getInstance().getCatalog().getTable(identifier).newReadBuilder(); if (filter != null) { List predicates = filter.apply(builder); readBuilder.withFilter(predicates); @@ -218,18 +234,18 @@ public static List batchReadTable( public static Table createOrGetTable(String tableName, Class clazz) { try { Identifier identifier = Identifier.create(DINKY_DB, tableName); - if (CATALOG.tableExists(identifier)) { - return CATALOG.getTable(identifier); + if (getInstance().getCatalog().tableExists(identifier)) { + return getInstance().getCatalog().getTable(identifier); } - CATALOG.createTable(identifier, getSchemaByClass(clazz), false); - return CATALOG.getTable(identifier); + getInstance().getCatalog().createTable(identifier, getSchemaByClass(clazz), false); + return getInstance().getCatalog().getTable(identifier); } catch (Exception e) { throw new RuntimeException(e); } } public static Schema getSchemaByClass(Class clazz) { - return SCHEMA_CACHE.get(clazz, () -> { + return getInstance().getSchemaCache().get(clazz, () -> { List primaryKeys = new ArrayList<>(); List partitionKeys = new ArrayList<>(); Schema.Builder builder = Schema.newBuilder(); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index def1e49e15..d9f0c40d19 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -42,6 +42,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -105,6 +106,9 @@ public void init() { private void initConfig() { final ClusterConfig clusterConfig = config.getClusterConfig(); configuration = GlobalConfiguration.loadConfiguration(clusterConfig.getFlinkConfigPath()); + if (!configuration.contains(RestOptions.PORT)) { + configuration.set(RestOptions.PORT, RestOptions.PORT.defaultValue()); + } configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first"); final FlinkConfig flinkConfig = config.getFlinkConfig(); diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java index 19ff05ad1f..469b074b75 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java @@ -600,8 +600,9 @@ public JdbcSelectResult query(String sql, Integer limit) { } catch (Exception e) { result.error(LogUtil.getError(e)); log.error("Query failed", e); + } finally { + close(preparedStatement, results); } - close(preparedStatement, results); result.setRowData(datas); return result; } diff --git a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java index 6ab5b5b152..adc1d963a3 100644 --- a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java +++ b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java @@ -108,7 +108,7 @@ private String genTable(Table table) { final String dv = column.getDefaultValue(); String defaultValue = Asserts.isNotNull(dv) - ? String.format(" DEFAULT %s", dv.isEmpty() ? "\"\"" : dv) + ? String.format(" DEFAULT '%s'", dv.isEmpty() ? "''" : dv) : String.format("%s NULL ", !column.isNullable() ? " NOT " : ""); return String.format( diff --git a/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DinkyTaskParams.java b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DinkyTaskParams.java index 441037f666..3469bb1894 100644 --- a/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DinkyTaskParams.java +++ b/dinky-scheduler/src/main/java/org/dinky/scheduler/model/DinkyTaskParams.java @@ -19,6 +19,7 @@ package org.dinky.scheduler.model; +import java.util.ArrayList; import java.util.List; import io.swagger.annotations.ApiModelProperty; @@ -28,7 +29,7 @@ public class DinkyTaskParams { @ApiModelProperty(value = "自定义参数") - private List localParams; + private List localParams = new ArrayList<>(); @ApiModelProperty(value = "dinky地址") private String address; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx index 82600e54cd..636d0121a7 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/function.tsx @@ -46,7 +46,6 @@ export const isOnline = (data: TaskDataType | undefined) => { export const isCanPushDolphin = (data: TaskDataType | undefined) => { return data ? JOB_LIFE_CYCLE.PUBLISH === data.step && - !isSql(data?.dialect) && data?.dialect?.toLowerCase() !== DIALECT.FLINKSQLENV && data?.dialect?.toLowerCase() !== DIALECT.SCALA && data?.dialect?.toLowerCase() !== DIALECT.JAVA && diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx index 22627ee70f..c689f253f4 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx @@ -367,9 +367,7 @@ const HeaderContainer = (props: connect) => { icon: isOnline(currentData) ? : , title: isOnline(currentData) ? l('button.offline') : l('button.publish'), isShow: - (currentTab?.type == TabsPageType.project && - currentTab?.subType?.toLowerCase() === DIALECT.FLINK_SQL) || - currentTab?.subType?.toLowerCase() === DIALECT.FLINKJAR, + (currentTab?.type == TabsPageType.project), click: () => handleChangeJobLife() }, { diff --git a/pom.xml b/pom.xml index 104ad43fd6..eb86626455 100644 --- a/pom.xml +++ b/pom.xml @@ -607,7 +607,7 @@ org.dinky - dinky-cdc-doris + dinky-cdc-plus ${project.version}