From 06c3043cfffe74c3426e3c8ef1ee0cefc262f092 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Thu, 25 Jan 2024 14:43:42 +0800 Subject: [PATCH] fix --- .../cdc/format/debezium/DebeziumRecordParser.java | 15 +++++++++------ .../cdc/format/debezium/DebeziumSchemaUtils.java | 2 +- .../kafka/KafkaDebeziumSyncTableActionITCase.java | 3 +-- .../cdc/kafka/KafkaSyncTableActionITCase.java | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java index e0a4347f07a9e..5aaff8ca65fba 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java @@ -120,13 +120,16 @@ private JsonNode getBefore(String op) { @Override protected void setRoot(String record) { JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class); + + hasSchema = false; if (node.has(FIELD_SCHEMA)) { - hasSchema = true; - JsonNode schema = node.get(FIELD_SCHEMA); - parseSchema(schema); root = node.get(FIELD_PAYLOAD); + JsonNode schema = node.get(FIELD_SCHEMA); + if (!isNull(schema)) { + parseSchema(schema); + hasSchema = true; + } } else { - hasSchema = false; root = node; } } @@ -142,10 +145,10 @@ private void parseSchema(JsonNode schema) { ArrayNode fields = null; for (int i = 0; i < schemaFields.size(); i++) { JsonNode node = schemaFields.get(i); - if (node.get("field").equals("after")) { + if (getString(node, "field").equals("after")) { fields = getNodeAs(node, "fields", ArrayNode.class); break; - } else if (node.get("field").equals("before")) { + } else if (getString(node, "field").equals("before")) { if (fields == null) { fields = getNodeAs(node, "fields", ArrayNode.class); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 214579accb7e0..c96aa6bc1f0e6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -157,7 +157,7 @@ else if (Date.SCHEMA_NAME.equals(className)) { } else if (Point.LOGICAL_NAME.equals(className) || Geometry.LOGICAL_NAME.equals(className)) { try { - JsonNode geoNode = JsonSerdeUtil.asSpecificNodeType(rawValue, JsonNode.class); + JsonNode geoNode = JsonSerdeUtil.fromJson(rawValue, JsonNode.class); byte[] wkb = geoNode.get(Geometry.WKB_FIELD).binaryValue(); transformed = MySqlTypeUtils.convertWkbArray(wkb); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index e5e030a60adb4..f9c5b1f3d494a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -25,7 +25,6 @@ public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase { private static final String DEBEZIUM = "debezium"; - private static final String DEBEZIUM_JSON_SCHEMA_INCLUDE = "debezium_json_schema_include"; @Test @Timeout(60) @@ -96,6 +95,6 @@ public void testKafkaBuildSchemaWithDelete() throws Exception { @Test @Timeout(60) public void testSchemaIncludeRecord1() throws Exception { - testSchemaIncludeRecord(DEBEZIUM_JSON_SCHEMA_INCLUDE); + testSchemaIncludeRecord(DEBEZIUM); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index da38b2a0ce7f2..0d28e57ce83e5 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -675,7 +675,7 @@ public void testSchemaIncludeRecord(String format) throws Exception { } Map kafkaConfig = getBasicKafkaConfig(); - kafkaConfig.put(VALUE_FORMAT.key(), format); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig)