diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java index d4d5b47a25d6..96f1bc07429b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -198,7 +199,12 @@ protected Map extractRowData( String transformed = DebeziumSchemaUtils.transformRawValue( - rawValue, debeziumType, className, typeMapping, record.get(fieldName)); + rawValue, + debeziumType, + className, + typeMapping, + record.get(fieldName), + ZoneOffset.UTC); resultMap.put(fieldName, transformed); paimonFieldTypes.put( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index d3f1b1196632..1aab6653d4d4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -43,6 +43,7 @@ import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Base64; import java.util.Map; @@ -61,7 +62,8 @@ public static String transformRawValue( String debeziumType, @Nullable String className, TypeMapping typeMapping, - JsonNode origin) { + JsonNode origin, + ZoneId serverTimeZone) { if (rawValue == null) { return null; } @@ -138,9 +140,8 @@ else if (Date.SCHEMA_NAME.equals(className)) { // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector // for implementation - // TODO currently we cannot get zone id LocalDateTime localDateTime = - Instant.parse(rawValue).atZone(ZoneOffset.UTC).toLocalDateTime(); + Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime(); transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6); } else if (MicroTime.SCHEMA_NAME.equals(className)) { long microseconds = Long.parseLong(rawValue); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java index ee0049fd416c..72b42b46689b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java @@ -21,15 +21,14 @@ import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils; import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent; import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -37,31 +36,17 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; import io.debezium.connector.AbstractSourceInfo; -import io.debezium.data.Bits; -import io.debezium.data.geometry.Geometry; -import io.debezium.data.geometry.Point; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.history.TableChanges; -import io.debezium.time.Date; -import io.debezium.time.MicroTime; -import io.debezium.time.MicroTimestamp; -import io.debezium.time.Timestamp; -import io.debezium.time.ZonedTimestamp; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; -import org.apache.kafka.connect.json.JsonConverterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.ArrayList; -import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -78,8 +63,6 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE; -import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; -import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName; import static org.apache.paimon.utils.JsonSerdeUtil.isNull; /** @@ -258,105 +241,14 @@ private Map extractRow(JsonNode recordRow) { String className = field.getValue().name(); String oldValue = objectValue.asText(); - String newValue = oldValue; - - if (Bits.LOGICAL_NAME.equals(className)) { - // transform little-endian form to normal order - // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types - byte[] littleEndian = Base64.getDecoder().decode(oldValue); - byte[] bigEndian = new byte[littleEndian.length]; - for (int i = 0; i < littleEndian.length; i++) { - bigEndian[i] = littleEndian[littleEndian.length - 1 - i]; - } - if (typeMapping.containsMode(TO_STRING)) { - newValue = StringUtils.bytesToBinaryString(bigEndian); - } else { - newValue = Base64.getEncoder().encodeToString(bigEndian); - } - } else if (("bytes".equals(mySqlType) && className == null)) { - // MySQL binary, varbinary, blob - newValue = new String(Base64.getDecoder().decode(oldValue)); - } else if ("bytes".equals(mySqlType) && decimalLogicalName().equals(className)) { - // MySQL numeric, fixed, decimal - try { - new BigDecimal(oldValue); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - "Invalid big decimal value " - + oldValue - + ". Make sure that in the `customConverterConfigs` " - + "of the JsonDebeziumDeserializationSchema you created, set '" - + JsonConverterConfig.DECIMAL_FORMAT_CONFIG - + "' to 'numeric'", - e); - } - } - // pay attention to the temporal types - // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types - else if (Date.SCHEMA_NAME.equals(className)) { - // MySQL date - newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString(); - } else if (Timestamp.SCHEMA_NAME.equals(className)) { - // MySQL datetime (precision 0-3) - - // display value of datetime is not affected by timezone, see - // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and - // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector - // for implementation - LocalDateTime localDateTime = - DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC); - newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3); - } else if (MicroTimestamp.SCHEMA_NAME.equals(className)) { - // MySQL datetime (precision 4-6) - long microseconds = Long.parseLong(oldValue); - long microsecondsPerSecond = 1_000_000; - long nanosecondsPerMicros = 1_000; - long seconds = microseconds / microsecondsPerSecond; - long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros; - - // display value of datetime is not affected by timezone, see - // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and - // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector - // for implementation - LocalDateTime localDateTime = - Instant.ofEpochSecond(seconds, nanoAdjustment) - .atZone(ZoneOffset.UTC) - .toLocalDateTime(); - newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6); - } else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) { - // MySQL timestamp - - // display value of timestamp is affected by timezone, see - // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and - // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector - // for implementation - LocalDateTime localDateTime = - Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime(); - newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6); - } else if (MicroTime.SCHEMA_NAME.equals(className)) { - long microseconds = Long.parseLong(oldValue); - long microsecondsPerSecond = 1_000_000; - long nanosecondsPerMicros = 1_000; - long seconds = microseconds / microsecondsPerSecond; - long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros; - - newValue = - Instant.ofEpochSecond(seconds, nanoAdjustment) - .atZone(ZoneOffset.UTC) - .toLocalTime() - .toString(); - } else if (Point.LOGICAL_NAME.equals(className) - || Geometry.LOGICAL_NAME.equals(className)) { - try { - byte[] wkb = objectValue.get(Geometry.WKB_FIELD).binaryValue(); - newValue = MySqlTypeUtils.convertWkbArray(wkb); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format("Failed to convert %s to geometry JSON.", objectValue), - e); - } - } - + String newValue = + DebeziumSchemaUtils.transformRawValue( + oldValue, + mySqlType, + className, + typeMapping, + objectValue, + serverTimeZone); resultMap.put(fieldName, newValue); }