diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
index d0db0541..77d1aab9 100644
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
@@ -48,6 +48,18 @@
${project.version}
+
+ io.datavines
+ datavines-engine-config
+ ${project.version}
+
+
+
+ io.datavines
+ datavines-connector-api
+ ${project.version}
+
+
org.apache.flink
flink-core
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java
new file mode 100644
index 00000000..d2e4d7ff
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.config;
+
+import io.datavines.common.config.EnvConfig;
+import io.datavines.common.config.SinkConfig;
+import io.datavines.common.config.SourceConfig;
+import io.datavines.common.config.enums.SinkType;
+import io.datavines.common.config.enums.SourceType;
+import io.datavines.common.entity.ConnectorParameter;
+import io.datavines.common.entity.job.BaseJobParameter;
+import io.datavines.common.exception.DataVinesException;
+import io.datavines.common.utils.JSONUtils;
+import io.datavines.common.utils.StringUtils;
+import io.datavines.connector.api.ConnectorFactory;
+import io.datavines.engine.common.utils.ParserUtils;
+import io.datavines.engine.config.BaseJobConfigurationBuilder;
+import io.datavines.spi.PluginLoader;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static io.datavines.common.CommonConstants.*;
+import static io.datavines.common.ConfigConstants.*;
+import static io.datavines.common.ConfigConstants.TABLE;
+
+/**
+ *
+ *
+ * @author dataVines
+ * @since 2021-07-01
+ */
+@Slf4j
+public abstract class BaseFlinkConfigurationBuilder extends BaseJobConfigurationBuilder {
+
+ @Override
+ protected EnvConfig getEnvConfig() {
+ EnvConfig envConfig = new EnvConfig();
+ envConfig.setEngine(jobExecutionInfo.getEngineType());
+ Map configMap = envConfig.getConfig();
+ if (configMap == null) {
+ configMap = new HashMap<>();
+ }
+
+ ConnectorParameter connectorParameter = jobExecutionParameter.getConnectorParameter();
+ String srcConnectorType = "";
+ if (connectorParameter != null) {
+ srcConnectorType = connectorParameter.getType();
+ }
+
+ ConnectorParameter connectorParameter2 = jobExecutionParameter.getConnectorParameter2();
+ String srcConnectorType2 = "";
+ if (connectorParameter2 != null) {
+ srcConnectorType2 = connectorParameter2.getType();
+ }
+
+ envConfig.setConfig(configMap);
+ return envConfig;
+ }
+
+ @Override
+ protected List getSourceConfigs() throws DataVinesException {
+ List sourceConfigs = new ArrayList<>();
+ List metricJobParameterList = jobExecutionParameter.getMetricParameterList();
+ boolean isAddValidateResultDataSource = false;
+ if (CollectionUtils.isNotEmpty(metricJobParameterList)) {
+ Set sourceConnectorSet = new HashSet<>();
+ Set targetConnectorSet = new HashSet<>();
+ for (BaseJobParameter parameter : metricJobParameterList) {
+ String metricUniqueKey = getMetricUniqueKey(parameter);
+ Map metricInputParameter = metric2InputParameter.get(metricUniqueKey);
+ if (jobExecutionParameter.getConnectorParameter() != null) {
+ ConnectorParameter connectorParameter = jobExecutionParameter.getConnectorParameter();
+ SourceConfig sourceConfig = new SourceConfig();
+
+ Map connectorParameterMap = new HashMap<>(connectorParameter.getParameters());
+ connectorParameterMap.putAll(metricInputParameter);
+
+ if (connectorParameter.getParameters().get(SCHEMA) != null) {
+ metricInputParameter.put(SCHEMA, (String)connectorParameter.getParameters().get(SCHEMA));
+ }
+
+ metricInputParameter.put(DATABASE_NAME, metricInputParameter.get(DATABASE));
+ metricInputParameter.put(TABLE_NAME, metricInputParameter.get(TABLE));
+ metricInputParameter.put(COLUMN_NAME, metricInputParameter.get(COLUMN));
+
+ ConnectorFactory connectorFactory = PluginLoader
+ .getPluginLoader(ConnectorFactory.class)
+ .getNewPlugin(connectorParameter.getType());
+
+ connectorParameterMap.put(TABLE, metricInputParameter.get(TABLE));
+ connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE));
+ connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
+ connectorParameterMap.put(PASSWORD, ParserUtils.encode((String)connectorParameterMap.get(PASSWORD)));
+
+ String outputTable = getOutputTable(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE));
+ String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
+ connectorParameterMap.put(OUTPUT_TABLE, outputTable);
+ connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
+
+ metricInputParameter.put(TABLE, outputTable);
+ metricInputParameter.put(TABLE_ALIAS, tableAlias);
+ metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
+ metricInputParameter.put(REGEX_KEY, "REGEXP(${column}, ${regex})");
+ metricInputParameter.put(NOT_REGEX_KEY, "NOT REGEXP(${column}, ${regex})");
+ metricInputParameter.put(STRING_TYPE, "STRING");
+ metricInputParameter.put(IF_FUNCTION_KEY, "IF");
+ metricInputParameter.put(LIMIT_TOP_50_KEY, " LIMIT 50");
+ metricInputParameter.put(LENGTH_KEY, "CHARACTER_LENGTH(${column})");
+ metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
+ metricInputParameter.put(ENGINE_TYPE, jobExecutionInfo.getEngineType());
+
+ String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
+
+ if (sourceConnectorSet.contains(connectorUUID)) {
+ continue;
+ }
+
+ sourceConfig.setPlugin(connectorFactory.getCategory());
+ sourceConfig.setConfig(connectorParameterMap);
+ sourceConfig.setType(SourceType.SOURCE.getDescription());
+ sourceConfigs.add(sourceConfig);
+ sourceConnectorSet.add(connectorUUID);
+ }
+
+ if (jobExecutionParameter.getConnectorParameter2() != null
+ && jobExecutionParameter.getConnectorParameter2().getParameters() != null) {
+ ConnectorParameter connectorParameter2 = jobExecutionParameter.getConnectorParameter2();
+ SourceConfig sourceConfig = new SourceConfig();
+
+ Map connectorParameterMap = new HashMap<>(connectorParameter2.getParameters());
+ connectorParameterMap.putAll(metricInputParameter);
+
+ if (connectorParameter2.getParameters().get(SCHEMA) != null) {
+ metricInputParameter.put(SCHEMA2, (String)connectorParameter2.getParameters().get(SCHEMA));
+ }
+
+ ConnectorFactory connectorFactory = PluginLoader
+ .getPluginLoader(ConnectorFactory.class)
+ .getNewPlugin(connectorParameter2.getType());
+
+ connectorParameterMap.put(TABLE, metricInputParameter.get(TABLE2));
+ connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE2));
+ connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
+ connectorParameterMap.put(PASSWORD, ParserUtils.encode((String)connectorParameterMap.get(PASSWORD)));
+
+ String outputTable = getOutputTable(metricInputParameter.get(DATABASE2),
+ metricInputParameter.get(SCHEMA2),
+ metricInputParameter.get(TABLE2)) + "_2";
+
+ String tableAlias = getTableAlias(metricInputParameter.get(DATABASE2),
+ metricInputParameter.get(SCHEMA2),
+ metricInputParameter.get(TABLE2), "2");
+
+ connectorParameterMap.put(OUTPUT_TABLE, outputTable);
+ connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
+
+ metricInputParameter.put(TABLE2, outputTable);
+ metricInputParameter.put(TABLE2_ALIAS, tableAlias);
+
+ String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
+
+ if (targetConnectorSet.contains(connectorUUID)) {
+ continue;
+ }
+
+ sourceConfig.setPlugin(connectorFactory.getCategory());
+ sourceConfig.setConfig(connectorParameterMap);
+ sourceConfig.setType(SourceType.SOURCE.getDescription());
+ sourceConfigs.add(sourceConfig);
+ targetConnectorSet.add(connectorUUID);
+ }
+
+ metric2InputParameter.put(metricUniqueKey, metricInputParameter);
+ }
+ }
+
+ return sourceConfigs;
+ }
+
+ protected String getOutputTable(String database, String schema, String table) {
+ if (StringUtils.isNotEmpty(schema)) {
+ return String.format("%s_%s_%s", database, schema, table);
+ }
+ return String.format("%s_%s", database, table);
+ }
+
+ protected String getTableAlias(String database, String schema, String table, String order) {
+ if (StringUtils.isNotEmpty(schema)) {
+ return String.format("t%s_%s_%s_%s", order, database, schema, table);
+ }
+ return String.format("t%s_%s_%s", order, database, table);
+ }
+
+ protected SinkConfig getErrorSinkConfig(Map inputParameter) {
+ if (FILE.equalsIgnoreCase(jobExecutionInfo.getErrorDataStorageType())) {
+ SinkConfig sinkConfig = new SinkConfig();
+ Map configMap = new HashMap<>();
+ Map errorDataParameterMap = JSONUtils.toMap(jobExecutionInfo.getErrorDataStorageParameter(),String.class, String.class);
+ configMap.put(DATA_DIR, errorDataParameterMap.get(DATA_DIR));
+ configMap.put(FILE_NAME, inputParameter.get(ERROR_DATA_FILE_NAME));
+ configMap.put(COLUMN_SEPARATOR, errorDataParameterMap.get(COLUMN_SEPARATOR));
+ configMap.put(LINE_SEPARATOR, errorDataParameterMap.get(LINE_SEPARATOR));
+ sinkConfig.setConfig(configMap);
+ sinkConfig.setType(SinkType.ERROR_DATA.getDescription());
+ sinkConfig.setPlugin(FILE);
+ return sinkConfig;
+ }
+ return null;
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java
new file mode 100644
index 00000000..d4e47eb3
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.config;
+
+import io.datavines.common.config.Config;
+import io.datavines.common.config.CheckResult;
+import io.datavines.engine.api.plugin.Plugin;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+
+import java.io.Serializable;
+
+public class FlinkEngineConfig implements Plugin, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String CHECKPOINT_INTERVAL = "flink.checkpoint.interval";
+ private static final String PARALLELISM = "flink.parallelism";
+ private static final String RESTART_ATTEMPTS = "flink.restart.attempts";
+ private static final String RESTART_DELAY = "flink.restart.delay";
+ private static final String STATE_BACKEND = "flink.state.backend";
+ private static final String CHECKPOINT_PATH = "flink.checkpoint.path";
+ private static final String EXECUTION_MODE = "flink.execution.mode";
+
+ private Config config;
+
+ public FlinkEngineConfig() {
+ this.config = new Config();
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config != null ? config : new Config();
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return new CheckResult(true, "");
+ }
+
+ public long getCheckpointInterval() {
+ return config.getLong(CHECKPOINT_INTERVAL, 10000L);
+ }
+
+ public int getParallelism() {
+ return config.getInt(PARALLELISM, 1);
+ }
+
+ public int getRestartAttempts() {
+ return config.getInt(RESTART_ATTEMPTS, 3);
+ }
+
+ public long getRestartDelay() {
+ return config.getLong(RESTART_DELAY, 10000L);
+ }
+
+ public String getStateBackend() {
+ return config.getString(STATE_BACKEND, "memory");
+ }
+
+ public String getCheckpointPath() {
+ return config.getString(CHECKPOINT_PATH, "");
+ }
+
+ public RuntimeExecutionMode getExecutionMode() {
+ String mode = config.getString(EXECUTION_MODE, "STREAMING");
+ return RuntimeExecutionMode.valueOf(mode.toUpperCase());
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java
new file mode 100644
index 00000000..c839d1a9
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.config;
+
+import io.datavines.common.config.EnvConfig;
+import io.datavines.common.config.SinkConfig;
+import io.datavines.common.config.SourceConfig;
+import io.datavines.common.config.enums.SinkType;
+import io.datavines.common.entity.job.BaseJobParameter;
+import io.datavines.common.exception.DataVinesException;
+import io.datavines.common.utils.StringUtils;
+import io.datavines.engine.config.MetricParserUtils;
+import io.datavines.metric.api.ExpectedValue;
+import io.datavines.spi.PluginLoader;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static io.datavines.common.ConfigConstants.*;
+
+public class FlinkSingleTableConfigurationBuilder extends BaseFlinkConfigurationBuilder {
+
+
+ @Override
+ public void buildEnvConfig() {
+ EnvConfig envConfig = new EnvConfig();
+ envConfig.setEngine("flink");
+ configuration.setEnvConfig(envConfig);
+ }
+
+ @Override
+ public void buildSinkConfigs() throws DataVinesException {
+ List sinkConfigs = new ArrayList<>();
+
+ List metricJobParameterList = jobExecutionParameter.getMetricParameterList();
+ if (CollectionUtils.isNotEmpty(metricJobParameterList)) {
+ for (BaseJobParameter parameter : metricJobParameterList) {
+ String metricUniqueKey = getMetricUniqueKey(parameter);
+ Map metricInputParameter = metric2InputParameter.get(metricUniqueKey);
+ if (metricInputParameter == null) {
+ continue;
+ }
+
+ // 确保必要的参数存在
+ if (!metricInputParameter.containsKey(METRIC_NAME) && parameter.getMetricType() != null) {
+ metricInputParameter.put(METRIC_NAME, parameter.getMetricType());
+ }
+
+ metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
+ String expectedType = "local_" + parameter.getExpectedType();
+ ExpectedValue expectedValue = PluginLoader
+ .getPluginLoader(ExpectedValue.class)
+ .getNewPlugin(expectedType);
+
+ // 只有在确保必要参数存在的情况下才生成 uniqueCode
+ if (metricInputParameter.containsKey(METRIC_NAME)) {
+ metricInputParameter.put(UNIQUE_CODE, StringUtils.wrapperSingleQuotes(MetricParserUtils.generateUniqueCode(metricInputParameter)));
+ }
+
+ // Get the actual value storage parameter
+ String actualValueSinkSql = FlinkSinkSqlBuilder.getActualValueSql()
+ .replace("${actual_value}", "${actual_value_" + metricUniqueKey + "}");
+ SinkConfig actualValueSinkConfig = getValidateResultDataSinkConfig(
+ expectedValue, actualValueSinkSql, "dv_actual_values", metricInputParameter);
+
+ if (actualValueSinkConfig != null) {
+ actualValueSinkConfig.setType(SinkType.ACTUAL_VALUE.getDescription());
+ sinkConfigs.add(actualValueSinkConfig);
+ }
+
+ String taskSinkSql = FlinkSinkSqlBuilder.getDefaultSinkSql()
+ .replace("${actual_value}", "${actual_value_" + metricUniqueKey + "}")
+ .replace("${expected_value}", "${expected_value_" + metricUniqueKey + "}");
+
+ // Get the task data storage parameter
+ SinkConfig taskResultSinkConfig = getValidateResultDataSinkConfig(
+ expectedValue, taskSinkSql, "dv_job_execution_result", metricInputParameter);
+ if (taskResultSinkConfig != null) {
+ taskResultSinkConfig.setType(SinkType.VALIDATE_RESULT.getDescription());
+ // 设置默认状态为未知(NONE)
+ taskResultSinkConfig.getConfig().put("default_state", "0");
+ // 添加其他必要参数
+ taskResultSinkConfig.getConfig().put("metric_type", "single_table");
+ taskResultSinkConfig.getConfig().put("metric_name", metricInputParameter.get(METRIC_NAME));
+ taskResultSinkConfig.getConfig().put("metric_dimension", metricInputParameter.get(METRIC_DIMENSION));
+ taskResultSinkConfig.getConfig().put("database_name", metricInputParameter.get(DATABASE));
+ taskResultSinkConfig.getConfig().put("table_name", metricInputParameter.get(TABLE));
+ taskResultSinkConfig.getConfig().put("column_name", metricInputParameter.get(COLUMN));
+ taskResultSinkConfig.getConfig().put("expected_type", metricInputParameter.get(EXPECTED_TYPE));
+ taskResultSinkConfig.getConfig().put("result_formula", metricInputParameter.get(RESULT_FORMULA));
+ sinkConfigs.add(taskResultSinkConfig);
+ }
+
+ // Get the error data storage parameter if needed
+ if (StringUtils.isNotEmpty(jobExecutionInfo.getErrorDataStorageType())
+ && StringUtils.isNotEmpty(jobExecutionInfo.getErrorDataStorageParameter())) {
+ SinkConfig errorDataSinkConfig = getErrorSinkConfig(metricInputParameter);
+ if (errorDataSinkConfig != null) {
+ errorDataSinkConfig.setType(SinkType.ERROR_DATA.getDescription());
+ sinkConfigs.add(errorDataSinkConfig);
+ }
+ }
+ }
+ }
+
+ configuration.setSinkParameters(sinkConfigs);
+ }
+
+ @Override
+ public void buildTransformConfigs() {
+ // No transform configs needed for single table configuration
+ }
+
+ @Override
+ public void buildSourceConfigs() throws DataVinesException {
+ List sourceConfigs = getSourceConfigs();
+ configuration.setSourceParameters(sourceConfigs);
+ }
+
+ @Override
+ public void buildName() {
+ // Use default name from base implementation
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java
new file mode 100644
index 00000000..568dad4b
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.config;
+
+public class FlinkSinkSqlBuilder {
+
+ private FlinkSinkSqlBuilder() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static String getActualValueSql() {
+ return "select\n" +
+ " '${job_execution_id}' as job_execution_id,\n" +
+ " '${metric_unique_key}' as metric_unique_key,\n" +
+ " '${unique_code}' as unique_code,\n" +
+ " ${actual_value} as actual_value,\n" +
+ " cast(null as string) as expected_value,\n" +
+ " cast(null as string) as operator,\n" +
+ " cast(null as string) as threshold,\n" +
+ " cast(null as string) as check_type,\n" +
+ " CURRENT_TIMESTAMP as create_time,\n" +
+ " CURRENT_TIMESTAMP as update_time\n" +
+ "from ${table_name}";
+ }
+
+ public static String getDefaultSinkSql() {
+ return "select\n" +
+ " '${job_execution_id}' as job_execution_id,\n" +
+ " '${metric_unique_key}' as metric_unique_key,\n" +
+ " '${unique_code}' as unique_code,\n" +
+ " CASE WHEN ${actual_value} IS NULL THEN NULL ELSE ${actual_value} END as actual_value,\n" +
+ " CASE WHEN ${expected_value} IS NULL THEN NULL ELSE ${expected_value} END as expected_value,\n" +
+ " '${metric_type}' as metric_type,\n" +
+ " '${metric_name}' as metric_name,\n" +
+ " '${metric_dimension}' as metric_dimension,\n" +
+ " '${database_name}' as database_name,\n" +
+ " '${table_name}' as table_name,\n" +
+ " '${column_name}' as column_name,\n" +
+ " '${operator}' as operator,\n" +
+ " '${threshold}' as threshold,\n" +
+ " '${expected_type}' as expected_type,\n" +
+ " '${result_formula}' as result_formula,\n" +
+ " CASE WHEN ${actual_value} IS NULL THEN '${default_state}' ELSE NULL END as state,\n" +
+ " CURRENT_TIMESTAMP as create_time,\n" +
+ " CURRENT_TIMESTAMP as update_time\n" +
+ "from ${table_name} full join ${expected_table}";
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/plugins/io.datavines.engine.api.engine.EngineExecutor b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/plugins/io.datavines.engine.api.engine.EngineExecutor
similarity index 100%
rename from datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/plugins/io.datavines.engine.api.engine.EngineExecutor
rename to datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/plugins/io.datavines.engine.api.engine.EngineExecutor
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder
similarity index 100%
rename from datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder
rename to datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/services/io.datavines.engine.config.JobConfigurationBuilder b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/services/io.datavines.engine.config.JobConfigurationBuilder
similarity index 100%
rename from datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/services/io.datavines.engine.config.JobConfigurationBuilder
rename to datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/resources/META-INF/services/io.datavines.engine.config.JobConfigurationBuilder
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/pom.xml
index 23464d5b..80c6ecb2 100644
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/pom.xml
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/pom.xml
@@ -35,6 +35,11 @@
datavines-engine-flink-api
${project.version}
+
+ io.datavines
+ datavines-engine-api
+ ${project.version}
+
io.datavines
datavines-engine-core
@@ -65,5 +70,15 @@
datavines-common
${project.version}
+
+ io.datavines
+ datavines-engine-flink-executor
+ ${project.version}
+
+
+ io.datavines
+ datavines-engine-flink-config
+ ${project.version}
+
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/FlinkEngine.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/FlinkEngine.java
new file mode 100644
index 00000000..fbbc1727
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/FlinkEngine.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink;
+
+import io.datavines.common.config.Config;
+import io.datavines.common.config.Configurations;
+import io.datavines.common.entity.JobExecutionRequest;
+import io.datavines.common.entity.ProcessResult;
+
+import org.slf4j.Logger;
+
+import io.datavines.common.config.CheckResult;
+import io.datavines.engine.api.engine.EngineExecutor;
+import io.datavines.engine.flink.config.FlinkEngineConfig;
+import io.datavines.engine.flink.executor.FlinkEngineExecutor;
+
+public class FlinkEngine implements EngineExecutor {
+
+ private FlinkEngineConfig flinkEngineConfig;
+ private FlinkEngineExecutor executor;
+
+ public FlinkEngine() {
+ this.flinkEngineConfig = new FlinkEngineConfig();
+ this.executor = new FlinkEngineExecutor();
+ }
+
+ public void init(JobExecutionRequest jobExecutionRequest, Logger logger, Configurations configurations) throws Exception {
+ executor.init(jobExecutionRequest, logger, configurations);
+ }
+
+ public void execute() throws Exception {
+ executor.execute();
+ }
+
+ public void after() throws Exception {
+ executor.after();
+ }
+
+ public void cancel() throws Exception {
+ executor.cancel();
+ }
+
+ public boolean isCancel() throws Exception {
+ return executor.isCancel();
+ }
+
+ public ProcessResult getProcessResult() {
+ return executor.getProcessResult();
+ }
+
+ public JobExecutionRequest getTaskRequest() {
+ return executor.getTaskRequest();
+ }
+
+ public String getName() {
+ return "flink";
+ }
+
+ public void setConfig(Config config) {
+ if (config != null) {
+ this.flinkEngineConfig.setConfig(config);
+ }
+ }
+
+ public Config getConfig() {
+ return this.flinkEngineConfig.getConfig();
+ }
+
+ public CheckResult checkConfig() {
+ return this.flinkEngineConfig.checkConfig();
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/services/io.datavines.engine.api.Engine b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/services/io.datavines.engine.api.Engine
new file mode 100644
index 00000000..0efce937
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/resources/META-INF/services/io.datavines.engine.api.Engine
@@ -0,0 +1 @@
+flink=io.datavines.engine.flink.FlinkEngine