Skip to content

Commit

Permalink
[Bug] FIX paimon mysql cdc '2001-01-01T00:00:00+08:00' parse throw ja…
Browse files Browse the repository at this point in the history
…va.time.format.DateTimeParseException

java.time.format.DateTimeParseException: Text '2001-01-01T00:00:00+08:00' could not be parsed at index 19
	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851)
	at java.time.Instant.parse(Instant.java:395)
	at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.transformRawValue(DebeziumSchemaUtils.java:213)
	at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.transformRawValue(DebeziumSchemaUtils.java:90)
	at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.extractRow(MySqlRecordParser.java:240)
	at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.extractRecords(MySqlRecordParser.java:206)
	at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.flatMap(MySqlRecordParser.java:124)
	at org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser.flatMap(MySqlRecordParser.java:71)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
	at org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema.deserialize(CdcDebeziumDeserializationSchema.java:78)
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)

Co-Authored-By: Jingsong Lee <[email protected]>
  • Loading branch information
lysgithub0302 and JingsongLi committed Nov 1, 2024
1 parent 6ba7e39 commit 03e6ec3
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
Expand Down Expand Up @@ -210,7 +211,11 @@ else if (Date.SCHEMA_NAME.equals(className)) {
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime();
OffsetDateTime.parse(rawValue)
.toInstant()
.atZone(serverTimeZone)
.toLocalDateTime();

transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
long microseconds = Long.parseLong(rawValue);
Expand Down

0 comments on commit 03e6ec3

Please sign in to comment.