Skip to content

Commit

Permalink
[Fix-2628] [core] Fix set config is not effective (DataLinkDC#2629)
Browse files Browse the repository at this point in the history
Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Dec 12, 2023
1 parent 4aa5794 commit 4093fc7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.trans.ddl.CustomSetOperation;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LineageContext;

Expand Down Expand Up @@ -106,6 +107,15 @@ public boolean parseAndLoadConfiguration(String statement, Map<String, Object> s
} else if (operation instanceof ResetOperation) {
callReset((ResetOperation) operation, getStreamExecutionEnvironment(), setMap);
return true;
} else if (operation instanceof CustomSetOperation) {
CustomSetOperation customSetOperation = (CustomSetOperation) operation;
if (customSetOperation.isValid()) {
callSet(
new SetOperation(customSetOperation.getKey(), customSetOperation.getValue()),
getStreamExecutionEnvironment(),
setMap);
}
return true;
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,36 @@
*
* @since 2021/10/21 19:56
*/
public class SetOperation extends AbstractOperation implements ExtendOperation {
public class CustomSetOperation extends AbstractOperation implements ExtendOperation {

public SetOperation() {}
private String key;
private String value;

public SetOperation(String statement) {
public CustomSetOperation() {}

public CustomSetOperation(String statement) {
super(statement);
parseConfig();
}

public String getKey() {
return key;
}

public String getValue() {
return value;
}

public boolean isValid() {
return Asserts.isAllNotNullString(key, value);
}

private void parseConfig() {
Map<String, List<String>> map = SetSqlParseStrategy.getInfo(statement);
if (Asserts.isNotNullMap(map) && map.size() == 2) {
key = StringUtils.join(map.get("SET"), ".");
value = StringUtils.join(map.get("="), ",");
}
}

@Override
Expand All @@ -58,10 +82,9 @@ public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
} catch (ClassNotFoundException e) {
logger.error("Class not found: org.apache.log4j.Logger");
}
Map<String, List<String>> map = SetSqlParseStrategy.getInfo(statement);
if (Asserts.isNotNullMap(map) && map.size() == 2) {
if (Asserts.isAllNotNullString(key, value)) {
Map<String, String> confMap = new HashMap<>();
confMap.put(StringUtils.join(map.get("SET"), "."), StringUtils.join(map.get("="), ","));
confMap.put(key, value);
TableConfig config = tEnv.getConfig();
config.addConfiguration(Configuration.fromMap(confMap));
Configuration configuration = Configuration.fromMap(confMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.dinky.trans.parse;

import org.dinky.parser.SqlSegment;
import org.dinky.trans.ddl.SetOperation;
import org.dinky.trans.ddl.CustomSetOperation;
import org.dinky.utils.SqlSegmentUtil;

import org.apache.flink.table.operations.Operation;
Expand Down Expand Up @@ -51,13 +51,13 @@ public static Map<String, List<String>> getInfo(String statement) {
// SET(\s+(\S+)\s*=(.*))?
List<SqlSegment> segments = new ArrayList<>();
segments.add(new SqlSegment("(set)\\s+(.+)(\\s*=)", "[.]"));
segments.add(new SqlSegment("(=)\\s*(.*)( ENDOFSQL)", ","));
segments.add(new SqlSegment("(=)\\s*(.*)($)", ","));
return SqlSegmentUtil.splitSql2Segment(segments, statement);
}

@Override
public Operation convert(String statement) {
return new SetOperation(statement);
return new CustomSetOperation(statement);
}

@Override
Expand Down

0 comments on commit 4093fc7

Please sign in to comment.