Skip to content

Commit

Permalink
support database sync use single sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 27, 2023
1 parent 1e2965d commit 03542e0
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 52 deletions.
2 changes: 1 addition & 1 deletion flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ under the License.
<revision>1.5.0-SNAPSHOT</revision>
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
<flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,16 +88,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;
private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.database = tableInfo[0];
this.table = tableInfo[1];
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.database = tableInfo[0];
this.table = tableInfo[1];
}
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
Expand All @@ -120,10 +126,29 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
}
}

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
}

@Override
public Tuple2<String, byte[]> serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);

//Filter out table records that are not in tableMapping
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
return null;
}

String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
Expand Down Expand Up @@ -155,7 +180,8 @@ public Tuple2<String, byte[]> serialize(String record) throws IOException {
LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));

return Tuple2.of(dorisTableIdentifier, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down Expand Up @@ -187,6 +213,13 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}

// db,table
Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
if(tuple == null){
return false;
}

List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
Expand All @@ -197,8 +230,8 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
status = doSchemaChange && schemaChangeManager.execute(ddlSql, database);
boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
Expand All @@ -209,6 +242,7 @@ public boolean schemaChangeV2(JsonNode recordRoot) {

@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
String dorisTable = getDorisTableIdentifier(record);
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
Expand All @@ -233,7 +267,7 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
dorisOptions.getTableIdentifier(), oldColumnName, newColumnName, originFieldSchemaMap);
dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
}

// add/drop ddl
Expand All @@ -248,7 +282,7 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti
if (!matcher.find()) {
return null;
}
return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
return SchemaChangeHelper.generateDDLSql(dorisTable);
}

@VisibleForTesting
Expand All @@ -262,13 +296,20 @@ public boolean schemaChange(JsonNode recordRoot) {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
// db,table
Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
if(tuple == null){
return false;
}

String ddl = extractDDL(recordRoot);
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && schemaChangeManager.execute(ddl, database);

boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1);
status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
Expand All @@ -286,12 +327,48 @@ protected boolean checkTable(JsonNode recordRoot) {
return sourceTableName.equals(dbTbl);
}

private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
public String getCdcTableIdentifier(JsonNode record){
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
String table = extractJsonNode(record.get("source"), "table");
return SourceSchema.getString(db, schema, table);
}

public String getDorisTableIdentifier(String cdcTableIdentifier){
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
return dorisOptions.getTableIdentifier();
}
if(!CollectionUtil.isNullOrEmpty(tableMapping)
&& !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
&& tableMapping.get(cdcTableIdentifier) != null){
return tableMapping.get(cdcTableIdentifier);
}
return null;
}

protected String getDorisTableIdentifier(JsonNode record){
String identifier = getCdcTableIdentifier(record);
return getDorisTableIdentifier(identifier);
}

protected Tuple2<String, String> getDorisTableTuple(JsonNode record){
String identifier = getDorisTableIdentifier(record);
if(StringUtils.isNullOrWhitespaceOnly(identifier)){
return null;
}
String[] tableInfo = identifier.split("\\.");
if(tableInfo.length != 2){
return null;
}
return Tuple2.of(tableInfo[0], tableInfo[1]);
}

private boolean checkSchemaChange(String database, String table, String ddl) throws IOException, IllegalArgumentException {
Map<String, Object> param = buildRequestParam(ddl);
return schemaChangeManager.checkSchemaChange(database, table, param);
}

private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
return schemaChangeManager.checkSchemaChange(database, table, param);
}
Expand Down Expand Up @@ -328,8 +405,6 @@ protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}



private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
!(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
Expand Down Expand Up @@ -369,7 +444,7 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
Expand Down Expand Up @@ -473,6 +548,11 @@ public void setSourceConnector(String sourceConnector) {
this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
}

@VisibleForTesting
public void setTableMapping(Map<String, String> tableMapping) {
this.tableMapping = tableMapping;
}

public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
Expand All @@ -486,6 +566,7 @@ public static class Builder {
private String sourceTableName;
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;

public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
Expand All @@ -512,9 +593,14 @@ public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
return this;
}

public Builder setTableMapping(Map<String, String> tableMapping) {
this.tableMapping = tableMapping;
return this;
}

public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
executionOptions);
executionOptions, tableMapping);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,28 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");

Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
databaseSync.setEnv(env)
.setDatabase(database)
.setConfig(config)
.setTablePrefix(tablePrefix)
.setTableSuffix(tableSuffix)
.setIncludingTables(includingTables)
.setExcludingTables(excludingTables)
.setMultiToOneOrigin(multiToOneOrigin)
.setMultiToOneTarget(multiToOneTarget)
.setIgnoreDefaultValue(ignoreDefaultValue)
.setSinkConfig(sinkConfig)
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setNewSchemaChange(useNewSchemaChange)
.create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
Expand Down
Loading

0 comments on commit 03542e0

Please sign in to comment.