Skip to content

Commit

Permalink
[Feature] support pgsql catalog (DataLinkDC#4000)
Browse files Browse the repository at this point in the history
Co-authored-by: Zzm0809 <[email protected]>
Co-authored-by: GH Action - Upstream Sync <[email protected]>
  • Loading branch information
3 people committed Dec 7, 2024
1 parent 7c0e38e commit 9254e4c
Show file tree
Hide file tree
Showing 73 changed files with 13,313 additions and 155 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/code-review.yml

This file was deleted.

2 changes: 0 additions & 2 deletions dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
@Mapper
public interface TaskMapper extends SuperMapper<Task> {

Integer queryAllSizeByName(String name);

List<Task> queryOnLineTaskByDoneStatus(
@Param("parentIds") List<Integer> parentIds,
@Param("stepIds") List<Integer> stepIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ private String getFileText(File sourceFile) {
}
}
} catch (Exception e) {
log.error("read file error, {} ", e);
log.error("read file error, {} ", e.getMessage(), e);
}
return sb.toString();
}
Expand Down Expand Up @@ -555,7 +555,6 @@ public Result<Void> deleteCatalogueById(Integer catalogueId) {

if (CollUtil.isNotEmpty(metricListByTaskId)) {
metricListByTaskId.forEach(metrics -> monitorService.removeById(metrics.getId()));
// todo: 需要删除 paimon 中的监控数据, 但是 paimon 中没有提供删除接口
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.CatalogTypeMappingEnum;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
Expand Down Expand Up @@ -87,11 +88,9 @@
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
Expand Down Expand Up @@ -119,13 +118,15 @@

import javax.annotation.Resource;

import org.jetbrains.annotations.NotNull;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -164,8 +165,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private final DataSourceProperties dsProperties;
private final UserService userService;
private final ApplicationContext applicationContext;
private final UDFService udfService;
private final ResourcesService resourcesService;
private final DruidDataSource druidProperties;

@Resource
@Lazy
Expand Down Expand Up @@ -685,19 +685,7 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {

Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId);

String sql = String.format(
"create catalog my_catalog with(\n "
+ "'type' = 'dinky_mysql',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
FlinkSQLConstant.SEPARATOR);
String sql = getStatementByCatalogType(CatalogTypeMappingEnum.ofDbType(druidProperties.getDbType()));

if (null != defaultFlinkSQLEnvTask) {
defaultFlinkSQLEnvTask.setStatement(sql);
Expand All @@ -720,6 +708,31 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {
return defaultFlinkSQLEnvTask;
}

private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) {
String sql = String.format(
"create catalog my_catalog_%s with(\n "
+ "'type' = '%s',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog_%s %s",
catalogTypeMappingEnum.getCatalogTypeName(),
catalogTypeMappingEnum.getCatalogTypeName(),
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
catalogTypeMappingEnum.getCatalogTypeName(),
FlinkSQLConstant.SEPARATOR);
log.debug(
"Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}",
sql,
catalogTypeMappingEnum.getDbType(),
catalogTypeMappingEnum.getCatalogTypeName());
return sql;
}

@Override
public Task getTaskByNameAndTenantId(String name, Integer tenantId) {
return baseMapper.getTaskByNameAndTenantId(name, tenantId);
Expand Down Expand Up @@ -778,7 +791,8 @@ public String getTaskAPIAddress() {

@Override
public Integer queryAllSizeByName(String name) {
return baseMapper.queryAllSizeByName(name);
Long value = baseMapper.selectCount(new LambdaQueryWrapper<Task>().likeRight(Task::getName, name + "-"));
return Math.toIntExact(value);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions dinky-admin/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ spring:
application:
name: Dinky
profiles:
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, postgresql, h2]
# If you use mysql database, please configure mysql database connection information in application-mysql.yml
# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml
# If you use pgsql database, please configure pgsql database connection information in application-postgresql.yml
# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,
# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution
active: ${DB_ACTIVE:h2} #[h2,mysql,pgsql]
active: ${DB_ACTIVE:h2} #[h2,mysql,postgresql]
include:
- jmx
- flyway
Expand Down

This file was deleted.

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@



-- Table structure for public.dinky_user

CREATE TABLE IF NOT EXISTS public.dinky_user_backup
(
id serial PRIMARY KEY NOT NULL,
username varchar(50) NOT NULL,
user_type int DEFAULT 1,
password varchar(50),
nickname varchar(50),
worknum varchar(50),
avatar bytea,
mobile varchar(20),
enabled boolean NOT NULL DEFAULT true,
super_admin_flag boolean DEFAULT false,
is_delete boolean NOT NULL DEFAULT false,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE public.dinky_user_backup IS 'user';
COMMENT ON COLUMN public.dinky_user_backup.id IS 'id';
COMMENT ON COLUMN public.dinky_user_backup.username IS 'username';
COMMENT ON COLUMN public.dinky_user_backup.password IS 'password';
COMMENT ON COLUMN public.dinky_user_backup.nickname IS 'nickname';
COMMENT ON COLUMN public.dinky_user_backup.worknum IS 'worknum';
COMMENT ON COLUMN public.dinky_user_backup.avatar IS 'avatar';
COMMENT ON COLUMN public.dinky_user_backup.mobile IS 'mobile phone';
COMMENT ON COLUMN public.dinky_user_backup.enabled IS 'enabled';
COMMENT ON COLUMN public.dinky_user_backup.super_admin_flag IS 'is super admin(0:false,1true)';


insert into public.dinky_user_backup(id, username, user_type, password, nickname, worknum, avatar, mobile, enabled, super_admin_flag, is_delete, create_time, update_time)
select id, username, user_type, password, nickname, worknum, avatar, mobile, enabled,
case when super_admin_flag = 0 then false else true end as super_admin_flag
, is_delete, create_time, update_time
from public.dinky_user;

drop table if exists public.dinky_user;
alter table public.dinky_user_backup rename to dinky_user;
6 changes: 1 addition & 5 deletions dinky-admin/src/main/resources/mapper/TaskMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
</select>


<select id="queryAllSizeByName" resultType="java.lang.Integer">
SELECT count(*)
from dinky_task
where `name` REGEXP concat(#{name}, '-[0-9]$')
</select>


<select id="getTaskByNameAndTenantId" resultType="org.dinky.data.model.Task">
select *
Expand Down
7 changes: 7 additions & 0 deletions dinky-assembly/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<outputDirectory>extends/flink1.14/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.14-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.14-${project.version}.jar</include>
<include>dinky-client-1.14-${project.version}.jar</include>
<include>dinky-connector-jdbc-1.14-${project.version}.jar</include>
</includes>
Expand All @@ -87,6 +88,7 @@
<outputDirectory>extends/flink1.15/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.15-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.15-${project.version}.jar</include>
<include>dinky-client-1.15-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -95,6 +97,7 @@
<outputDirectory>extends/flink1.16/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.16-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.16-${project.version}.jar</include>
<include>dinky-client-1.16-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -103,6 +106,7 @@
<outputDirectory>extends/flink1.17/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.17-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.17-${project.version}.jar</include>
<include>dinky-client-1.17-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -111,6 +115,7 @@
<outputDirectory>extends/flink1.18/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.18-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.18-${project.version}.jar</include>
<include>dinky-client-1.18-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -119,6 +124,7 @@
<outputDirectory>extends/flink1.19/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.19-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.19-${project.version}.jar</include>
<include>dinky-client-1.19-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -127,6 +133,7 @@
<outputDirectory>extends/flink1.20/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.20-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.20-${project.version}.jar</include>
<include>dinky-client-1.20-${project.version}.jar</include>
</includes>
</fileSet>
Expand Down
Loading

0 comments on commit 9254e4c

Please sign in to comment.