Skip to content

Commit

Permalink
[Improve](schemaChange)schema change type adapts to other connectors (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Oct 11, 2023
1 parent c94c331 commit 222ce60
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -73,26 +79,28 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete

public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; //alter table tbl add cloumn aca int
private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
// Format template for a column DDL statement, e.g.: alter table tbl add column aca int
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add column aca int
// Default add/drop-column matcher, used when no Pattern is passed to the constructor (compiled case-insensitively).
// Groups read by this class: 1 = operation (ADD|DROP), 3 = column name, 5 = optional token after the column name
// (read as the column type).
private static final String addDropDDLRegex
= "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
private String database;
private String table;
//table name of the cdc upstream, format is db.tbl
// table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private boolean firstLoad;
private boolean firstSchemaChange;
private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange) {
Pattern pattern,
String sourceTableName,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Expand All @@ -109,13 +117,14 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
}

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions){
Pattern pattern,
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange);
if(executionOptions != null){
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
if (executionOptions != null) {
this.lineDelimiter = executionOptions.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
}
}
Expand All @@ -126,7 +135,7 @@ public byte[] serialize(String record) throws IOException {
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl
// schema change ddl
if (newSchemaChange) {
schemaChangeV2(recordRoot);
} else {
Expand Down Expand Up @@ -160,20 +169,21 @@ public byte[] serialize(String record) throws IOException {

/**
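* Splits an update event into two rows: a delete row built from the "before" image
* (skipped when ignoreUpdateBefore is set), followed by an insert row built from the "after" image.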
*
* @param recordRoot
* @return
*/
private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
StringBuilder updateRow = new StringBuilder();
if(!ignoreUpdateBefore){
//convert delete
if (!ignoreUpdateBefore) {
// convert delete
Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
addDeleteSign(beforeRow, true);
updateRow.append(objectMapper.writeValueAsString(beforeRow))
.append(this.lineDelimiter);
}

//convert insert
// convert insert
Map<String, Object> afterRow = extractAfterRow(recordRoot);
addDeleteSign(afterRow, false);
updateRow.append(objectMapper.writeValueAsString(afterRow));
Expand Down Expand Up @@ -207,14 +217,15 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
}

private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String,Object> param = buildRequestParam(ddlSchema);
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddlSchema);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}",database,table);
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
}
Expand All @@ -224,44 +235,49 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
return new ArrayList<>();
}
LOG.debug("received debezium ddl :{}", ddl);
JsonNode tableChange = tableChanges.get(0);
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
return null;
}

JsonNode columns = tableChange.get("table").get("columns");
if (firstSchemaChange) {
sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
fillOriginSchema(columns);
}
Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
// Only DDL that matches the add/drop pattern is applied. This avoids operations such as rename or change,
// which may lead to the accidental deletion of a Doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (!matcher.find()) {
return null;
}
return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
}

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
try{
if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
try {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
String ddl = extractDDL(recordRoot);
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && execSchemaChange(ddl);
LOG.info("schema change status:{}", status);
}catch (Exception ex){
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
Expand All @@ -278,25 +294,26 @@ protected boolean checkTable(JsonNode recordRoot) {
}

private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
if(delete){
if (delete) {
valueMap.put(DORIS_DELETE_SIGN, "1");
}else{
} else {
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}

private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String,Object> param = buildRequestParam(ddl);
if(param.size() != 2){
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddl);
if (param.size() != 2) {
return false;
}
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}",database,table);
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
}
Expand All @@ -316,9 +333,9 @@ protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
* }
*/
protected Map<String, Object> buildRequestParam(String ddl) {
Map<String,Object> params = new HashMap<>();
Map<String, Object> params = new HashMap<>();
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
Expand All @@ -330,7 +347,8 @@ protected Map<String, Object> buildRequestParam(String ddl) {
private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
String requestUrl = String.format(SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
Expand All @@ -340,10 +358,10 @@ private boolean execSchemaChange(String ddl) throws IOException, IllegalArgument
}

protected String extractDatabase(JsonNode record) {
if(record.get("source").has("schema")){
//compatible with schema
if (record.get("source").has("schema")) {
// compatible with schema
return extractJsonNode(record.get("source"), "schema");
}else{
} else {
return extractJsonNode(record.get("source"), "db");
}
}
Expand All @@ -366,7 +384,7 @@ private boolean handleResponse(HttpUriRequest request) {
LOG.error("schema change response:{}", loadResult);
}
}
}catch(Exception e){
} catch (Exception e) {
LOG.error("http request error,", e);
}
return false;
Expand All @@ -392,10 +410,10 @@ private Map<String, Object> extractRow(JsonNode recordRow) {
}

private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
if(record.has("historyRecord")){
if (record.has("historyRecord")) {
return objectMapper.readTree(record.get("historyRecord").asText());
}
//The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
// In some scenarios (e.g. DebeziumSourceFunction) the DDL is not wrapped in a historyRecord field,
// so the record itself is returned.
return record;
}

Expand All @@ -404,9 +422,9 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
String ddl = extractJsonNode(historyRecord, "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
// filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
String type = matcher.group(5);
Expand All @@ -420,7 +438,8 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
}

private String authHeader() {
return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
return "Basic " + new String(Base64.encodeBase64(
(dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}

@VisibleForTesting
Expand All @@ -429,16 +448,12 @@ public void fillOriginSchema(JsonNode columns) {
for (JsonNode column : columns) {
String fieldName = column.get("name").asText();
if (originFieldSchemaMap.containsKey(fieldName)) {
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
String type = MysqlType.toDorisType(column.get("typeName").asText(),
length == null ? 0 : length.asInt(),
scale == null ? 0 : scale.asInt());
String dorisTypeName = buildDorisTypeName(column);
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
fieldSchema.setTypeString(type);
fieldSchema.setTypeString(dorisTypeName);
fieldSchema.setComment(comment);
fieldSchema.setDefaultValue(defaultValue);
}
Expand All @@ -453,13 +468,36 @@ public void fillOriginSchema(JsonNode columns) {

private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
String fieldName = column.get("name").asText();
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
String type = MysqlType.toDorisType(column.get("typeName").asText(),
length == null ? 0 : length.asInt(), scale == null ? 0 : scale.asInt());
String dorisTypeName = buildDorisTypeName(column);
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, defaultValue, comment));
filedSchemaMap.put(fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
}

/**
 * Maps the type of an upstream column description to a Doris type name.
 *
 * <p>The column's {@code typeName} is translated by the type mapper that belongs to the
 * current {@code sourceConnector}. {@code length} and {@code scale} are passed through to
 * the mapper and default to 0 when the column node does not contain them.
 *
 * @param column column node; must contain a {@code typeName} field, while {@code length}
 *               and {@code scale} are optional
 * @return the Doris type name produced by the connector-specific mapper
 * @throws IllegalStateException if no source connector has been set yet
 * @throws UnsupportedOperationException if the source connector has no type mapper here
 */
@VisibleForTesting
public String buildDorisTypeName(JsonNode column) {
    // Look each optional field up once instead of twice.
    JsonNode lengthNode = column.get("length");
    JsonNode scaleNode = column.get("scale");
    int length = lengthNode == null ? 0 : lengthNode.asInt();
    int scale = scaleNode == null ? 0 : scaleNode.asInt();
    String sourceTypeName = column.get("typeName").asText();

    // Switching on a null enum would raise a bare NullPointerException; fail with context instead.
    if (sourceConnector == null) {
        throw new IllegalStateException(
                "Source connector is not set, can not convert type " + sourceTypeName + " to doris type.");
    }

    switch (sourceConnector) {
        case MYSQL:
            return MysqlType.toDorisType(sourceTypeName, length, scale);
        case ORACLE:
            return OracleType.toDorisType(sourceTypeName, length, scale);
        case POSTGRES:
            return PostgresType.toDorisType(sourceTypeName, length, scale);
        case SQLSERVER:
            return SqlServerType.toDorisType(sourceTypeName, length, scale);
        default:
            // The connector is what is unsupported here, so name it (and keep the type for context).
            throw new UnsupportedOperationException(
                    "Not support " + sourceConnector + " schema change, column type: " + sourceTypeName + ".");
    }
}

private String handleDefaultValue(String defaultValue) {
Expand Down Expand Up @@ -493,6 +531,11 @@ public Map<String, FieldSchema> getOriginFieldSchemaMap() {
return originFieldSchemaMap;
}

/**
 * Sets the upstream source connector from its name.
 *
 * <p>The name is upper-cased with {@link java.util.Locale#ROOT} before the enum lookup, so the
 * result does not depend on the JVM default locale.
 *
 * @param sourceConnector connector name in any letter case; must correspond to a
 *                        {@code SourceConnector} constant
 * @throws java.lang.IllegalArgumentException if no {@code SourceConnector} constant has that name
 */
@VisibleForTesting
public void setSourceConnector(String sourceConnector) {
    this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase(java.util.Locale.ROOT));
}

/**
 * Creates a new, empty builder for a {@code JsonDebeziumSchemaSerializer}.
 *
 * @return a fresh {@code Builder} instance
 */
public static JsonDebeziumSchemaSerializer.Builder builder() {
    final JsonDebeziumSchemaSerializer.Builder freshBuilder = new JsonDebeziumSchemaSerializer.Builder();
    return freshBuilder;
}
Expand Down Expand Up @@ -533,7 +576,8 @@ public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
}

public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange, executionOptions);
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
executionOptions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class SchemaChangeHelper {

public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap,
Map<String, FieldSchema> originFieldSchemaMap) {
dropFieldSchemas.clear();
addFieldSchemas.clear();
for (Entry<String, FieldSchema> updateFieldSchema : updateFiledSchemaMap.entrySet()) {
String columName = updateFieldSchema.getKey();
if (!originFieldSchemaMap.containsKey(columName)) {
Expand Down
Loading

0 comments on commit 222ce60

Please sign in to comment.