diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java index 756875781153..10dbdcc8dc8c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java @@ -45,7 +45,10 @@ public List parseSchemaChange() { .forEach( dataField -> { DataField previous = previousDataFields.get(dataField.name()); - if (!Objects.equals(previous, dataField)) { + // When the order of the same field is different, its ID may also be + // different, + // so the comparison should not include the ID. + if (!dataFieldEqualsIgnoreId(previous, dataField)) { previousDataFields.put(dataField.name(), dataField); change.add(dataField); } @@ -53,6 +56,18 @@ public List parseSchemaChange() { return change; } + private boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { + if (dataField1 == dataField2) { + return true; + } else if (dataField1 != null && dataField2 != null) { + return Objects.equals(dataField1.name(), dataField2.name()) + && Objects.equals(dataField1.type(), dataField2.type()) + && Objects.equals(dataField1.description(), dataField2.description()); + } else { + return false; + } + } + @Override public List parseRecords() { if (record.hasPayload()) {