Skip to content

Commit

Permalink
[fix](cdc) single sink add tableprefix and tablesuffix (apache#301)
Browse files Browse the repository at this point in the history
Currently, when single-sink is enabled and CDC automatically creates a table, it cannot automatically obtain the suffix and suffix.
  • Loading branch information
JNSimba authored Jan 23, 2024
1 parent 6e7d67f commit c77f9d7
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.writer.serializer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;

Expand Down Expand Up @@ -109,11 +112,15 @@ public JsonDebeziumSchemaSerializer(
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping,
Map<String, String> tableProperties,
String targetDatabase) {
String targetDatabase,
String targetTablePrefix,
String targetTableSuffix) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
init();
}

Expand All @@ -128,8 +135,9 @@ private void init() {
objectMapper,
pattern,
lineDelimiter,
ignoreUpdateBefore);

ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix);
this.schemaChange =
newSchemaChange
? new JsonDebeziumSchemaChangeImplV2(changeContext)
Expand Down Expand Up @@ -180,6 +188,8 @@ public static class Builder {
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix = "";
private String targetTableSuffix = "";

public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
Expand Down Expand Up @@ -221,6 +231,20 @@ public Builder setTargetDatabase(String targetDatabase) {
return this;
}

public Builder setTargetTablePrefix(String tablePrefix) {
if (!StringUtils.isNullOrWhitespaceOnly(tablePrefix)) {
this.targetTablePrefix = tablePrefix;
}
return this;
}

public Builder setTargetTableSuffix(String tableSuffix) {
if (!StringUtils.isNullOrWhitespaceOnly(tableSuffix)) {
this.targetTableSuffix = tableSuffix;
}
return this;
}

public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(
dorisOptions,
Expand All @@ -230,7 +254,9 @@ public JsonDebeziumSchemaSerializer build() {
executionOptions,
tableMapping,
tableProperties,
targetDatabase);
targetDatabase,
targetTablePrefix,
targetTableSuffix);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class JsonDebeziumChangeContext implements Serializable {
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
private String targetTablePrefix;
private String targetTableSuffix;

public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
Expand All @@ -48,7 +50,9 @@ public JsonDebeziumChangeContext(
ObjectMapper objectMapper,
Pattern pattern,
String lineDelimiter,
boolean ignoreUpdateBefore) {
boolean ignoreUpdateBefore,
String targetTablePrefix,
String targetTableSuffix) {
this.dorisOptions = dorisOptions;
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
Expand All @@ -58,6 +62,8 @@ public JsonDebeziumChangeContext(
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
}

public DorisOptions getDorisOptions() {
Expand Down Expand Up @@ -95,4 +101,12 @@ public String getLineDelimiter() {
public boolean isIgnoreUpdateBefore() {
return ignoreUpdateBefore;
}

public String getTargetTablePrefix() {
return targetTablePrefix;
}

public String getTargetTableSuffix() {
return targetTableSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;

public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
Expand All @@ -81,6 +83,14 @@ public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
this.tableProperties = changeContext.getTableProperties();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
this.targetTablePrefix =
changeContext.getTargetTablePrefix() == null
? ""
: changeContext.getTargetTablePrefix();
this.targetTableSuffix =
changeContext.getTargetTableSuffix() == null
? ""
: changeContext.getTargetTableSuffix();
}

@Override
Expand Down Expand Up @@ -253,7 +263,7 @@ private List<String> buildDistributeKeys(

private String getCreateTableIdentifier(JsonNode record) {
String table = extractJsonNode(record.get("source"), "table");
return targetDatabase + "." + table;
return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
}

private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public DorisSink<String> buildDorisSink(String table) {
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public void setUp() {
objectMapper,
null,
lineDelimiter,
ignoreUpdateBefore);
ignoreUpdateBefore,
"",
"");
dataChange = new JsonDebeziumDataChange(changeContext);
}

Expand Down Expand Up @@ -109,7 +111,9 @@ public void testSerializeUpdateBefore() throws IOException {
objectMapper,
null,
lineDelimiter,
false);
false,
"",
"");
dataChange = new JsonDebeziumDataChange(changeContext);

// update t1 set name='doris-update' WHERE id =1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public void setUp() {
objectMapper,
null,
lineDelimiter,
ignoreUpdateBefore);
ignoreUpdateBefore,
"",
"");
schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public void setUp() {
objectMapper,
null,
lineDelimiter,
ignoreUpdateBefore);
ignoreUpdateBefore,
"",
"");
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
private static final String TABLE_4 = "tbl4";

private static final MySQLContainer MYSQL_CONTAINER =
new MySQLContainer("mysql")
new MySQLContainer("mysql:8.0")
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER)
.withPassword(MYSQL_PASSWD);
Expand Down

0 comments on commit c77f9d7

Please sign in to comment.