Skip to content

Commit

Permalink
[Refactor][Connector] Refactor connector config (#484)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Dec 7, 2024
1 parent 2955262 commit 9f40d8c
Show file tree
Hide file tree
Showing 71 changed files with 733 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ConfigConstants {
public static final String CONNECTOR_TYPE = "connector_type";
public static final String DATASOURCE_ID = "datasource_id";
public static final String TABLE = "table";
public static final String VIEW_SQL = "view_sql";
public static final String TABLE_ALIAS = "table_alias";
public static final String TABLE2_ALIAS = "table2_alias";
public static final String TABLE_ALIAS_COLUMNS = "table_alias_columns";
Expand All @@ -37,6 +38,7 @@ public class ConfigConstants {
public static final String ACTUAL_NAME = "actual_name";
public static final String ACTUAL_EXECUTE_SQL = "actual_execute_sql";
public static final String ACTUAL_AGGREGATE_SQL = "actual_aggregate_sql";
public static final String ACTUAL_CUSTOM_SQL = "actual_custom_sql";
public static final String EXPECTED_NAME = "expected_name";
public static final String EXPECTED_TYPE = "expected_type";
public static final String EXPECTED_TABLE = "expected_table";
Expand Down Expand Up @@ -74,9 +76,12 @@ public class ConfigConstants {
public static final String ERROR_DATA_FILE_NAME = "error_data_file_name";
public static final String VALIDATE_RESULT_DATA_DIR = "validate_result_data_dir";
public static final String INVALIDATE_ITEM_CAN_OUTPUT = "invalidate_item_can_output";

public static final String ERROR_DATA_STORAGE_ID = "error_data_storage_id";
public static final String ERROR_DATA_OUTPUT_TO_DATASOURCE_DATABASE = "error_data_output_to_datasource_database";
public static final String DATASOURCE = "datasource";
public static final String DATABASE = "database";
public static final String SID = "sid";
public static final String DATABASE_NAME = "database_name";
public static final String TABLE_NAME = "table_name";
public static final String COLUMN_NAME = "column_name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
public enum SourceType {

/**
* 0 normal
* 1 invalidate items
* 2 actual value
* 0 source
* 1 target
* 2 metadata
**/
SOURCE(0, "source"),
TARGET(1, "target"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
*/
package io.datavines.common.datasource.jdbc;

import io.datavines.common.utils.Md5Utils;
import io.datavines.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Map;

import static io.datavines.common.ConfigConstants.*;

/**
* data source base class
Expand All @@ -30,26 +34,26 @@ public abstract class BaseJdbcDataSourceInfo {

private static final Logger logger = LoggerFactory.getLogger(BaseJdbcDataSourceInfo.class);

protected final JdbcConnectionInfo jdbcConnectionInfo;
protected final Map<String,String> param;

public BaseJdbcDataSourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
this.jdbcConnectionInfo = jdbcConnectionInfo;
public BaseJdbcDataSourceInfo(Map<String,String> param) {
this.param = param;
}

public String getUser() {
return jdbcConnectionInfo.getUser();
return param.get(USER);
}

public String getPassword() {
return jdbcConnectionInfo.getPassword();
return param.get(PASSWORD);
}

public String getHost() {
return jdbcConnectionInfo.getHost();
return param.get(HOST);
}

public String getPort() {
return jdbcConnectionInfo.getPort();
return param.get(PORT);
}

public String getValidationQuery() {
Expand All @@ -59,19 +63,19 @@ public String getValidationQuery() {
public abstract String getAddress();

public String getCatalog() {
return jdbcConnectionInfo.getCatalog();
return param.get(CATALOG);
}

public String getDatabase() {
return jdbcConnectionInfo.getDatabase();
return param.get(DATABASE);
}

public String getSchema() {
return jdbcConnectionInfo.getSchema();
return param.get(SCHEMA);
}

public String getProperties() {
return jdbcConnectionInfo.getProperties();
return param.get(PROPERTIES);
}

/**
Expand Down Expand Up @@ -159,6 +163,21 @@ public void loadClass() {
}

public String getUniqueKey() {
return jdbcConnectionInfo.getUniqueKey();
return Md5Utils.getMd5(paramToString(), false);
}

public String paramToString() {
return getHost().trim() +
"&" + getPort() +
"&" + getOrEmpty(getCatalog()) +
"&" + getOrEmpty(getDatabase()) +
"&" + getOrEmpty(getUser()) +
"&" + getOrEmpty(getPassword()) +
"&" + getProperties();
}

private String getOrEmpty(String keyword) {
return StringUtils.isNotEmpty(keyword)? keyword.trim() : "";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package io.datavines.common.datasource.jdbc;

import java.util.Map;

public interface IJdbcDataSourceInfo {

BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo);
BaseJdbcDataSourceInfo getDatasourceInfo(Map<String,String> param);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

import lombok.Data;

import java.io.Serializable;

@Data
public class QueryColumn {
public class QueryColumn implements Serializable {

private static final long serialVersionUID = -2398995167525051291L;

private String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package io.datavines.common.enums;

import java.util.HashMap;

import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;

import java.util.HashMap;

/**
* running status for workflow and task nodes
*
Expand Down Expand Up @@ -55,7 +55,7 @@ public enum ExecutionStatus {
WAITING_THREAD(10, "waiting thread", "等待线程"),
WAITING_SUMMIT(11, "waiting_summit","待提交");

ExecutionStatus(int code, String description,String zhDescription){
ExecutionStatus(int code, String description, String zhDescription){
this.code = code;
this.description = description;
this.zhDescription = zhDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum JobType {

@EnumValue
private final int code;

private final String description;

private final String zhDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public enum OperatorType {
GTE(4,"gte",">="),
NE(5,"neq","!=");

OperatorType(int code, String description,String symbol) {
OperatorType(int code, String description, String symbol) {
this.code = code;
this.description = description;
this.symbol = symbol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class ParameterUtils {

private static final char PARAM_REPLACE_CHAR = '?';


private ParameterUtils() {
throw new UnsupportedOperationException("Construct ParameterUtils");
}
Expand Down Expand Up @@ -74,7 +73,6 @@ public static String convertParameterPlaceholders(String parameterString, Map<St
}

/**
* new
* convert parameters place holders
*
* @param parameterString parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public interface ConnectorFactory {
ConfigBuilder getConfigBuilder();

DataSourceClient getDataSourceClient();

StatementSplitter getStatementSplitter();

StatementParser getStatementParser();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api;

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.entity.ScriptMetadata;
import io.datavines.connector.api.entity.StatementMetadata;
import org.apache.commons.collections4.CollectionUtils;

import java.time.LocalDateTime;
import java.util.List;

public class LineageParser {

public static ScriptMetadata parseScript(String script, StatementSplitter statementSplitter, StatementParser statementParser) {
if (StringUtils.isEmpty(script)) {
return null;
}

ScriptMetadata scriptMetadata = new ScriptMetadata();
scriptMetadata.setScript(script);
List<String> statements = statementSplitter.splitStatements(script);

if (CollectionUtils.isEmpty(statements)) {
return null;
}

for (int i=0; i<statements.size(); i++) {
StatementMetadata statementMetadata = new StatementMetadata();
statementMetadata.setStatementIndex(i);
statementMetadata.setStatementText(statements.get(i));
statementMetadata.setStatementParseStartTime(LocalDateTime.now());
statementMetadata.setStatementMetadataFragment(statementParser.parseStatement(statements.get(i)));
statementMetadata.setStatementParseEndTime(LocalDateTime.now());
scriptMetadata.addStatementMetadata(statementMetadata);
}

return scriptMetadata;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api;

import io.datavines.connector.api.entity.StatementMetadataFragment;

public interface StatementParser {

StatementMetadataFragment parseStatement(String statement);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api;

import java.util.List;

public interface StatementSplitter {

List<String> splitStatements(String body);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api.entity;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;

@Data
@AllArgsConstructor
public class ColumnLineage {

private List<String> inputColumns;

private List<String> outputColumns;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api.entity;

public class MetaDataConstants {

public final static String UNKNOWN_STATEMENT_TYPE = "UNKNOWN";
}
Loading

0 comments on commit 9f40d8c

Please sign in to comment.