Commit: fix

yuzelin committed Jan 25, 2024
1 parent ca3a43e commit 9d58d71
Showing 7 changed files with 313 additions and 29 deletions.
8 changes: 5 additions & 3 deletions docs/content/cdc-ingestion/kafka-cdc.md
@@ -66,9 +66,11 @@ If a message in a Kafka topic is a change event captured from another database u
The JSON sources may be missing some information. For example, the Ogg and Maxwell format standards don't contain field
types; when you write JSON sources into a Flink Kafka sink, it only keeps the data and row type and drops other information.
The synchronization job will try its best to handle the problem as follows:
- 1. If missing field types, Paimon will use 'STRING' type as default.
- 2. If missing database name or table name, you cannot do database synchronization, but you can still do table synchronization.
- 3. If missing primary keys, the job might create non primary key table. You can set primary keys when submit job in table
+ 1. Usually, debezium-json contains a 'schema' field, from which Paimon will retrieve data types. Make sure your debezium
+    json has this field, or Paimon will use 'STRING' type.
+ 2. If field types are missing, Paimon will use 'STRING' type as default.
+ 3. If the database name or table name is missing, you cannot do database synchronization, but you can still do table synchronization.
+ 4. If primary keys are missing, the job might create a table without primary keys. You can set primary keys when submitting the job in table
synchronization.
{{< /hint >}}
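For illustration of point 1 above (a hand-written, abbreviated message, not part of this commit or the Paimon docs): with Kafka Connect's schema-enabled converters, a debezium-json record wraps the payload in an envelope whose `schema` section carries the field types. A minimal Jackson sketch of what the synchronization job looks for:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SchemaEnvelopeSketch {
    public static void main(String[] args) throws Exception {
        // Abbreviated debezium-json with schemas enabled; field types such as
        // "int32" live under schema.fields and are lost if the envelope is absent.
        String record =
                "{\"schema\": {\"fields\": [{\"field\": \"after\", \"fields\":"
                        + " [{\"field\": \"id\", \"type\": \"int32\"}]}]},"
                        + " \"payload\": {\"after\": {\"id\": 1}, \"op\": \"c\"}}";
        JsonNode node = new ObjectMapper().readTree(record);
        // Without the "schema" section, Paimon falls back to STRING for every field.
        System.out.println(node.has("schema")); // true
    }
}
```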

DebeziumRecordParser.java
@@ -39,10 +39,10 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+ import java.util.Objects;

import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
- import static org.apache.paimon.utils.JsonSerdeUtil.parseJsonMap;

/**
* The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed
@@ -120,13 +120,16 @@ private JsonNode getBefore(String op) {
    @Override
    protected void setRoot(String record) {
        JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);

+       hasSchema = false;
        if (node.has(FIELD_SCHEMA)) {
-           hasSchema = true;
-           JsonNode schema = node.get(FIELD_SCHEMA);
-           parseSchema(schema);
            root = node.get(FIELD_PAYLOAD);
+           JsonNode schema = node.get(FIELD_SCHEMA);
+           if (!isNull(schema)) {
+               parseSchema(schema);
+               hasSchema = true;
+           }
        } else {
-           hasSchema = false;
            root = node;
        }
    }
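A note on the `isNull` guard added above (standard Jackson behavior; Paimon uses a shaded copy): an explicit `"schema": null` in the message still makes `node.has(FIELD_SCHEMA)` return true, because the field is present with a `NullNode` value. A standalone sketch with plain Jackson:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullSchemaSketch {
    public static void main(String[] args) throws Exception {
        JsonNode node = new ObjectMapper()
                .readTree("{\"schema\": null, \"payload\": {\"id\": 1}}");
        // has() only checks presence, so an explicit null still passes it...
        System.out.println(node.has("schema"));          // true
        // ...and the value comes back as NullNode, which parseSchema cannot use.
        System.out.println(node.get("schema").isNull()); // true
    }
}
```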
@@ -142,10 +145,10 @@ private void parseSchema(JsonNode schema) {
        ArrayNode fields = null;
        for (int i = 0; i < schemaFields.size(); i++) {
            JsonNode node = schemaFields.get(i);
-           if (node.get("field").equals("after")) {
+           if (getString(node, "field").equals("after")) {
                fields = getNodeAs(node, "fields", ArrayNode.class);
                break;
-           } else if (node.get("field").equals("before")) {
+           } else if (getString(node, "field").equals("before")) {
                if (fields == null) {
                    fields = getNodeAs(node, "fields", ArrayNode.class);
                }
@@ -159,11 +162,14 @@ private void parseSchema(JsonNode schema) {
            debeziumTypes.put(field, getString(node, "type"));
            classNames.put(field, getString(node, "name"));

-           String parametersString = getString(node, "parameters");
+           JsonNode parametersNode = node.get("parameters");
            Map<String, String> parametersMap =
-                   parametersString == null
+                   isNull(parametersNode)
                            ? Collections.emptyMap()
-                           : parseJsonMap(parametersString, String.class);
+                           : JsonSerdeUtil.convertValue(
+                                   parametersNode,
+                                   new TypeReference<HashMap<String, String>>() {});

            parameters.put(field, parametersMap);
        }
    }
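Two fixes land in this hunk. The `getString` change repairs silently dead branches: `node.get("field")` yields a `TextNode`, and `JsonNode.equals(String)` is never true, so the old `after`/`before` comparisons always failed. The `parameters` change reads the value as an object node and converts it directly instead of treating it as a JSON string. A standalone sketch of the equality bug with plain Jackson:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NodeEqualsSketch {
    public static void main(String[] args) throws Exception {
        JsonNode field = new ObjectMapper()
                .readTree("{\"field\": \"after\"}")
                .get("field");
        // A TextNode is never equal to a raw String, so this branch never fired:
        System.out.println(field.equals("after"));           // false
        // Comparing the extracted text is what the fixed code effectively does:
        System.out.println(field.asText().equals("after"));  // true
    }
}
```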
@@ -181,18 +187,18 @@ protected Map<String, String> extractRowData(
            return super.extractRowData(record, paimonFieldTypes);
        }

-       Map<String, String> recordMap =
-               JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, String>>() {});
+       Map<String, Object> recordMap =
+               JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
-       for (Map.Entry<String, String> entry : recordMap.entrySet()) {
+       for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
            String fieldName = entry.getKey();
-           String rawValue = entry.getValue();
+           String rawValue = Objects.toString(entry.getValue(), null);
            String debeziumType = debeziumTypes.get(fieldName);
            String className = classNames.get(fieldName);

            String transformed =
                    DebeziumSchemaUtils.transformRawValue(
-                           rawValue, debeziumType, className, typeMapping);
+                           rawValue, debeziumType, className, typeMapping, record.get(fieldName));
            resultMap.put(fieldName, transformed);

            paimonFieldTypes.put(
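Context for the hunk above: the record is now read into `Map<String, Object>` so that non-string values (numbers, booleans, nested objects such as geometry) survive the conversion, and `Objects.toString(value, null)` keeps nulls as nulls while stringifying everything else. The extra `record.get(fieldName)` argument hands `transformRawValue` the original `JsonNode` for cases where the stringified form is not enough (see the geometry hunk below). A rough sketch with plain Jackson and a hypothetical record:

```java
import java.util.Map;
import java.util.Objects;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RawValueSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> recordMap = new ObjectMapper().readValue(
                "{\"id\": 42, \"name\": \"apple\", \"deleted\": null}",
                new TypeReference<Map<String, Object>>() {});
        // Nulls stay null; everything else becomes its string form.
        System.out.println(Objects.toString(recordMap.get("id"), null));      // 42
        System.out.println(Objects.toString(recordMap.get("deleted"), null)); // null
    }
}
```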
DebeziumSchemaUtils.java
@@ -22,8 +22,8 @@
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+ import org.apache.paimon.types.DecimalType;
import org.apache.paimon.utils.DateTimeUtils;
- import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -44,7 +44,6 @@
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
- import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.Map;
@@ -62,12 +61,14 @@ public static String transformRawValue(
            @Nullable String rawValue,
            String debeziumType,
            @Nullable String className,
-           TypeMapping typeMapping) {
+           TypeMapping typeMapping,
+           JsonNode origin) {
        if (rawValue == null) {
            return null;
        }

        String transformed = rawValue;

        if (Bits.LOGICAL_NAME.equals(className)) {
            // transform little-endian form to normal order
            // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
@@ -139,8 +140,8 @@ else if (Date.SCHEMA_NAME.equals(className)) {
            // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
            // for implementation
            // TODO currently we cannot get zone id
-           ZoneId zoneId = ZoneId.systemDefault();
-           LocalDateTime localDateTime = Instant.parse(rawValue).atZone(zoneId).toLocalDateTime();
+           LocalDateTime localDateTime =
+                   Instant.parse(rawValue).atZone(ZoneOffset.UTC).toLocalDateTime();
            transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
        } else if (MicroTime.SCHEMA_NAME.equals(className)) {
            long microseconds = Long.parseLong(rawValue);
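The zone change above makes the rendering deterministic: Debezium's `ZonedTimestamp` values are ISO-8601 strings with an explicit offset, so converting at a fixed `ZoneOffset.UTC` no longer depends on the machine the job runs on, as `ZoneId.systemDefault()` did. A minimal sketch with a hypothetical input value:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ZonedTimestampSketch {
    public static void main(String[] args) {
        // The same instant now always renders the same way, regardless of the
        // JVM's default time zone.
        LocalDateTime localDateTime =
                Instant.parse("2024-01-25T08:30:00.123456Z")
                        .atZone(ZoneOffset.UTC)
                        .toLocalDateTime();
        System.out.println(localDateTime); // 2024-01-25T08:30:00.123456
    }
}
```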
@@ -157,8 +158,7 @@ else if (Date.SCHEMA_NAME.equals(className)) {
        } else if (Point.LOGICAL_NAME.equals(className)
                || Geometry.LOGICAL_NAME.equals(className)) {
            try {
-               JsonNode geoNode = JsonSerdeUtil.asSpecificNodeType(rawValue, JsonNode.class);
-               byte[] wkb = geoNode.get(Geometry.WKB_FIELD).binaryValue();
+               byte[] wkb = origin.get(Geometry.WKB_FIELD).binaryValue();
                transformed = MySqlTypeUtils.convertWkbArray(wkb);
            } catch (Exception e) {
                throw new IllegalArgumentException(
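A hedged reading of why the original node is passed through: after the switch to `Map<String, Object>` in `extractRowData`, the `rawValue` for a geometry column comes from `Objects.toString(...)` on a converted map, which produces something like `{wkb=..., srid=0}` rather than JSON, so re-parsing it could no longer work. Reading straight from the original `JsonNode` also gets base64 decoding for free, since Jackson's `binaryValue()` decodes text nodes. A standalone sketch with plain Jackson and a hypothetical, truncated WKB payload:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class WkbSketch {
    public static void main(String[] args) throws Exception {
        // Debezium ships WKB as base64 text inside the geometry object
        // (hypothetical, truncated value here).
        JsonNode geometry = new ObjectMapper()
                .readTree("{\"wkb\": \"AQEAAAAAAAAAAADwPw==\", \"srid\": 0}");
        // binaryValue() base64-decodes the text node into the raw WKB bytes.
        byte[] wkb = geometry.get("wkb").binaryValue();
        System.out.println(wkb.length);
    }
}
```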
@@ -180,9 +180,14 @@ public static DataType toDataType(
                String precision = parameters.get("connect.decimal.precision");
                if (precision == null) {
                    return DataTypes.DECIMAL(20, 0);
                }

+               int p = Integer.parseInt(precision);
+               if (p > DecimalType.MAX_PRECISION) {
+                   return DataTypes.STRING();
+               } else {
                    int scale = Integer.parseInt(parameters.get("scale"));
-                   return DataTypes.DECIMAL(Integer.parseInt(precision), scale);
+                   return DataTypes.DECIMAL(p, scale);
+               }
            case Date.SCHEMA_NAME:
                return DataTypes.DATE();
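The new branch above guards against precisions that `DECIMAL` cannot hold; Paimon's `DecimalType.MAX_PRECISION` is 38 (assumed here, matching Flink), while e.g. MySQL allows `DECIMAL(65, 2)`, so oversized precisions fall back to `STRING`. A rough standalone sketch of the decision with a hypothetical parameters map:

```java
import java.util.Map;

public class DecimalFallbackSketch {
    // Assumed to mirror DecimalType.MAX_PRECISION in Paimon.
    static final int MAX_PRECISION = 38;

    static String toTypeString(Map<String, String> parameters) {
        String precision = parameters.get("connect.decimal.precision");
        if (precision == null) {
            return "DECIMAL(20, 0)";
        }
        int p = Integer.parseInt(precision);
        if (p > MAX_PRECISION) {
            return "STRING"; // precision too large to represent as DECIMAL
        }
        return "DECIMAL(" + p + ", " + parameters.get("scale") + ")";
    }

    public static void main(String[] args) {
        // MySQL allows DECIMAL(65, 2); precision 65 cannot be represented.
        System.out.println(toTypeString(Map.of(
                "connect.decimal.precision", "65", "scale", "2"))); // STRING
        System.out.println(toTypeString(Map.of(
                "connect.decimal.precision", "10", "scale", "2"))); // DECIMAL(10, 2)
    }
}
```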
@@ -147,6 +147,7 @@ protected void waitForResult(
                rowType);
        List<String> sortedActual = new ArrayList<>(result);
        Collections.sort(sortedActual);
+       System.out.println(sortedActual);
        if (sortedExpected.equals(sortedActual)) {
            break;
        }
KafkaDebeziumSyncTableActionITCase.java
@@ -25,7 +25,6 @@
public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase {

    private static final String DEBEZIUM = "debezium";
-   private static final String DEBEZIUM_JSON_SCHEMA_INCLUDE = "debezium_json_schema_include";

    @Test
    @Timeout(60)
@@ -96,6 +95,11 @@ public void testKafkaBuildSchemaWithDelete() throws Exception {
    @Test
    @Timeout(60)
    public void testSchemaIncludeRecord1() throws Exception {
-       testSchemaIncludeRecord(DEBEZIUM_JSON_SCHEMA_INCLUDE);
+       testSchemaIncludeRecord(DEBEZIUM);
    }

+   @Test
+   public void testAllTypesWithSchema() throws Exception {
+       testAllTypesWithSchemaImpl(DEBEZIUM);
+   }
}