Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 25, 2024
1 parent ca3a43e commit 06c3043
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ private JsonNode getBefore(String op) {
@Override
protected void setRoot(String record) {
JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);

hasSchema = false;
if (node.has(FIELD_SCHEMA)) {
hasSchema = true;
JsonNode schema = node.get(FIELD_SCHEMA);
parseSchema(schema);
root = node.get(FIELD_PAYLOAD);
JsonNode schema = node.get(FIELD_SCHEMA);
if (!isNull(schema)) {
parseSchema(schema);
hasSchema = true;
}
} else {
hasSchema = false;
root = node;
}
}
Expand All @@ -142,10 +145,10 @@ private void parseSchema(JsonNode schema) {
ArrayNode fields = null;
for (int i = 0; i < schemaFields.size(); i++) {
JsonNode node = schemaFields.get(i);
if (node.get("field").equals("after")) {
if (getString(node, "field").equals("after")) {
fields = getNodeAs(node, "fields", ArrayNode.class);
break;
} else if (node.get("field").equals("before")) {
} else if (getString(node, "field").equals("before")) {
if (fields == null) {
fields = getNodeAs(node, "fields", ArrayNode.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ else if (Date.SCHEMA_NAME.equals(className)) {
} else if (Point.LOGICAL_NAME.equals(className)
|| Geometry.LOGICAL_NAME.equals(className)) {
try {
JsonNode geoNode = JsonSerdeUtil.asSpecificNodeType(rawValue, JsonNode.class);
JsonNode geoNode = JsonSerdeUtil.fromJson(rawValue, JsonNode.class);
byte[] wkb = geoNode.get(Geometry.WKB_FIELD).binaryValue();
transformed = MySqlTypeUtils.convertWkbArray(wkb);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase {

private static final String DEBEZIUM = "debezium";
private static final String DEBEZIUM_JSON_SCHEMA_INCLUDE = "debezium_json_schema_include";

@Test
@Timeout(60)
Expand Down Expand Up @@ -96,6 +95,6 @@ public void testKafkaBuildSchemaWithDelete() throws Exception {
@Test
@Timeout(60)
public void testSchemaIncludeRecord1() throws Exception {
testSchemaIncludeRecord(DEBEZIUM_JSON_SCHEMA_INCLUDE);
testSchemaIncludeRecord(DEBEZIUM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ public void testSchemaIncludeRecord(String format) throws Exception {
}

Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format);
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
Expand Down

0 comments on commit 06c3043

Please sign in to comment.