Skip to content

Commit

Permalink
[fix]Fix multi-table sink, schema change result is incorrect (apache#313
Browse files Browse the repository at this point in the history
)
  • Loading branch information
DongLiang-0 authored Feb 1, 2024
1 parent 0566b92 commit 99f2da9
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
Expand All @@ -36,8 +37,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
Expand All @@ -58,7 +61,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private final ObjectMapper objectMapper = new ObjectMapper();
// table name of the cdc upstream, format is db.tbl
private final String sourceTableName;
private boolean firstLoad;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
Expand All @@ -71,6 +73,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
private final Set<String> initTableSet = new HashSet<>();

public JsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
Expand All @@ -85,7 +88,6 @@ public JsonDebeziumSchemaSerializer(
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
}

public JsonDebeziumSchemaSerializer(
Expand Down Expand Up @@ -156,13 +158,24 @@ public DorisRecord serialize(String record) throws IOException {
return null;
}

if (firstLoad) {
schemaChange.init(recordRoot);
firstLoad = false;
this.tableMapping = schemaChange.getTableMapping();
String dorisTableName =
JsonDebeziumChangeUtils.getDorisTableIdentifier(
recordRoot, dorisOptions, tableMapping);
if (initSchemaChange(dorisTableName)) {
schemaChange.init(recordRoot, dorisTableName);
}
return dataChange.serialize(record, recordRoot, op);
}

private boolean initSchemaChange(String dorisTableName) {
if (initTableSet.contains(dorisTableName)) {
return false;
}
initTableSet.add(dorisTableName);
return true;
}

private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
? record.get(key).asText()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.SourceSchema;

import java.util.Map;

public class JsonDebeziumChangeUtils {

public static String getDorisTableIdentifier(
JsonNode record, DorisOptions dorisOptions, Map<String, String> tableMapping) {
String identifier = getCdcTableIdentifier(record);
return getDorisTableIdentifier(identifier, dorisOptions, tableMapping);
}

public static String getDorisTableIdentifier(
String cdcTableIdentifier,
DorisOptions dorisOptions,
Map<String, String> tableMapping) {
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
return dorisOptions.getTableIdentifier();
}
if (!CollectionUtil.isNullOrEmpty(tableMapping)
&& !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
&& tableMapping.get(cdcTableIdentifier) != null) {
return tableMapping.get(cdcTableIdentifier);
}
return null;
}

public static String getCdcTableIdentifier(JsonNode record) {
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
String table = extractJsonNode(record.get("source"), "table");
return SourceSchema.getString(db, schema, table);
}

public static String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
? record.get(key).asText()
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,7 +52,7 @@ public class JsonDebeziumDataChange implements ChangeEvent {
private final DorisOptions dorisOptions;
private final boolean ignoreUpdateBefore;
private final String lineDelimiter;
private JsonDebeziumChangeContext changeContext;
private final JsonDebeziumChangeContext changeContext;

public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
this.changeContext = changeContext;
Expand All @@ -68,8 +64,11 @@ public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {

public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
// Filter out table records that are not in tableMapping
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
Map<String, String> tableMapping = changeContext.getTableMapping();
String cdcTableIdentifier = JsonDebeziumChangeUtils.getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier =
JsonDebeziumChangeUtils.getDorisTableIdentifier(
cdcTableIdentifier, dorisOptions, tableMapping);
if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
LOG.warn(
"filter table {}, because it is not listened, record detail is {}",
Expand Down Expand Up @@ -123,34 +122,6 @@ private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}

@VisibleForTesting
public String getCdcTableIdentifier(JsonNode record) {
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
String table = extractJsonNode(record.get("source"), "table");
return SourceSchema.getString(db, schema, table);
}

@VisibleForTesting
public String getDorisTableIdentifier(String cdcTableIdentifier) {
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
return dorisOptions.getTableIdentifier();
}
Map<String, String> tableMapping = changeContext.getTableMapping();
if (!CollectionUtil.isNullOrEmpty(tableMapping)
&& !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
&& tableMapping.get(cdcTableIdentifier) != null) {
return tableMapping.get(cdcTableIdentifier);
}
return null;
}

private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
? record.get(key).asText()
: null;
}

private Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -60,7 +59,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {

public abstract boolean schemaChange(JsonNode recordRoot);

public abstract void init(JsonNode recordRoot);
public abstract void init(JsonNode recordRoot, String dorisTableName);

/** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
protected boolean checkTable(JsonNode recordRoot) {
Expand Down Expand Up @@ -89,26 +88,9 @@ protected String extractJsonNode(JsonNode record, String key) {
: null;
}

@VisibleForTesting
public String getDorisTableIdentifier(String cdcTableIdentifier) {
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
return dorisOptions.getTableIdentifier();
}
if (!CollectionUtil.isNullOrEmpty(tableMapping)
&& !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
&& tableMapping.get(cdcTableIdentifier) != null) {
return tableMapping.get(cdcTableIdentifier);
}
return null;
}

protected String getDorisTableIdentifier(JsonNode record) {
String identifier = getCdcTableIdentifier(record);
return getDorisTableIdentifier(identifier);
}

protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
String identifier = getDorisTableIdentifier(record);
String identifier =
JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping);
if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
return null;
}
Expand Down Expand Up @@ -136,6 +118,10 @@ protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingEx
return record;
}

public Map<String, String> getTableMapping() {
return tableMapping;
}

@VisibleForTesting
public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
this.schemaChangeManager = schemaChangeManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public JsonDebeziumSchemaChangeImpl(JsonDebeziumChangeContext changeContext) {
}

@Override
public void init(JsonNode recordRoot) {
public void init(JsonNode recordRoot, String dorisTableName) {
// do nothing
}

Expand Down Expand Up @@ -120,7 +120,14 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
ddl =
String.format(
EXECUTE_DDL,
JsonDebeziumChangeUtils.getDorisTableIdentifier(
record, dorisOptions, tableMapping),
op,
col,
type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
Expand Down
Loading

0 comments on commit 99f2da9

Please sign in to comment.