From 4c09a2435d8078f401563af0879d810690b053de Mon Sep 17 00:00:00 2001 From: chenxi0599 Date: Fri, 10 May 2024 13:56:38 +0800 Subject: [PATCH] [cdc] Fixed when the order of the same field differs, it is considered a schema change. (#3314) --- .../paimon/flink/sink/cdc/RichEventParser.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java index 756875781153..10dbdcc8dc8c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java @@ -45,7 +45,10 @@ public List parseSchemaChange() { .forEach( dataField -> { DataField previous = previousDataFields.get(dataField.name()); - if (!Objects.equals(previous, dataField)) { + // When the order of the same field is different, its ID may also be + // different, + // so the comparison should not include the ID. + if (!dataFieldEqualsIgnoreId(previous, dataField)) { previousDataFields.put(dataField.name(), dataField); change.add(dataField); } @@ -53,6 +56,18 @@ public List parseSchemaChange() { return change; } + private boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { + if (dataField1 == dataField2) { + return true; + } else if (dataField1 != null && dataField2 != null) { + return Objects.equals(dataField1.name(), dataField2.name()) + && Objects.equals(dataField1.type(), dataField2.type()) + && Objects.equals(dataField1.description(), dataField2.description()); + } else { + return false; + } + } + @Override public List parseRecords() { if (record.hasPayload()) {