Skip to content

Commit

Permalink
Extract schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 21, 2023
1 parent 0a2d85f commit 6f7ac76
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public boolean schemaChange(String database, String table, Map<String, Object> p
return false;
}

private Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
Map<String, Object> params = new HashMap<>();
params.put("isDropColumn", dropColumn);
params.put("columnName", columnName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.doris.flink.sink.util;

import java.util.Map;

import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;

public class DeleteOperation {
public static void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
if (delete) {
valueMap.put(DORIS_DELETE_SIGN, "1");
} else {
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,21 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,15 +54,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {

private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
Expand All @@ -98,6 +84,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;
private SchemaChangeManager schemaChangeManager;

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
Expand All @@ -116,6 +103,7 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
this.firstSchemaChange = true;
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Expand Down Expand Up @@ -209,7 +197,7 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
status = doSchemaChange && execSchemaChange(ddlSql);
status = doSchemaChange && schemaChangeManager.execute(ddlSql);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
Expand All @@ -218,20 +206,6 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
return status;
}

private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddlSchema);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
}

@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
JsonNode historyRecord = extractHistoryRecord(record);
Expand Down Expand Up @@ -293,7 +267,7 @@ public boolean schemaChange(JsonNode recordRoot) {
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && execSchemaChange(ddl);
status = doSchemaChange && schemaChangeManager.execute(ddl);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
Expand All @@ -311,36 +285,14 @@ protected boolean checkTable(JsonNode recordRoot) {
return sourceTableName.equals(dbTbl);
}

private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
if (delete) {
valueMap.put(DORIS_DELETE_SIGN, "1");
} else {
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}

private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddl);
if (param.size() != 2) {
return false;
}
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
return schemaChangeManager.checkSchemaChange(database, table, param);
}

protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
Map<String, Object> params = new HashMap<>();
params.put("isDropColumn", ddlSchema.isDropColumn());
params.put("columnName", ddlSchema.getColumnName());
return params;
private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
return schemaChangeManager.checkSchemaChange(database, table, param);
}

/**
Expand All @@ -362,19 +314,6 @@ protected Map<String, Object> buildRequestParam(String ddl) {
return params;
}

private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpPost);
return success;
}

protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
Expand All @@ -388,25 +327,7 @@ protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}

private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
LOG.error("schema change response:{}", loadResult);
}
}
} catch (Exception e) {
LOG.error("http request error,", e);
}
return false;
}


private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
Expand Down Expand Up @@ -455,10 +376,7 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
return null;
}

private String authHeader() {
return "Basic " + new String(Base64.encodeBase64(
(dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}


@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
Expand Down

0 comments on commit 6f7ac76

Please sign in to comment.