Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 30, 2023
1 parent 0eb3aa7 commit 9f57d05
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;

public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
Expand Down Expand Up @@ -134,24 +137,19 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping) {
Map<String, String> tableMapping,
Map<String, String> tableProperties,
String targetDatabase) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
}

@Override
public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);

//Filter out table records that are not in tableMapping
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
return null;
}

String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
Expand All @@ -166,6 +164,15 @@ public DorisRecord serialize(String record) throws IOException {
if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}

//Filter out table records that are not in tableMapping
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
return null;
}

Map<String, Object> valueMap;
switch (op) {
case OP_READ:
Expand Down Expand Up @@ -313,11 +320,16 @@ public List<String> extractDDLList(JsonNode record) throws IOException{

@VisibleForTesting
public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
if(sourceConnector == null){
sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
}

String dorisTable = getCreateTableIdentifier(record);
JsonNode tableChange = extractTableChange(record);
JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
JsonNode columns = tableChange.get("table").get("columns");
String tblComment = tableChange.get("table").get("comment").asText();
JsonNode comment = tableChange.get("table").get("comment");
String tblComment = comment == null ? "" : comment.asText();
Map<String, FieldSchema> field = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(field, column);
Expand All @@ -333,6 +345,7 @@ public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessi
tableSchema.setKeys(pkList);
tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
tableSchema.setTableComment(tblComment);
tableSchema.setProperties(tableProperties);

String[] split = dorisTable.split("\\.");
Preconditions.checkArgument(split.length == 2);
Expand Down Expand Up @@ -402,9 +415,8 @@ public String getCdcTableIdentifier(JsonNode record){
}

public String getCreateTableIdentifier(JsonNode record){
String db = extractJsonNode(record.get("source"), "db");
String table = extractJsonNode(record.get("source"), "table");
return db + "." + table;
return targetDatabase + "." + table;
}

public String getDorisTableIdentifier(String cdcTableIdentifier){
Expand Down Expand Up @@ -657,6 +669,8 @@ public static class Builder {
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
private String targetDatabase;

public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
Expand Down Expand Up @@ -688,9 +702,19 @@ public Builder setTableMapping(Map<String, String> tableMapping) {
return this;
}

public Builder setTableProperties(Map<String, String> tableProperties) {
this.tableProperties = tableProperties;
return this;
}

public Builder setTargetDatabase(String targetDatabase) {
this.targetDatabase = targetDatabase;
return this;
}

public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
executionOptions, tableMapping);
executionOptions, tableMapping, tableProperties, targetDatabase);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ public DorisSink<String> buildDorisSink(String table) {
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
.username(config.get(MySqlSourceOptions.USERNAME))
.password(config.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(tableName)
//default open add newly table
.scanNewlyAddedTableEnabled(true);
.tableList(tableName);

config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
config
Expand Down

0 comments on commit 9f57d05

Please sign in to comment.