Skip to content

Commit

Permalink
[Feature] support pgsql catalog (DataLinkDC#4000)
Browse files Browse the repository at this point in the history
Co-authored-by: Zzm0809 <[email protected]>
Co-authored-by: GH Action - Upstream Sync <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent 5674a45 commit 52d3bc7
Show file tree
Hide file tree
Showing 71 changed files with 13,313 additions and 77 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/code-review.yml

This file was deleted.

2 changes: 0 additions & 2 deletions dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
@Mapper
public interface TaskMapper extends SuperMapper<Task> {

Integer queryAllSizeByName(String name);

List<Task> queryOnLineTaskByDoneStatus(
@Param("parentIds") List<Integer> parentIds,
@Param("stepIds") List<Integer> stepIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ private String getFileText(File sourceFile) {
}
}
} catch (Exception e) {
log.error("read file error, {} ", e);
log.error("read file error, {} ", e.getMessage(), e);
}
return sb.toString();
}
Expand Down Expand Up @@ -555,7 +555,6 @@ public Result<Void> deleteCatalogueById(Integer catalogueId) {

if (CollUtil.isNotEmpty(metricListByTaskId)) {
metricListByTaskId.forEach(metrics -> monitorService.removeById(metrics.getId()));
// todo: 需要删除 paimon 中的监控数据, 但是 paimon 中没有提供删除接口
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.CatalogTypeMappingEnum;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
Expand Down Expand Up @@ -87,11 +88,9 @@
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
Expand Down Expand Up @@ -119,13 +118,15 @@

import javax.annotation.Resource;

import org.jetbrains.annotations.NotNull;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -164,8 +165,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private final DataSourceProperties dsProperties;
private final UserService userService;
private final ApplicationContext applicationContext;
private final UDFService udfService;
private final ResourcesService resourcesService;
private final DruidDataSource druidProperties;

@Resource
@Lazy
Expand Down Expand Up @@ -685,19 +685,7 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {

Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId);

String sql = String.format(
"create catalog my_catalog with(\n "
+ "'type' = 'dinky_mysql',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
FlinkSQLConstant.SEPARATOR);
String sql = getStatementByCatalogType(CatalogTypeMappingEnum.ofDbType(druidProperties.getDbType()));

if (null != defaultFlinkSQLEnvTask) {
defaultFlinkSQLEnvTask.setStatement(sql);
Expand All @@ -720,6 +708,31 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {
return defaultFlinkSQLEnvTask;
}

private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) {
String sql = String.format(
"create catalog my_catalog_%s with(\n "
+ "'type' = '%s',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog_%s %s",
catalogTypeMappingEnum.getCatalogTypeName(),
catalogTypeMappingEnum.getCatalogTypeName(),
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
catalogTypeMappingEnum.getCatalogTypeName(),
FlinkSQLConstant.SEPARATOR);
log.debug(
"Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}",
sql,
catalogTypeMappingEnum.getDbType(),
catalogTypeMappingEnum.getCatalogTypeName());
return sql;
}

@Override
public Task getTaskByNameAndTenantId(String name, Integer tenantId) {
return baseMapper.getTaskByNameAndTenantId(name, tenantId);
Expand Down Expand Up @@ -778,7 +791,8 @@ public String getTaskAPIAddress() {

@Override
public Integer queryAllSizeByName(String name) {
return baseMapper.queryAllSizeByName(name);
Long value = baseMapper.selectCount(new LambdaQueryWrapper<Task>().likeRight(Task::getName, name + "-"));
return Math.toIntExact(value);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions dinky-admin/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ spring:
application:
name: Dinky
profiles:
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, postgresql, h2]
# If you use mysql database, please configure mysql database connection information in application-mysql.yml
# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml
# If you use pgsql database, please configure pgsql database connection information in application-postgresql.yml
# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,
# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution
active: ${DB_ACTIVE:h2} #[h2,mysql,pgsql]
active: ${DB_ACTIVE:h2} #[h2,mysql,postgresql]
include:
- jmx
- flyway
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@



-- Table structure for public.dinky_user

CREATE TABLE IF NOT EXISTS public.dinky_user_backup
(
id serial PRIMARY KEY NOT NULL,
username varchar(50) NOT NULL,
user_type int DEFAULT 1,
password varchar(50),
nickname varchar(50),
worknum varchar(50),
avatar bytea,
mobile varchar(20),
enabled boolean NOT NULL DEFAULT true,
super_admin_flag boolean DEFAULT false,
is_delete boolean NOT NULL DEFAULT false,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE public.dinky_user_backup IS 'user';
COMMENT ON COLUMN public.dinky_user_backup.id IS 'id';
COMMENT ON COLUMN public.dinky_user_backup.username IS 'username';
COMMENT ON COLUMN public.dinky_user_backup.password IS 'password';
COMMENT ON COLUMN public.dinky_user_backup.nickname IS 'nickname';
COMMENT ON COLUMN public.dinky_user_backup.worknum IS 'worknum';
COMMENT ON COLUMN public.dinky_user_backup.avatar IS 'avatar';
COMMENT ON COLUMN public.dinky_user_backup.mobile IS 'mobile phone';
COMMENT ON COLUMN public.dinky_user_backup.enabled IS 'enabled';
COMMENT ON COLUMN public.dinky_user_backup.super_admin_flag IS 'is super admin(0:false,1true)';


insert into public.dinky_user_backup(id, username, user_type, password, nickname, worknum, avatar, mobile, enabled, super_admin_flag, is_delete, create_time, update_time)
select id, username, user_type, password, nickname, worknum, avatar, mobile, enabled,
case when super_admin_flag = 0 then false else true end as super_admin_flag
, is_delete, create_time, update_time
from public.dinky_user;

drop table if exists public.dinky_user;
alter table public.dinky_user_backup rename to dinky_user;
6 changes: 1 addition & 5 deletions dinky-admin/src/main/resources/mapper/TaskMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
</select>


<select id="queryAllSizeByName" resultType="java.lang.Integer">
SELECT count(*)
from dinky_task
where `name` REGEXP concat(#{name}, '-[0-9]$')
</select>


<select id="getTaskByNameAndTenantId" resultType="org.dinky.data.model.Task">
select *
Expand Down
7 changes: 7 additions & 0 deletions dinky-assembly/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<outputDirectory>extends/flink1.14/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.14-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.14-${project.version}.jar</include>
<include>dinky-client-1.14-${project.version}.jar</include>
<include>dinky-connector-jdbc-1.14-${project.version}.jar</include>
</includes>
Expand All @@ -87,6 +88,7 @@
<outputDirectory>extends/flink1.15/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.15-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.15-${project.version}.jar</include>
<include>dinky-client-1.15-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -95,6 +97,7 @@
<outputDirectory>extends/flink1.16/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.16-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.16-${project.version}.jar</include>
<include>dinky-client-1.16-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -103,6 +106,7 @@
<outputDirectory>extends/flink1.17/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.17-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.17-${project.version}.jar</include>
<include>dinky-client-1.17-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -111,6 +115,7 @@
<outputDirectory>extends/flink1.18/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.18-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.18-${project.version}.jar</include>
<include>dinky-client-1.18-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -119,6 +124,7 @@
<outputDirectory>extends/flink1.19/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.19-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.19-${project.version}.jar</include>
<include>dinky-client-1.19-${project.version}.jar</include>
</includes>
</fileSet>
Expand All @@ -127,6 +133,7 @@
<outputDirectory>extends/flink1.20/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.20-${project.version}.jar</include>
<include>dinky-catalog-postgres-1.20-${project.version}.jar</include>
<include>dinky-client-1.20-${project.version}.jar</include>
</includes>
</fileSet>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dinky</groupId>
<artifactId>dinky-catalog-postgres</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>dinky-catalog-postgres-1.14</artifactId>
<packaging>jar</packaging>
<name>Dinky : Catalog : Postgres 1.14</name>

<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-common</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-flink-1.14</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<!-- 指定打包的jar包输出路径 -->
<outputDirectory>${project.parent.parent.parent.basedir}/build/extends</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit 52d3bc7

Please sign in to comment.