Commit
Merge branch 'dev' into refactor_model_to_typehandler
Zzm0809 authored Oct 16, 2023
2 parents fec5c57 + f8fb133 commit e8a7623
Showing 34 changed files with 777 additions and 216 deletions.
@@ -27,6 +27,7 @@
import org.dinky.data.model.JobInstance;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
+import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.process.exception.ExcuteException;
@@ -89,8 +90,7 @@ public Result<Boolean> cancel(@RequestParam Integer id) {
@GetMapping(value = "/restartTask")
@ApiOperation("Restart Task")
// @Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION)
-    public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath)
-            throws ExcuteException {
+    public Result<JobResult> restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException {
return Result.succeed(taskService.restartTask(id, savePointPath));
}

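A note on the restartTask change above: dropping @RequestParam from savePointPath makes the parameter optional. Spring MVC binds unannotated simple-type parameters as non-required request parameters, so a missing savePointPath arrives as null instead of triggering a 400 error. A minimal sketch of the resulting behavior (the surrounding controller class is assumed):

    // Sketch only: same signature as in the diff above.
    @GetMapping(value = "/restartTask")
    @ApiOperation("Restart Task")
    public Result<JobResult> restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException {
        // id stays required: GET /restartTask without ?id= is rejected before this method runs.
        // savePointPath is bound when present and null otherwise, so both
        // /restartTask?id=1 and /restartTask?id=1&savePointPath=/tmp/sp are accepted.
        return Result.succeed(taskService.restartTask(id, savePointPath));
    }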
@@ -99,7 +99,8 @@ public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam Str
@ApiOperation("Savepoint Trigger")
public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestParam String savePointType) {
return Result.succeed(
-                taskService.savepointTaskJob(taskService.getTaskInfoById(taskId), savePointType),
+                taskService.savepointTaskJob(
+                        taskService.getTaskInfoById(taskId), SavePointType.valueOf(savePointType.toUpperCase())),
Status.EXECUTE_SUCCESS);
}

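The savepoint endpoint above now converts the incoming string into the SavePointType enum before it reaches the service layer, which matches the tightened TaskService.savepointTaskJob signature later in this commit. Enum.valueOf is case-sensitive (hence the toUpperCase() call) and throws IllegalArgumentException on unknown names, so malformed savePointType values now fail fast in the controller. A small sketch of the conversion; the constant name TRIGGER is an assumption about the enum's contents:

    // "trigger" -> SavePointType.TRIGGER (assumed constant), case-insensitively.
    SavePointType type = SavePointType.valueOf("trigger".toUpperCase());

    try {
        SavePointType.valueOf("no-such-type".toUpperCase());
    } catch (IllegalArgumentException e) {
        // Unknown names no longer propagate as raw strings into the gateway layer.
    }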
@@ -27,7 +27,7 @@
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.Catalog;
-import org.dinky.data.model.FlinkColumn;
+import org.dinky.data.model.Column;
import org.dinky.data.model.Schema;
import org.dinky.data.result.IResult;
import org.dinky.data.result.Result;
@@ -176,7 +176,7 @@ public Result<Schema> getMSSchemaInfo(@RequestBody StudioMetaStoreDTO studioMeta
paramType = "query"),
@ApiImplicitParam(name = "table", value = "table", required = true, dataType = "String", paramType = "query")
})
-    public Result<List<FlinkColumn>> getMSFlinkColumns(
+    public Result<List<Column>> getMSFlinkColumns(
@RequestParam Integer envId,
@RequestParam String catalog,
@RequestParam String database,
14 changes: 4 additions & 10 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
@@ -30,6 +30,7 @@
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
+import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.process.exception.ExcuteException;
@@ -90,8 +91,7 @@ public Result<Boolean> cancel(@RequestParam Integer id) {
@GetMapping(value = "/restartTask")
@ApiOperation("Restart Task")
@Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION)
-    public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath)
-            throws ExcuteException {
+    public Result<JobResult> restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException {
return Result.succeed(taskService.restartTask(id, savePointPath));
}

@@ -100,7 +100,8 @@ public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam Str
@ApiOperation("Savepoint Trigger")
public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestParam String savePointType) {
return Result.succeed(
-                taskService.savepointTaskJob(taskService.getTaskInfoById(taskId), savePointType),
+                taskService.savepointTaskJob(
+                        taskService.getTaskInfoById(taskId), SavePointType.valueOf(savePointType.toUpperCase())),
Status.EXECUTE_SUCCESS);
}

@@ -111,13 +112,6 @@ public Result<Boolean> onLineTask(@RequestParam Integer taskId) {
return Result.succeed(taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.ONLINE));
}

@GetMapping("/offLineTask")
@Log(title = "offLineTask", businessType = BusinessType.TRIGGER)
@ApiOperation("offLineTask")
public Result<Boolean> offLineTask(@RequestParam Integer taskId) {
return Result.succeed(taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.DEVELOP));
}

@PostMapping("/explainSql")
@ApiOperation("Explain Sql")
public Result<List<SqlExplainResult>> explainSql(@RequestBody TaskDTO taskDTO) throws NotSupportExplainExcepition {
52 changes: 2 additions & 50 deletions dinky-admin/src/main/java/org/dinky/service/StudioService.java
@@ -23,7 +23,7 @@
import org.dinky.data.dto.StudioDDLDTO;
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
-import org.dinky.data.model.FlinkColumn;
+import org.dinky.data.model.Column;
import org.dinky.data.model.Schema;
import org.dinky.data.result.IResult;
import org.dinky.data.result.SelectResult;
@@ -41,67 +41,19 @@
*/
public interface StudioService {

-    /**
-     * Execute a DDL statement and return the result.
-     *
-     * @param studioDDLDTO A {@link StudioDDLDTO} object representing the DDL statement to execute.
-     * @return An {@link IResult} object representing the result of the DDL statement execution.
-     */
IResult executeDDL(StudioDDLDTO studioDDLDTO);

-    /**
-     * Get common SQL data based on the specified task ID.
-     *
-     * @param taskId The ID of the task to get the common SQL data for.
-     * @return A {@link JdbcSelectResult} object representing the common SQL data for the specified task ID.
-     */
JdbcSelectResult getCommonSqlData(Integer taskId);

-    /**
-     * Get job data based on the specified job ID.
-     *
-     * @param jobId The ID of the job to get the job data for.
-     * @return A {@link SelectResult} object representing the job data for the specified job ID.
-     */
SelectResult getJobData(String jobId);

-    /**
-     * Get the lineage information for a specified studio CAD.
-     *
-     * @param studioCADTO A {@link StudioCADTO} object representing the studio CAD to get the lineage information for.
-     * @return A {@link LineageResult} object representing the lineage information for the specified studio CAD.
-     */
LineageResult getLineage(StudioCADTO studioCADTO);

-    /**
-     * Get a list of Flink jobs based on the specified cluster ID.
-     *
-     * @param clusterId The ID of the cluster to get the Flink jobs for.
-     * @return A list of {@link JsonNode} objects representing the Flink jobs for the specified cluster ID.
-     */
List<JsonNode> listFlinkJobs(Integer clusterId);

-    /**
-     * Get MS catalogs based on the specified studio meta store DTO.
-     *
-     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the studio meta store DTO to get the MS catalogs for.
-     * @return A list of {@link Catalog} objects representing the MS catalogs for the specified studio meta store DTO.
-     */
List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO);

-    /**
-     * Get the schema information for a specified MS catalog.
-     *
-     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the MS catalog to get the schema information for.
-     * @return A {@link Schema} object representing the schema information for the specified MS catalog.
-     */
Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO);

-    /**
-     * Get a list of Flink columns based on the specified MS catalog.
-     *
-     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the MS catalog to get the Flink columns for.
-     * @return A list of {@link FlinkColumn} objects representing the Flink columns for the specified MS catalog.
-     */
-    List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO);
+    List<Column> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO);
}
3 changes: 2 additions & 1 deletion dinky-admin/src/main/java/org/dinky/service/TaskService.java
@@ -29,6 +29,7 @@
import org.dinky.data.model.Task;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
+import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.mybatis.service.ISuperService;
@@ -85,7 +86,7 @@ public interface TaskService extends ISuperService<Task> {
* @param savePointType The type of savepoint to create.
* @return A {@link SavePointResult} object representing the savepoint result.
*/
-    SavePointResult savepointTaskJob(TaskDTO task, String savePointType);
+    SavePointResult savepointTaskJob(TaskDTO task, SavePointType savePointType);

/**
* Explain the given task and return a list of SQL explain results.
@@ -27,14 +27,15 @@
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.ClusterInstance;
+import org.dinky.data.model.Column;
import org.dinky.data.model.DataBase;
-import org.dinky.data.model.FlinkColumn;
import org.dinky.data.model.Schema;
import org.dinky.data.model.Table;
import org.dinky.data.result.DDLResult;
import org.dinky.data.result.IResult;
import org.dinky.data.result.ResultPool;
import org.dinky.data.result.SelectResult;
+import org.dinky.executor.CustomTableEnvironment;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.JobConfig;
@@ -49,6 +50,7 @@
import org.dinky.service.StudioService;
import org.dinky.service.TaskService;
import org.dinky.sql.FlinkQuery;
+import org.dinky.utils.FlinkTableMetadataUtil;
import org.dinky.utils.RunTimeUtil;

import java.util.ArrayList;
@@ -60,10 +62,14 @@
import com.fasterxml.jackson.databind.JsonNode;

import cn.dev33.satoken.stp.StpUtil;
+import cn.hutool.cache.Cache;
+import cn.hutool.cache.CacheUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

-/** StudioServiceImpl */
+/**
+ * StudioServiceImpl
+ */
@Service
@RequiredArgsConstructor
@Slf4j
@@ -72,6 +78,7 @@ public class StudioServiceImpl implements StudioService {
private final ClusterInstanceService clusterInstanceService;
private final DataBaseService dataBaseService;
private final TaskService taskService;
+    private final Cache<String, JobManager> jobManagerCache = CacheUtil.newTimedCache(1000 * 60 * 2);

private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {
String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
@@ -153,116 +160,65 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
catalogs.add(defaultCatalog);
}
} else {
-            studioMetaStoreDTO.setStatement(FlinkQuery.showCatalogs());
-            IResult result = executeMSFlinkSql(studioMetaStoreDTO);
-
-            if (result instanceof DDLResult) {
-                DDLResult ddlResult = (DDLResult) result;
-                ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
-                    for (Map<String, Object> item : ddlResult.getRowData()) {
-                        catalogs.add(Catalog.build(item.get(key).toString()));
-                    }
-                });
-
-                for (Catalog catalog : catalogs) {
-                    String statement = FlinkQuery.useCatalog(catalog.getName())
-                            + FlinkQuery.separator()
-                            + FlinkQuery.showDatabases();
-                    studioMetaStoreDTO.setStatement(statement);
-                    IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
-                    DDLResult tableDDLResult = (DDLResult) tableResult;
-                    tableDDLResult.getColumns().stream().findFirst().ifPresent(key -> {
-                        List<Map<String, Object>> rowData = tableDDLResult.getRowData();
-                        List<Schema> schemas = new ArrayList<>();
-                        for (Map<String, Object> item : rowData) {
-                            schemas.add(Schema.build(item.get(key).toString()));
-                        }
-                        catalog.setSchemas(schemas);
-                    });
-                }
-            }
+            String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
+            JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
+            CustomTableEnvironment customTableEnvironment =
+                    jobManager.getExecutor().getCustomTableEnvironment();
+            catalogs.addAll(FlinkTableMetadataUtil.getCatalog(customTableEnvironment));
}
return catalogs;
}

@Override
public Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO) {
-        Schema schema = Schema.build(studioMetaStoreDTO.getDatabase());
+        String database = studioMetaStoreDTO.getDatabase();
+        Schema schema = Schema.build(database);
List<Table> tables = new ArrayList<>();
if (Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (Asserts.isNotNull(dataBase)) {
Driver driver = Driver.build(dataBase.getDriverConfig());
-                tables.addAll(driver.listTables(studioMetaStoreDTO.getDatabase()));
+                tables.addAll(driver.listTables(database));
}
} else {
-            String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog())
-                    + FlinkQuery.separator()
-                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase())
-                    + FlinkQuery.separator();
-
-            // show tables
-            String tableStatement = baseStatement + FlinkQuery.showTables();
-            studioMetaStoreDTO.setStatement(tableStatement);
-            IResult result = executeMSFlinkSql(studioMetaStoreDTO);
-            if (result instanceof DDLResult) {
-                DDLResult ddlResult = (DDLResult) result;
-                ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
-                    List<Map<String, Object>> rowData = ddlResult.getRowData();
-                    for (Map<String, Object> item : rowData) {
-                        Table table = Table.build(item.get(key).toString(), studioMetaStoreDTO.getDatabase());
-                        table.setCatalog(studioMetaStoreDTO.getCatalog());
-                        tables.add(table);
-                    }
-                });
-            }
-            // show views
-            schema.setViews(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showViews()));
-            // show functions
-            schema.setFunctions(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showFunctions()));
-            // show user functions
-            schema.setUserFunctions(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showUserFunctions()));
-            // show modules
-            schema.setModules(showInfo(studioMetaStoreDTO, baseStatement, FlinkQuery.showModules()));
+            String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
+            JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
+            CustomTableEnvironment customTableEnvironment =
+                    jobManager.getExecutor().getCustomTableEnvironment();
+            FlinkTableMetadataUtil.setSchemaInfo(
+                    customTableEnvironment, studioMetaStoreDTO.getCatalog(), database, schema, tables);
}
schema.setTables(tables);
return schema;
}

@Override
-    public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
-        List<FlinkColumn> columns = new ArrayList<>();
+    public List<Column> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
+        List<Column> columns = new ArrayList<>();
if (!Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) {
-            String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog())
-                    + FlinkQuery.separator()
-                    + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase())
-                    + FlinkQuery.separator();
-
-            // desc tables
-            String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
-            studioMetaStoreDTO.setStatement(tableStatement);
-            IResult result = executeMSFlinkSql(studioMetaStoreDTO);
-            if (result instanceof DDLResult) {
-                DDLResult ddlResult = (DDLResult) result;
-                List<Map<String, Object>> rowData = ddlResult.getRowData();
-                int i = 1;
-                for (Map<String, Object> item : rowData) {
-                    FlinkColumn column = FlinkColumn.build(
-                            i,
-                            item.get(FlinkQuery.columnName()).toString(),
-                            item.get(FlinkQuery.columnType()).toString(),
-                            item.get(FlinkQuery.columnKey()).toString(),
-                            item.get(FlinkQuery.columnNull()).toString(),
-                            item.get(FlinkQuery.columnExtras()).toString(),
-                            item.get(FlinkQuery.columnWatermark()).toString());
-                    columns.add(column);
-                    i++;
-                }
-            }
+            String catalogName = studioMetaStoreDTO.getCatalog();
+            String database = studioMetaStoreDTO.getDatabase();
+            String tableName = studioMetaStoreDTO.getTable();
+            String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
+            JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
+            CustomTableEnvironment customTableEnvironment =
+                    jobManager.getExecutor().getCustomTableEnvironment();
+            columns.addAll(
+                    FlinkTableMetadataUtil.getColumnList(customTableEnvironment, catalogName, database, tableName));
}
return columns;
}

+    private JobManager getJobManager(StudioMetaStoreDTO studioMetaStoreDTO, String envSql) {
+        JobManager jobManager = jobManagerCache.get(envSql, () -> {
+            JobConfig config = studioMetaStoreDTO.getJobConfig();
+            JobManager jobManagerTmp = JobManager.build(config);
+            jobManagerTmp.executeDDL(envSql);
+            return jobManagerTmp;
+        });
+        return jobManager;
+    }

private List<String> showInfo(StudioMetaStoreDTO studioMetaStoreDTO, String baseStatement, String statement) {
List<String> infos = new ArrayList<>();
studioMetaStoreDTO.setStatement(baseStatement + statement);
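The new private getJobManager helper above keys a hutool TimedCache by the environment SQL, so repeated metastore lookups against the same environment reuse one JobManager (with its DDL already executed) instead of rebuilding it per request; entries expire after two minutes (1000 * 60 * 2 ms). A self-contained sketch of the same pattern, with a hypothetical ExpensiveSession standing in for JobManager:

    import cn.hutool.cache.Cache;
    import cn.hutool.cache.CacheUtil;

    public class SessionCacheSketch {
        // Hypothetical stand-in for JobManager: expensive to construct.
        static class ExpensiveSession {
            ExpensiveSession(String envSql) { /* imagine executeDDL(envSql) here */ }
        }

        // Entries live for two minutes, mirroring jobManagerCache in the diff.
        private final Cache<String, ExpensiveSession> cache = CacheUtil.newTimedCache(1000 * 60 * 2);

        ExpensiveSession get(String envSql) {
            // hutool's get(key, supplier) builds and caches the value on a miss.
            return cache.get(envSql, () -> new ExpensiveSession(envSql));
        }
    }

Keying by envSql means tasks that produce identical environment SQL share one JobManager; the two-minute expiry bounds how stale a cached session can get.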

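More broadly, StudioServiceImpl no longer round-trips SQL statements (SHOW CATALOGS, SHOW DATABASES, DESC ...) through executeMSFlinkSql and parses DDLResult rows; it reads catalog metadata directly from the CustomTableEnvironment via FlinkTableMetadataUtil. A hedged sketch of what such direct listing looks like against Flink's public TableEnvironment API (dinky's FlinkTableMetadataUtil helpers may differ in detail):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CatalogListingSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Direct metadata reads: no statement execution, no result-row parsing.
            List<String> databases = new ArrayList<>();
            for (String catalogName : tEnv.listCatalogs()) {
                tEnv.getCatalog(catalogName)
                        .ifPresent(catalog -> databases.addAll(catalog.listDatabases()));
            }
            System.out.println(databases);
        }
    }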