Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev-1.1.7-webank-dcn' into dev-1…
Browse files Browse the repository at this point in the history
….1.7-webank
  • Loading branch information
jefftlin committed Nov 13, 2024
2 parents c3ab47d + 8ffa8af commit cf926a2
Show file tree
Hide file tree
Showing 28 changed files with 467 additions and 102 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.webank.wedatasphere.exchangis.job.server.utils;
package com.webank.wedatasphere.exchangis.common.util.json;

import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import org.apache.commons.lang.StringUtils;

import java.util.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.webank.wedatasphere.exchangis.dao.domain.ExchangisJobParamConfig;
import com.webank.wedatasphere.exchangis.dao.hook.MapperHook;
import com.webank.wedatasphere.exchangis.datasource.core.splitter.DataSourceSplitKey;
import com.webank.wedatasphere.exchangis.datasource.core.splitter.DataSourceSplitStrategy;
import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
import org.apache.linkis.datasource.client.impl.LinkisMetaDataRemoteClient;
Expand Down Expand Up @@ -72,8 +73,8 @@ default String splitStrategyName(){
* Split keys
* @return array
*/
default String[] splitKeys(){
return new String[]{};
default DataSourceSplitKey splitKey(){
return null;
}
/**
* Parameter config in
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package com.webank.wedatasphere.exchangis.datasource.core.splitter;

import java.util.List;
import java.util.Map;

import com.webank.wedatasphere.exchangis.common.util.json.JsonEntity;
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import org.apache.commons.lang.StringUtils;

import java.util.*;
import java.util.function.BiConsumer;

/**
* Split strategy: field split
Expand All @@ -13,8 +18,73 @@ public String name() {
}

@Override
public List<Map<String, Object>> getSplitValues(Map<String, Object> dataSourceParams, String[] splitKeys) {
public List<Map<String, Object>> getSplitValues(Map<String, Object> dataSourceParams,
DataSourceSplitKey splitKey) {
if (Objects.nonNull(splitKey) && Objects.nonNull(dataSourceParams)){
List<Map<String, Object>> result = new ArrayList<>();
doSplit(dataSourceParams, splitKey, (parent, prefix, splitPart) -> {
// splitPart.forEach((partKey, partValue) -> parent.set(prefix + partKey, partValue));
result.add(splitPart);
});
return result;
}
return Collections.singletonList(dataSourceParams);
}

return null;
@SuppressWarnings("unchecked")
private void doSplit(Map<String, Object> dataSourceParams,
DataSourceSplitKey splitKey, SplitPartConsumer splitPartConsumer){
JsonEntity configuration = JsonEntity.from(dataSourceParams);
List<String> keyPaths = JsonEntity.searchKeyPaths(configuration, "", splitKey.getSplitKey(),
StringUtils.splitPreserveAllTokens(splitKey.getSplitKey(), JsonEntity.SPLIT_CHAR).length);
if (!keyPaths.isEmpty()){
String keyPath = keyPaths.get(0);
String prefix = keyPath.substring(0, Math.max(keyPath.lastIndexOf(JsonEntity.SPLIT_CHAR), -1) + 1);
Object splitValues = configuration.get(keyPath);
if (List.class.isAssignableFrom(splitValues.getClass())){
// Empty the split node
configuration.set(splitKey.getSplitKey(), null);
((List<?>) splitValues).forEach(splitValue -> {
Map<String, Object> splitPart = null;
try {
splitPart = (Map<String, Object>)splitValue;
} catch (Exception e){
// Ignore the class cast exception
}
if (Objects.nonNull(splitPart)) {
splitPartConsumer.accept(configuration.getConfiguration(""), prefix, splitPart);
}
});

}
}
}

@FunctionalInterface
private interface SplitPartConsumer{
/**
* Accept
* @param parent parent node
* @param prefix prefix
* @param splitPart split part
*/
void accept(JsonEntity parent, String prefix, Map<String, Object> splitPart);
}
public static void main(String[] args) {
String json = "{\"conn_ins\":[{\"hello\":\"ok\", \"world\":\"right\"},{\"hello1\":\"ok\", \"world\":\"right\"}]}";
Map<String, Object> map = Json.fromJson(json, Map.class);
JsonEntity entity = JsonEntity.from(map);
List<String> paths = JsonEntity.searchKeyPaths(entity, "", "world");
for(String path : paths){
entity.set(path, "********");
}
System.out.println(entity.toJson());
// DataSourceSplitStrategy splitStrategy = new DataSourceFieldSplitStrategy();
// splitStrategy
// .getSplitValues(map,
// new DataSourceSplitKey("conn_ins", new String[]{})).forEach(v -> {
// System.out.println(Json.toJson(v, null));
// });
// System.out.println(Json.toJson(map, null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.webank.wedatasphere.exchangis.datasource.core.splitter;

/**
* Split key for data source
*/
public class DataSourceSplitKey {
/**
* Split key
*/
private final String splitKey;

/**
* Split values key
*/
private final String[] splitValueKeys;

public DataSourceSplitKey(String splitKey, String[] splitValueKeys){
this.splitKey = splitKey;
this.splitValueKeys = splitValueKeys;
}

public String getSplitKey() {
return splitKey;
}

public String[] getSplitValueKeys() {
return splitValueKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ public interface DataSourceSplitStrategy {
* @param dataSourceParams data source params
* @return value list
*/
List<Map<String, Object>> getSplitValues(Map<String, Object> dataSourceParams, String[] splitKeys);
List<Map<String, Object>> getSplitValues(Map<String, Object> dataSourceParams, DataSourceSplitKey splitKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,14 @@ public void testConnectByVo(String operator, DataSourceCreateVo vo)throws Exchan
Long dsModelId = vo.getDsModelId();
if (Objects.nonNull(dsModelId)) {
DataSourceModel dataSourceModel = modelMapper.selectOne(dsModelId);
// Merge eith dsModel keyDefine
DataSourceModelTypeKeyQuery dsModelTypeKeyQuery = new DataSourceModelTypeKeyQuery();
dsModelTypeKeyQuery.setDsTypeId(vo.getDataSourceTypeId());
List<DataSourceModelTypeKey> dsModelTypeKeys = dataSourceModelTypeKeyMapper.queryList(dsModelTypeKeyQuery);
Map<String, Object> dsConnectParams = dsKeyDefineUtil.mergeModelParamsIntoDs(vo.getConnectParams(), dataSourceModel.resolveParams(), dsModelTypeKeys);
vo.setConnectParams(dsConnectParams);
if (Objects.nonNull(dataSourceModel)) {
// Merge eith dsModel keyDefine
DataSourceModelTypeKeyQuery dsModelTypeKeyQuery = new DataSourceModelTypeKeyQuery();
dsModelTypeKeyQuery.setDsTypeId(vo.getDataSourceTypeId());
List<DataSourceModelTypeKey> dsModelTypeKeys = dataSourceModelTypeKeyMapper.queryList(dsModelTypeKeyQuery);
Map<String, Object> dsConnectParams = dsKeyDefineUtil.mergeModelParamsIntoDs(vo.getConnectParams(), dataSourceModel.resolveParams(), dsModelTypeKeys);
vo.setConnectParams(dsConnectParams);
}
}
Map<String, Object> payLoads = Json.fromJson(Json.toJson(vo, null), Map.class);
Optional.ofNullable(payLoads).ifPresent(pay -> pay.put("labels", pay.get("label")));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.webank.wedatasphere.exchangis.job.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.webank.wedatasphere.exchangis.common.util.json.Json;
import com.webank.wedatasphere.exchangis.job.vo.ExchangisJobVo;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
* Contain the content and parameters
Expand All @@ -21,6 +27,11 @@ public class ExchangisJobInfo extends GenericExchangisJob {
*/
protected String jobParams;

/**
* Job parameter map
*/
@JsonIgnore
protected Map<String, String> jobParamsMap;
/**
* Job description
*/
Expand Down Expand Up @@ -72,6 +83,26 @@ public String getJobParams() {

public void setJobParams(String jobParams) {
this.jobParams = jobParams;
this.jobParamsMap = null;
}

public Map<String, String> getJobParamsMap() {
if (null == this.jobParamsMap){
if (StringUtils.isNotBlank(this.jobParams)){
try {
this.jobParamsMap = Json.fromJson(this.jobParams, null);
return this.jobParamsMap;
} catch (Exception e){
// Ignore the exception
}
}
this.jobParamsMap = new HashMap<>();
}
return jobParamsMap;
}

public void setJobParamsMap(Map<String, String> jobParamsMap) {
this.jobParamsMap = jobParamsMap;
}

public String getJobDesc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamSet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
Expand All @@ -18,12 +20,17 @@ public class SubExchangisJob extends GenericExchangisJob {
/**
* Split parts for source direction of job
*/
protected List<Map<String, Object>> sourceSplitParts = new ArrayList<>();
protected List<Map<String, Object>> sourceSplits = new ArrayList<>();

/**
* Sink parts for source direction of job
*/
protected List<Map<String, Object>> sinkSplitParts = new ArrayList<>();
protected List<Map<String, Object>> sinkSplits = new ArrayList<>();

/**
* JobParams
*/
protected final Map<String, Object> jobParams = new HashMap<>();

protected String sourceType;

Expand Down Expand Up @@ -104,6 +111,12 @@ public Map<String, Object> getParamsToMap(String realm, boolean isTemp){
Collectors.toMap(JobParam::getStrKey, JobParam::getValue));
}

public void copyParamSet(BiConsumer<String, JobParamSet> consumer){
this.realmParamSet.forEach((realm, paramSet) -> {
JobParamSet newSet = new JobParamSet(paramSet.toList(false));
consumer.accept(realm, newSet);
});
}
/**
* Get all and convert to map
* @return map
Expand All @@ -130,22 +143,35 @@ public List<ColumnFunction> getColumnFunctions() {
return columnFunctions;
}

public List<Map<String, Object>> getSourceSplitParts() {
return sourceSplitParts;
public List<Map<String, Object>> getSourceSplits() {
return sourceSplits;
}

public void setSourceSplits(List<Map<String, Object>> sourceSplits) {
this.sourceSplits = sourceSplits;
}

public void setSourceSplitParts(List<Map<String, Object>> sourceSplitParts) {
this.sourceSplitParts = sourceSplitParts;
public List<Map<String, Object>> getSinkSplits() {
return sinkSplits;
}

public List<Map<String, Object>> getSinkSplitParts() {
return sinkSplitParts;
public void setSinkSplits(List<Map<String, Object>> sinkSplits) {
this.sinkSplits = sinkSplits;
}

public void setSinkSplitParts(List<Map<String, Object>> sinkSplitParts) {
this.sinkSplitParts = sinkSplitParts;
public Map<String, Object> getJobParams() {
return jobParams;
}


/**
* Copy sub exchangis job
* @return job
*/
public SubExchangisJob copy(){
// Empty
return null;
}
/**
* Column definition
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
public class DefaultJobParam<T> implements JobParam<T> {
private String key;

private String mappingKey;

private T value;

private BiFunction<String, Object, T> valueLoader;
Expand All @@ -24,14 +26,23 @@ public DefaultJobParam(){
}

<U>DefaultJobParam(String key, BiFunction<String, U, T> valueLoader){
this(key, null, valueLoader);
}
<U>DefaultJobParam(String key, String mappingKey, BiFunction<String, U, T> valueLoader){
this.key = key;
this.mappingKey = mappingKey;
setValueLoader(valueLoader);
}
@Override
public String getStrKey() {
return key;
}

@Override
public String getMappingKey() {
return null;
}

@Override
public T getValue() {
return value;
Expand Down Expand Up @@ -87,6 +98,10 @@ public <U> void setValueLoader(BiFunction<String, U, T> valueLoader) {
this.valueLoader = (BiFunction<String, Object, T>) valueLoader;
}

public BiFunction<String, Object, T> getValueLoader() {
return valueLoader;
}

@Override
public boolean isTemp() {
return isTemp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public interface JobParam<T> {
*/
String getStrKey();

/**
* Mapping key
* @return
*/
String getMappingKey();
/**
* Value of parameter
* @return nullable
Expand Down Expand Up @@ -46,6 +51,7 @@ public interface JobParam<T> {
*/
<U>void setValueLoader(BiFunction<String, U, T> valueLoader);

BiFunction<String, Object, T> getValueLoader();
/**
* Is temporary
* @return
Expand Down
Loading

0 comments on commit cf926a2

Please sign in to comment.