
Commit

Merge branch 'DataLinkDC:dev' into dev
gaoyan1998 authored Jan 13, 2024
2 parents 128d532 + cf37b0b commit ee47bc5
Showing 11 changed files with 73 additions and 38 deletions.
27 changes: 15 additions & 12 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
@@ -19,7 +19,9 @@

package org.dinky.controller;

import org.dinky.DinkyVersion;
import org.dinky.data.annotations.Log;
import org.dinky.data.dto.APISavePointTaskDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.BusinessType;
@@ -65,18 +67,10 @@ public class APIController {
private final TaskService taskService;
private final JobInstanceService jobInstanceService;

// Interface compatible with DolphinScheduler
@GetMapping("/submitTask")
@ApiOperation("Submit Task")
public Result<JobResult> submitTask(@RequestParam Integer id) throws Exception {
taskService.initTenantByTaskId(id);
JobResult jobResult =
taskService.submitTask(TaskSubmitDto.builder().id(id).build());
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
return Result.failed(jobResult, jobResult.getError());
}
@GetMapping("/version")
@ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number")
public Result<String> getVersionInfo() {
return Result.succeed(DinkyVersion.getVersion(), "Get success");
}

@PostMapping("/submitTask")
@@ -92,6 +86,15 @@ public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws
}
}

@PostMapping("/savepointTask")
public Result savepointTask(@RequestBody APISavePointTaskDTO apiSavePointTaskDTO) {
return Result.succeed(
taskService.savepointTaskJob(
taskService.getTaskInfoById(apiSavePointTaskDTO.getTaskId()),
SavePointType.get(apiSavePointTaskDTO.getType())),
Status.EXECUTE_SUCCESS);
}

@GetMapping("/cancel")
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
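This hunk replaces the DolphinScheduler-compatible GET /submitTask shim with a version probe and, further down, adds a savepoint trigger. A minimal sketch of exercising the two new endpoints from Java; the base URL (host, port, and /openapi prefix) and the "trigger" savepoint type value are assumptions, not shown in this diff:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DinkyApiSketch {
    public static void main(String[] args) throws Exception {
        // Assumed base URL; adjust host, port, and prefix to your deployment.
        String base = "http://localhost:8888/openapi";
        HttpClient client = HttpClient.newHttpClient();

        // GET /version returns the Dinky service version string.
        HttpRequest version = HttpRequest.newBuilder(URI.create(base + "/version"))
                .GET()
                .build();
        System.out.println(client.send(version, HttpResponse.BodyHandlers.ofString()).body());

        // POST /savepointTask triggers a savepoint for a task. Field names
        // mirror APISavePointTaskDTO; the "trigger" type value is assumed.
        HttpRequest savepoint = HttpRequest.newBuilder(URI.create(base + "/savepointTask"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"taskId\": 6, \"type\": \"trigger\"}"))
                .build();
        System.out.println(client.send(savepoint, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```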
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java
@@ -28,10 +28,25 @@
@Data
@Builder
public class TaskSubmitDto {
public TaskSubmitDto() {}

public TaskSubmitDto(Integer id, Boolean isOnline, String savePointPath, Map<String, String> variables) {
this.id = id;
this.isOnline = isOnline;
this.savePointPath = savePointPath;
this.variables = variables;
}

@ApiModelProperty(value = "ID", dataType = "Integer", example = "6", notes = "The identifier of the execution")
private Integer id;

@ApiModelProperty(
value = "Is online",
dataType = "Boolean",
example = "true",
notes = "Online dinky task, and only one job is allowed to execute")
private Boolean isOnline;

@ApiModelProperty(
value = "Save Point Path",
dataType = "String",
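The notable detail here is the pair of hand-written constructors alongside Lombok's @Builder: with @Builder alone, Lombok generates only a package-private all-args constructor, so the explicit no-arg constructor is presumably what keeps Jackson able to bind the POST /submitTask JSON body to this DTO. A sketch of both construction paths, using only fields visible in this diff:

```java
import org.dinky.data.dto.TaskSubmitDto;

public class TaskSubmitDtoSketch {
    public static void main(String[] args) {
        // Programmatic construction via the builder, as the old GET handler did:
        TaskSubmitDto dto = TaskSubmitDto.builder()
                .id(6)
                .isOnline(true) // new flag: publish the task online, one running job allowed
                .savePointPath("/tmp/savepoints/sp-1") // hypothetical path
                .build();
        System.out.println(dto);

        // The explicit no-arg constructor is what lets a JSON body such as
        // {"id": 6, "isOnline": true} be deserialized into this DTO.
    }
}
```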
4 changes: 1 addition & 3 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
@@ -202,9 +202,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) {
project = projectClient.createDinkyProject();
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: ", e);
log.error(
"get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
log.warn("Get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
}
}
}
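The two stacked error logs collapse into a single warning: a missing or misconfigured DolphinScheduler integration is treated as a non-fatal startup condition rather than an error with a stack trace.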
46 changes: 31 additions & 15 deletions dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java
@@ -74,28 +74,42 @@
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public class PaimonUtil {
private static final Cache<Class<?>, Schema> SCHEMA_CACHE = CacheUtil.newLRUCache(100);
private static final CatalogContext CONTEXT =
CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon"))));
private static final Catalog CATALOG = CatalogFactory.createCatalog(CONTEXT);

static {
private static PaimonUtil instance;

private final Cache<Class<?>, Schema> schemaCache;
private final CatalogContext context;
private final Catalog catalog;

public PaimonUtil() {
schemaCache = CacheUtil.newLRUCache(100);
context = CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon"))));
catalog = CatalogFactory.createCatalog(context);
try {
CATALOG.createDatabase(DINKY_DB, true);
catalog.createDatabase(DINKY_DB, true);
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new RuntimeException(e);
}
}

public static synchronized PaimonUtil getInstance() {
if (instance == null) {
instance = new PaimonUtil();
}
return instance;
}

public static void dropTable(String table) {
Identifier identifier = Identifier.create(DINKY_DB, table);
if (CATALOG.tableExists(identifier)) {
if (getInstance().getCatalog().tableExists(identifier)) {
try {
CATALOG.dropTable(identifier, true);
getInstance().getCatalog().dropTable(identifier, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -122,6 +136,8 @@ public static <T> void write(String table, List<T> dataList, Class<?> clazz) {
String fieldName = StrUtil.toCamelCase(dataField.name());
Object fieldValue = ReflectUtil.getFieldValue(t, fieldName);
try {
// TODO: BinaryWriter.write is deprecated; consider switching to this approach later:
// BinaryWriter.createValueSetter(type).setValue(writer, i, fieldValue);
if (type.getTypeRoot() == DataTypeRoot.VARCHAR) {
BinaryWriter.write(
writer, i, BinaryString.fromString(JSONUtil.toJsonStr(fieldValue)), type, null);
@@ -169,10 +185,10 @@ public static <T> List<T> batchReadTable(

ReadBuilder readBuilder;
try {
if (!CATALOG.tableExists(identifier)) {
if (!getInstance().getCatalog().tableExists(identifier)) {
return dataList;
}
readBuilder = CATALOG.getTable(identifier).newReadBuilder();
readBuilder = getInstance().getCatalog().getTable(identifier).newReadBuilder();
if (filter != null) {
List<Predicate> predicates = filter.apply(builder);
readBuilder.withFilter(predicates);
@@ -218,18 +234,18 @@ public static <T> List<T> batchReadTable(
public static Table createOrGetTable(String tableName, Class<?> clazz) {
try {
Identifier identifier = Identifier.create(DINKY_DB, tableName);
if (CATALOG.tableExists(identifier)) {
return CATALOG.getTable(identifier);
if (getInstance().getCatalog().tableExists(identifier)) {
return getInstance().getCatalog().getTable(identifier);
}
CATALOG.createTable(identifier, getSchemaByClass(clazz), false);
return CATALOG.getTable(identifier);
getInstance().getCatalog().createTable(identifier, getSchemaByClass(clazz), false);
return getInstance().getCatalog().getTable(identifier);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static Schema getSchemaByClass(Class<?> clazz) {
return SCHEMA_CACHE.get(clazz, () -> {
return getInstance().getSchemaCache().get(clazz, () -> {
List<String> primaryKeys = new ArrayList<>();
List<String> partitionKeys = new ArrayList<>();
Schema.Builder builder = Schema.newBuilder();
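The refactor replaces static fields, which were initialized at class-load time, with a lazily built singleton, so a failure to create the Paimon catalog no longer surfaces as an opaque ExceptionInInitializerError during class loading; the added @Data supplies the getCatalog() and getSchemaCache() accessors used above. Since the synchronized getInstance() takes a monitor on every call, a lock-free alternative is the initialization-on-demand holder idiom, sketched here as a hypothetical variant, not what this commit uses:

```java
import org.dinky.utils.PaimonUtil;

// Hypothetical holder-idiom variant of the same lazy singleton.
public final class PaimonUtilHolder {
    private PaimonUtilHolder() {}

    private static final class Holder {
        // The JVM runs this exactly once, on first access to Holder, thread-safely.
        static final PaimonUtil INSTANCE = new PaimonUtil();
    }

    public static PaimonUtil getInstance() {
        return Holder.INSTANCE; // no locking after initialization
    }
}
```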
@@ -42,6 +42,7 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -105,6 +106,9 @@ public void init() {
private void initConfig() {
final ClusterConfig clusterConfig = config.getClusterConfig();
configuration = GlobalConfiguration.loadConfiguration(clusterConfig.getFlinkConfigPath());
if (!configuration.contains(RestOptions.PORT)) {
configuration.set(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

final FlinkConfig flinkConfig = config.getFlinkConfig();
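The new guard backfills rest.port when the loaded flink-conf.yaml omits it; in Flink, RestOptions.PORT.defaultValue() is 8081, so downstream code can rely on a concrete port being set. A standalone sketch of the same fallback, with a hypothetical config directory:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;

public class RestPortFallback {
    public static void main(String[] args) {
        // Load flink-conf.yaml from a config directory (path is hypothetical).
        Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink/conf");
        // RestOptions.PORT is "rest.port"; fill it in only when the file leaves it unset.
        if (!conf.contains(RestOptions.PORT)) {
            conf.set(RestOptions.PORT, RestOptions.PORT.defaultValue()); // 8081
        }
        System.out.println(conf.get(RestOptions.PORT));
    }
}
```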
@@ -600,8 +600,9 @@ public JdbcSelectResult query(String sql, Integer limit) {
} catch (Exception e) {
result.error(LogUtil.getError(e));
log.error("Query failed", e);
} finally {
close(preparedStatement, results);
}
close(preparedStatement, results);
result.setRowData(datas);
return result;
}
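Moving close(preparedStatement, results) into a finally block fixes a resource leak: an exception thrown during the query previously skipped the close call entirely. A try-with-resources sketch with the same guarantee; the names here are hypothetical, not this driver's API:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class QuerySketch {
    // Statement and result set close automatically, even if reading throws.
    static int countRows(Connection connection, String sql) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            int n = 0;
            while (rs.next()) {
                n++;
            }
            return n;
        }
    }
}
```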
@@ -108,7 +108,7 @@ private String genTable(Table table) {

final String dv = column.getDefaultValue();
String defaultValue = Asserts.isNotNull(dv)
? String.format(" DEFAULT %s", dv.isEmpty() ? "\"\"" : dv)
? String.format(" DEFAULT '%s'", dv.isEmpty() ? "''" : dv)
: String.format("%s NULL ", !column.isNullable() ? " NOT " : "");

return String.format(
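The old format string emitted non-empty defaults without quotes and empty ones with double quotes, producing DDL such as DEFAULT abc that most SQL dialects reject for string defaults; the fix wraps the value in single quotes, the standard SQL string-literal syntax. A before-and-after sketch:

```java
public class DefaultValueSketch {
    public static void main(String[] args) {
        String dv = "abc";
        // Old: the value was interpolated bare -> " DEFAULT abc"
        System.out.println(String.format(" DEFAULT %s", dv.isEmpty() ? "\"\"" : dv));
        // New: the value is single-quoted -> " DEFAULT 'abc'"
        System.out.println(String.format(" DEFAULT '%s'", dv.isEmpty() ? "''" : dv));
    }
}
```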
@@ -19,6 +19,7 @@

package org.dinky.scheduler.model;

import java.util.ArrayList;
import java.util.List;

import io.swagger.annotations.ApiModelProperty;
@@ -28,7 +29,7 @@
public class DinkyTaskParams {

@ApiModelProperty(value = "自定义参数")
private List<Property> localParams;
private List<Property> localParams = new ArrayList<>();

@ApiModelProperty(value = "dinky地址")
private String address;
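Initializing localParams to an empty ArrayList spares callers a null check when no custom parameters are supplied; presumably this guards against a NullPointerException rather than changing behavior.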
@@ -46,7 +46,6 @@ export const isOnline = (data: TaskDataType | undefined) => {
export const isCanPushDolphin = (data: TaskDataType | undefined) => {
return data
? JOB_LIFE_CYCLE.PUBLISH === data.step &&
!isSql(data?.dialect) &&
data?.dialect?.toLowerCase() !== DIALECT.FLINKSQLENV &&
data?.dialect?.toLowerCase() !== DIALECT.SCALA &&
data?.dialect?.toLowerCase() !== DIALECT.JAVA &&
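Dropping the !isSql(data?.dialect) condition widens isCanPushDolphin: published SQL tasks can now be pushed to DolphinScheduler, while the FlinkSqlEnv, Scala, and Java dialects remain excluded.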
4 changes: 1 addition & 3 deletions dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
@@ -367,9 +367,7 @@ const HeaderContainer = (props: connect) => {
icon: isOnline(currentData) ? <MergeCellsOutlined /> : <FundOutlined />,
title: isOnline(currentData) ? l('button.offline') : l('button.publish'),
isShow:
(currentTab?.type == TabsPageType.project &&
currentTab?.subType?.toLowerCase() === DIALECT.FLINK_SQL) ||
currentTab?.subType?.toLowerCase() === DIALECT.FLINKJAR,
(currentTab?.type == TabsPageType.project),
click: () => handleChangeJobLife()
},
{
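Along the same lines, the publish/offline button was previously shown only for FlinkSQL and FlinkJar tabs; after this change it appears for every project tab.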
2 changes: 1 addition & 1 deletion pom.xml
@@ -607,7 +607,7 @@
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-doris</artifactId>
<artifactId>dinky-cdc-plus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
