From 0bdc784d7b3c080e4027ca37ba27c58e7594f002 Mon Sep 17 00:00:00 2001 From: winghv Date: Mon, 30 Oct 2023 21:04:54 +0800 Subject: [PATCH 1/3] [Feature][Engine]data profile support spark/livy mode --- .../config/SparkDataProfileMetricBuilder.java | 55 +++++++++++++++++++ .../spark/config/SparkSinkSqlBuilder.java | 16 ++++++ ...ines.engine.config.JobConfigurationBuilder | 2 + .../CatalogEntityInstanceServiceImpl.java | 11 ++++ 4 files changed, 84 insertions(+) create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkDataProfileMetricBuilder.java diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkDataProfileMetricBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkDataProfileMetricBuilder.java new file mode 100644 index 000000000..183d97e66 --- /dev/null +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkDataProfileMetricBuilder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.spark.config;
+
+import io.datavines.common.config.SinkConfig;
+import io.datavines.common.config.enums.SinkType;
+import io.datavines.common.entity.job.BaseJobParameter;
+import io.datavines.common.exception.DataVinesException;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static io.datavines.common.ConfigConstants.METRIC_UNIQUE_KEY;
+
+/** Data profile job builder, registered as the spark_data_profile and livy_data_profile plugins. */
+public class SparkDataProfileMetricBuilder extends BaseSparkConfigurationBuilder {
+
+    /** Builds one {@code SinkType.PROFILE_VALUE} sink config per metric parameter of the job. */
+    @Override
+    public void buildSinkConfigs() throws DataVinesException {
+        List<SinkConfig> sinkConfigs = new ArrayList<>();
+        List<BaseJobParameter> metricJobParameterList = jobExecutionParameter.getMetricParameterList();
+        if (CollectionUtils.isNotEmpty(metricJobParameterList)) {
+            for (BaseJobParameter parameter : metricJobParameterList) {
+                String metricUniqueKey = getMetricUniqueKey(parameter);
+                Map<String, String> metricInputParameter = metric2InputParameter.get(metricUniqueKey);
+                metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
+                // The template's ${actual_value} becomes the metric-specific actual_value_<key>.
+                String profileSinkSql = SparkSinkSqlBuilder.getProfileValueSql().replace("${actual_value}",
+                        "actual_value_" + metricUniqueKey);
+                SinkConfig actualValueSinkConfig = getValidateResultDataSinkConfig(
+                        null, profileSinkSql, "dv_catalog_entity_profile", metricInputParameter);
+                actualValueSinkConfig.setType(SinkType.PROFILE_VALUE.getDescription());
+                sinkConfigs.add(actualValueSinkConfig);
+            }
+        }
+        configuration.setSinkParameters(sinkConfigs);
+    }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkSinkSqlBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkSinkSqlBuilder.java index 
fb6f9e24c..a9322c8c7 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkSinkSqlBuilder.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/java/io/datavines/engine/spark/config/SparkSinkSqlBuilder.java
@@ -65,4 +65,20 @@ public static String getMultiTableComparisonSinkSql() {
                 + " join ( ${expected_execute_sql} ) tmp2";
     }
 
+    /**
+     * Builds the profile sink select: one {@code <value> as <column>} per PROFILE_COLUMN_LIST entry.
+     */
+    public static String getProfileValueSql() {
+        List<String> projections = new ArrayList<>(MetricConstants.PROFILE_COLUMN_LIST.size());
+        for (ColumnInfo column : MetricConstants.PROFILE_COLUMN_LIST) {
+            // Each value is a ${parameter} placeholder, wrapped in single quotes when the column asks for it.
+            String placeholder = "${" + column.getParameterName() + "}";
+            if (column.isNeedSingleQuotation()) {
+                placeholder = StringUtils.wrapperSingleQuotes(placeholder);
+            }
+            projections.add(placeholder + " as " + column.getName());
+        }
+
+        return "select " + String.join(", ", projections) + " from ${actual_table}";
+    }
 }
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder index 8d57e852f..ebd3b0f5e 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-spark/datavines-engine-spark-config/src/main/resources/META-INF/plugins/io.datavines.engine.config.JobConfigurationBuilder @@ -1,6 +1,8 @@ spark_single_table=io.datavines.engine.spark.config.SparkSingleTableMetricBuilder 
+spark_data_profile=io.datavines.engine.spark.config.SparkDataProfileMetricBuilder spark_multi_table_accuracy=io.datavines.engine.spark.config.SparkMultiTableAccuracyMetricBuilder spark_multi_table_value_comparison=io.datavines.engine.spark.config.SparkMultiTableValueComparisonMetricBuilder livy_single_table=io.datavines.engine.spark.config.SparkSingleTableMetricBuilder +livy_data_profile=io.datavines.engine.spark.config.SparkDataProfileMetricBuilder livy_multi_table_accuracy=io.datavines.engine.spark.config.SparkMultiTableAccuracyMetricBuilder livy_multi_table_value_comparison=io.datavines.engine.spark.config.SparkMultiTableValueComparisonMetricBuilder \ No newline at end of file diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java index 08c26ae5c..cf4e4941f 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java @@ -25,6 +25,7 @@ import io.datavines.common.enums.DataVinesDataType; import io.datavines.common.enums.EntityRelType; import io.datavines.common.enums.JobType; +import io.datavines.common.utils.CommonPropertyUtils; import io.datavines.common.utils.DateUtils; import io.datavines.common.utils.JSONUtils; import io.datavines.common.utils.StringUtils; @@ -926,6 +927,16 @@ public long executeDataProfileJob(RunProfileRequest runProfileRequest, int runni createOrUpdate.setTableName(tableName); createOrUpdate.setSelectedColumn(String.join(",", columns)); createOrUpdate.setRunningNow(runningNow); + String livyEngineParameter = CommonPropertyUtils.getString("profile.engine.parameter.livy"); + if (StringUtils.isNotEmpty(livyEngineParameter)) { + createOrUpdate.setEngineType("livy"); + 
createOrUpdate.setEngineParameter(livyEngineParameter); + } + String sparkEngineParameter = CommonPropertyUtils.getString("profile.engine.parameter.spark"); + if (StringUtils.isNotEmpty(sparkEngineParameter)) { + createOrUpdate.setEngineType("spark"); + createOrUpdate.setEngineParameter(sparkEngineParameter); + } long jobId = jobService.createOrUpdateDataProfileJob(createOrUpdate); if (jobId != -1L) { From d91aeeded974cc8ae18215a1cbd59af5eea60107 Mon Sep 17 00:00:00 2001 From: winghv Date: Tue, 31 Oct 2023 19:47:13 +0800 Subject: [PATCH 2/3] [Feature][Engine]data profile execute engine param config --- .../common/entity/SparkEngineParameter.java | 68 +++++++++++++++++++ .../CatalogEntityInstanceServiceImpl.java | 47 ++++++++++--- scripts/sql/datavines-mysql.sql | 8 +++ 3 files changed, 113 insertions(+), 10 deletions(-) create mode 100644 datavines-common/src/main/java/io/datavines/common/entity/SparkEngineParameter.java diff --git a/datavines-common/src/main/java/io/datavines/common/entity/SparkEngineParameter.java b/datavines-common/src/main/java/io/datavines/common/entity/SparkEngineParameter.java new file mode 100644 index 000000000..475a52b1a --- /dev/null +++ b/datavines-common/src/main/java/io/datavines/common/entity/SparkEngineParameter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.common.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Resource and deployment settings of a job that runs on a Spark-based engine
+ * (the {@code spark} and {@code livy} engine types).
+ *
+ * <p>The option names quoted in the field comments (driver-cores, driver-memory,
+ * num-executors, executor-cores, executor-memory) are the matching spark-submit options.
+ *
+ * <p>Lombok generates the accessors and two constructors:
+ * <ul>
+ *   <li>{@code @AllArgsConstructor}: parameters follow the field declaration order, so
+ *       reordering the fields changes the meaning of positional call sites.</li>
+ *   <li>{@code @NoArgsConstructor}: states the no-args constructor explicitly.
+ *       {@code @RequiredArgsConstructor} would only degenerate to one while no field is
+ *       {@code final} or {@code @NonNull}.</li>
+ * </ul>
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class SparkEngineParameter {
+
+    /** program type */
+    private String programType;
+
+    /** deploy mode */
+    private String deployMode;
+
+    /** driver-cores: number of cores used by the driver, only in cluster mode */
+    private int driverCores;
+
+    /** driver-memory: memory for the driver */
+    private String driverMemory;
+
+    /** num-executors: number of executors to launch */
+    private int numExecutors;
+
+    /** executor-cores: number of cores per executor */
+    private int executorCores;
+
+    /** executor-memory: memory per executor */
+    private String executorMemory;
+
+    /** other arguments */
+    private String others;
+
+}
diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java index cf4e4941f..bd751d698 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java @@ -21,6 +21,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.datavines.common.datasource.jdbc.entity.ColumnInfo; +import io.datavines.common.entity.SparkEngineParameter; import io.datavines.common.entity.job.BaseJobParameter; import io.datavines.common.enums.DataVinesDataType; import io.datavines.common.enums.EntityRelType; @@ -927,16 +928,8 @@ public long executeDataProfileJob(RunProfileRequest runProfileRequest, 
int runni createOrUpdate.setTableName(tableName); createOrUpdate.setSelectedColumn(String.join(",", columns)); createOrUpdate.setRunningNow(runningNow); - String livyEngineParameter = CommonPropertyUtils.getString("profile.engine.parameter.livy"); - if (StringUtils.isNotEmpty(livyEngineParameter)) { - createOrUpdate.setEngineType("livy"); - createOrUpdate.setEngineParameter(livyEngineParameter); - } - String sparkEngineParameter = CommonPropertyUtils.getString("profile.engine.parameter.spark"); - if (StringUtils.isNotEmpty(sparkEngineParameter)) { - createOrUpdate.setEngineType("spark"); - createOrUpdate.setEngineParameter(sparkEngineParameter); - } + engineParameter(createOrUpdate); + long jobId = jobService.createOrUpdateDataProfileJob(createOrUpdate); if (jobId != -1L) {
@@ -965,6 +958,40 @@ public long executeDataProfileJob(RunProfileRequest runProfileRequest, int runni
         return jobId;
     }
 
+    /**
+     * Configures the execution engine of a data profile job from {@code profile.execute.engine}.
+     * For "livy" or "spark" (case-insensitive) the engine type and a JSON-serialized
+     * {@link SparkEngineParameter}, read from {@code <engine>.engine.parameter.*}, are set on the
+     * request; for any other value the request is left untouched.
+     *
+     * @param createOrUpdate the profile job request to update in place
+     */
+    private void engineParameter(DataProfileJobCreateOrUpdate createOrUpdate) {
+        String profileEngine = CommonPropertyUtils.getString("profile.execute.engine");
+        String engineType;
+        if ("livy".equalsIgnoreCase(profileEngine)) {
+            engineType = "livy";
+        } else if ("spark".equalsIgnoreCase(profileEngine)) {
+            engineType = "spark";
+        } else {
+            return;
+        }
+
+        // livy and spark use the same option names and defaults; only the property prefix differs.
+        String prefix = engineType + ".engine.parameter.";
+        String deployMode = CommonPropertyUtils.getString(prefix + "deploy.mode", "cluster");
+        int numExecutors = CommonPropertyUtils.getInt(prefix + "num.executors", 1);
+        int driverCores = CommonPropertyUtils.getInt(prefix + "driver.cores", 1);
+        String driverMemory = CommonPropertyUtils.getString(prefix + "driver.memory", "512M");
+        int executorCores = CommonPropertyUtils.getInt(prefix + "executor.cores", 1);
+        String executorMemory = CommonPropertyUtils.getString(prefix + "executor.memory", "512M");
+        String others = CommonPropertyUtils.getString(prefix + "others");
+        SparkEngineParameter engineParameter = new SparkEngineParameter("JAVA", deployMode, driverCores,
+                driverMemory, numExecutors, executorCores, executorMemory, others);
+        createOrUpdate.setEngineType(engineType);
+        createOrUpdate.setEngineParameter(JSONUtils.toJsonString(engineParameter));
+    }
+
     @Override
     public List listTableRecords(String uuid, String starTime, String endTime) {
         return catalogEntityProfileService.listTableRecords(uuid, starTime, endTime);
diff --git a/scripts/sql/datavines-mysql.sql b/scripts/sql/datavines-mysql.sql index 2806b6567..2eced5b4c 100644 --- a/scripts/sql/datavines-mysql.sql +++ b/scripts/sql/datavines-mysql.sql @@ -827,6 +827,14 @@ INSERT INTO `dv_config` VALUES ('22', '-1', 'livy.task.jars', CONCAT('datavines- 'datavines-engine-api-1.0.0-SNAPSHOT.jar,mysql-connector-java-8.0.16.jar,httpclient-4.4.1.jar,' 'httpcore-4.4.1.jar,postgresql-42.2.6.jar,presto-jdbc-0.283.jar,trino-jdbc-407.jar,clickhouse-jdbc-0.1.53.jar'), '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('23', '-1', 'profile.execute.engine', 'local', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` 
VALUES ('24', '-1', 'spark.engine.parameter.deploy.mode', 'cluster', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('25', '-1', 'spark.engine.parameter.num.executors', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('26', '-1', 'spark.engine.parameter.driver.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('27', '-1', 'spark.engine.parameter.driver.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('28', '-1', 'spark.engine.parameter.executor.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('29', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO `dv_config` VALUES ('30', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); INSERT INTO `dv_user` (`id`, `username`, `password`, `email`, `phone`, `admin`) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0'); INSERT INTO `dv_workspace` (`id`, `name`, `create_by`, `update_by`) VALUES ('1', "admin\'s default", '1', '1'); From e2bbd8969750a4c4a34ae7328b0c37d735405c7e Mon Sep 17 00:00:00 2001 From: winghv Date: Wed, 1 Nov 2023 19:32:40 +0800 Subject: [PATCH 3/3] [Feature][Engine]data profile execute engine param del duplicate config --- scripts/sql/datavines-mysql.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/sql/datavines-mysql.sql b/scripts/sql/datavines-mysql.sql index 2eced5b4c..ec93c2290 100644 --- a/scripts/sql/datavines-mysql.sql +++ b/scripts/sql/datavines-mysql.sql @@ -834,7 +834,6 @@ INSERT INTO `dv_config` VALUES ('26', '-1', 'spark.engine.parameter.driver.cores INSERT INTO `dv_config` VALUES ('27', '-1', 
'spark.engine.parameter.driver.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); INSERT INTO `dv_config` VALUES ('28', '-1', 'spark.engine.parameter.executor.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); INSERT INTO `dv_config` VALUES ('29', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); -INSERT INTO `dv_config` VALUES ('30', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); INSERT INTO `dv_user` (`id`, `username`, `password`, `email`, `phone`, `admin`) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0'); INSERT INTO `dv_workspace` (`id`, `name`, `create_by`, `update_by`) VALUES ('1', "admin\'s default", '1', '1');