diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index b705bc9e1d88..564cc8671960 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -47,6 +47,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Base64; @@ -210,7 +211,11 @@ else if (Date.SCHEMA_NAME.equals(className)) { // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector // for implementation LocalDateTime localDateTime = - Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime(); + OffsetDateTime.parse(rawValue) + .toInstant() + .atZone(serverTimeZone) + .toLocalDateTime(); + transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6); } else if (MicroTime.SCHEMA_NAME.equals(className)) { long microseconds = Long.parseLong(rawValue);