Skip to content

Commit

Permalink
[Fix][Engine] Fix spark engine execute error (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Dec 4, 2023
1 parent 0b0e74b commit 622a70e
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class ConfigConstants {
public static final String INVALIDATE_ITEM_CAN_OUTPUT = "invalidate_item_can_output";
public static final String ERROR_DATA_OUTPUT_TO_DATASOURCE_DATABASE = "error_data_output_to_datasource_database";
public static final String DATABASE = "database";

public static final String DATABASE_NAME = "database_name";
public static final String TABLE_NAME = "table_name";
public static final String COLUMN_NAME = "column_name";
public static final String SCHEMA = "schema";
public static final String SCHEMA2 = "schema2";
public static final String USER = "user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ default String getFullQualifiedTableName(String database, String schema, String
return table;
}

default String getFullQualifiedTableNameForSpark(String database, String schema, String table) {
table = "`" + table + "`";

if (!StringUtils.isEmptyOrNullStr(schema)) {
table = "`" + schema + "`." + table ;
}

if (!StringUtils.isEmptyOrNullStr(database)) {
table = "`" + database + "`." + table;
}

return table;
}

default boolean invalidateItemCanOutput(){
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

metricInputParameter.put(DATABASE_NAME,metricInputParameter.get(DATABASE));
metricInputParameter.put(TABLE_NAME,metricInputParameter.get(TABLE));
metricInputParameter.put(COLUMN_NAME,metricInputParameter.get(COLUMN));

String table = connectorFactory.getDialect()
.getFullQualifiedTableName(metricInputParameter.get(DATABASE),metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE));
connectorParameterMap.put(TABLE, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,16 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
metricInputParameter.put(SCHEMA, (String)connectorParameter.getParameters().get(SCHEMA));
}

metricInputParameter.put(DATABASE_NAME,metricInputParameter.get(DATABASE));
metricInputParameter.put(TABLE_NAME,metricInputParameter.get(TABLE));
metricInputParameter.put(COLUMN_NAME,metricInputParameter.get(COLUMN));

ConnectorFactory connectorFactory = PluginLoader
.getPluginLoader(ConnectorFactory.class)
.getNewPlugin(connectorParameter.getType());
String table = connectorFactory.getDialect()
.getFullQualifiedTableName(metricInputParameter.get(DATABASE),metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE));
.getFullQualifiedTableNameForSpark(metricInputParameter.get(DATABASE),
metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE));

connectorParameterMap.put(TABLE, table);
connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE));
Expand Down Expand Up @@ -126,7 +131,7 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
ConnectorFactory connectorFactory = PluginLoader
.getPluginLoader(ConnectorFactory.class)
.getNewPlugin(connectorParameter2.getType());
String table = connectorFactory.getDialect().getFullQualifiedTableName(metricInputParameter.get(DATABASE2),
String table = connectorFactory.getDialect().getFullQualifiedTableNameForSpark(metricInputParameter.get(DATABASE2),
metricInputParameter.get(SCHEMA2),
metricInputParameter.get(TABLE2));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@

import io.datavines.engine.core.BaseDataVinesBootstrap;

import java.util.Base64;

public class SparkDataVinesBootstrap extends BaseDataVinesBootstrap {

public static void main(String[] args) {
SparkDataVinesBootstrap bootstrap = new SparkDataVinesBootstrap();
bootstrap.execute(args);
if (args.length == 1) {
String arg = args[0];
args[0] = new String(Base64.getDecoder().decode(arg));
bootstrap.execute(args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Base64;
import java.util.Objects;

import io.datavines.common.entity.JobExecutionRequest;
import io.datavines.common.utils.FileUtils;
Expand Down Expand Up @@ -127,7 +129,7 @@ protected String buildCommand() {
JSONUtils.parseObject(jobExecutionRequest.getApplicationParameter(), DataVinesJobConfig.class);

sparkParameters.setMainArgs("\""
+ StringUtils.replaceDoubleBrackets(StringUtils.escapeJava(JSONUtils.toJsonString(configuration))) + "\"");
+ Base64.getEncoder().encodeToString(Objects.requireNonNull(JSONUtils.toJsonString(configuration)).getBytes()) + "\"");

sparkParameters.setMainClass("io.datavines.engine.spark.core.SparkDataVinesBootstrap");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class MetricConstants {
RESULT_COLUMN_LIST.add(new ColumnInfo("metric_type",false));
RESULT_COLUMN_LIST.add(new ColumnInfo("metric_name",false));
RESULT_COLUMN_LIST.add(new ColumnInfo("metric_dimension",false));
RESULT_COLUMN_LIST.add(new ColumnInfo("database_name",true,"database"));
RESULT_COLUMN_LIST.add(new ColumnInfo("table_name",true,"table"));
RESULT_COLUMN_LIST.add(new ColumnInfo("column_name",true,"column"));
RESULT_COLUMN_LIST.add(new ColumnInfo("database_name",true));
RESULT_COLUMN_LIST.add(new ColumnInfo("table_name",true));
RESULT_COLUMN_LIST.add(new ColumnInfo("column_name",true));
RESULT_COLUMN_LIST.add(new ColumnInfo("actual_value",false));
RESULT_COLUMN_LIST.add(new ColumnInfo("expected_value",false));
RESULT_COLUMN_LIST.add(new ColumnInfo("expected_type",false));
Expand Down

0 comments on commit 622a70e

Please sign in to comment.