From 03e6ec31474905a1dca505202c392d97d36bdd9c Mon Sep 17 00:00:00 2001 From: lysgithub0302 Date: Fri, 1 Nov 2024 11:15:57 +0800 Subject: [PATCH] [Bug] Fix java.time.format.DateTimeParseException thrown when Paimon MySQL CDC parses the offset timestamp '2001-01-01T00:00:00+08:00' java.time.format.DateTimeParseException: Text '2001-01-01T00:00:00+08:00' could not be parsed at index 19 at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851) at java.time.Instant.parse(Instant.java:395) at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.transformRawValue(DebeziumSchemaUtils.java:213) at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.transformRawValue(DebeziumSchemaUtils.java:90) at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.extractRow(MySqlRecordParser.java:240) at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.extractRecords(MySqlRecordParser.java:206) at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.flatMap(MySqlRecordParser.java:124) at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.flatMap(MySqlRecordParser.java:71) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) at 
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141) at org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema.deserialize(CdcDebeziumDeserializationSchema.java:78) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73) at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) Co-Authored-By: Jingsong Lee <9601882+JingsongLi@users.noreply.github.com> --- .../action/cdc/format/debezium/DebeziumSchemaUtils.java | 7 ++++++- 1 file 
changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index b705bc9e1d88..564cc8671960 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -47,6 +47,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Base64; @@ -210,7 +211,11 @@ else if (Date.SCHEMA_NAME.equals(className)) { // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector // for implementation LocalDateTime localDateTime = - Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime(); + OffsetDateTime.parse(rawValue) + .toInstant() + .atZone(serverTimeZone) + .toLocalDateTime(); + transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6); } else if (MicroTime.SCHEMA_NAME.equals(className)) { long microseconds = Long.parseLong(rawValue);