Skip to content

Commit

Permalink
[cdc] Extract common code (apache#3050)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjun0x01 authored and zhuangchong committed Mar 26, 2024
1 parent fc18149 commit f7dd9a2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import javax.annotation.Nullable;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -198,7 +199,12 @@ protected Map<String, String> extractRowData(

String transformed =
DebeziumSchemaUtils.transformRawValue(
rawValue, debeziumType, className, typeMapping, record.get(fieldName));
rawValue,
debeziumType,
className,
typeMapping,
record.get(fieldName),
ZoneOffset.UTC);
resultMap.put(fieldName, transformed);

paimonFieldTypes.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.Map;
Expand All @@ -61,7 +62,8 @@ public static String transformRawValue(
String debeziumType,
@Nullable String className,
TypeMapping typeMapping,
JsonNode origin) {
JsonNode origin,
ZoneId serverTimeZone) {
if (rawValue == null) {
return null;
}
Expand Down Expand Up @@ -138,9 +140,8 @@ else if (Date.SCHEMA_NAME.equals(className)) {
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
// TODO currently we cannot get zone id
LocalDateTime localDateTime =
Instant.parse(rawValue).atZone(ZoneOffset.UTC).toLocalDateTime();
Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime();
transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
long microseconds = Long.parseLong(rawValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,32 @@
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Bits;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -78,8 +63,6 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;

/**
Expand Down Expand Up @@ -258,105 +241,14 @@ private Map<String, String> extractRow(JsonNode recordRow) {

String className = field.getValue().name();
String oldValue = objectValue.asText();
String newValue = oldValue;

if (Bits.LOGICAL_NAME.equals(className)) {
// transform little-endian form to normal order
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
byte[] littleEndian = Base64.getDecoder().decode(oldValue);
byte[] bigEndian = new byte[littleEndian.length];
for (int i = 0; i < littleEndian.length; i++) {
bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
}
if (typeMapping.containsMode(TO_STRING)) {
newValue = StringUtils.bytesToBinaryString(bigEndian);
} else {
newValue = Base64.getEncoder().encodeToString(bigEndian);
}
} else if (("bytes".equals(mySqlType) && className == null)) {
// MySQL binary, varbinary, blob
newValue = new String(Base64.getDecoder().decode(oldValue));
} else if ("bytes".equals(mySqlType) && decimalLogicalName().equals(className)) {
// MySQL numeric, fixed, decimal
try {
new BigDecimal(oldValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid big decimal value "
+ oldValue
+ ". Make sure that in the `customConverterConfigs` "
+ "of the JsonDebeziumDeserializationSchema you created, set '"
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+ "' to 'numeric'",
e);
}
}
// pay attention to the temporal types
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
else if (Date.SCHEMA_NAME.equals(className)) {
// MySQL date
newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
} else if (Timestamp.SCHEMA_NAME.equals(className)) {
// MySQL datetime (precision 0-3)

// display value of datetime is not affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
} else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
// MySQL datetime (precision 4-6)
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

// display value of datetime is not affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
// MySQL timestamp

// display value of timestamp is affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

newValue =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalTime()
.toString();
} else if (Point.LOGICAL_NAME.equals(className)
|| Geometry.LOGICAL_NAME.equals(className)) {
try {
byte[] wkb = objectValue.get(Geometry.WKB_FIELD).binaryValue();
newValue = MySqlTypeUtils.convertWkbArray(wkb);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to convert %s to geometry JSON.", objectValue),
e);
}
}

String newValue =
DebeziumSchemaUtils.transformRawValue(
oldValue,
mySqlType,
className,
typeMapping,
objectValue,
serverTimeZone);
resultMap.put(fieldName, newValue);
}

Expand Down

0 comments on commit f7dd9a2

Please sign in to comment.