Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDC] extract common code #3050

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import javax.annotation.Nullable;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -198,7 +199,12 @@ protected Map<String, String> extractRowData(

String transformed =
DebeziumSchemaUtils.transformRawValue(
rawValue, debeziumType, className, typeMapping, record.get(fieldName));
rawValue,
debeziumType,
className,
typeMapping,
record.get(fieldName),
ZoneOffset.UTC);
resultMap.put(fieldName, transformed);

paimonFieldTypes.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.Map;
Expand All @@ -61,7 +62,8 @@ public static String transformRawValue(
String debeziumType,
@Nullable String className,
TypeMapping typeMapping,
JsonNode origin) {
JsonNode origin,
ZoneId serverTimeZone) {
if (rawValue == null) {
return null;
}
Expand Down Expand Up @@ -138,9 +140,8 @@ else if (Date.SCHEMA_NAME.equals(className)) {
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
// TODO currently we cannot get zone id
LocalDateTime localDateTime =
Instant.parse(rawValue).atZone(ZoneOffset.UTC).toLocalDateTime();
Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime();
transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
long microseconds = Long.parseLong(rawValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,32 @@
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Bits;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -78,8 +63,6 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;

/**
Expand Down Expand Up @@ -258,105 +241,14 @@ private Map<String, String> extractRow(JsonNode recordRow) {

String className = field.getValue().name();
String oldValue = objectValue.asText();
String newValue = oldValue;

if (Bits.LOGICAL_NAME.equals(className)) {
// transform little-endian form to normal order
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
byte[] littleEndian = Base64.getDecoder().decode(oldValue);
byte[] bigEndian = new byte[littleEndian.length];
for (int i = 0; i < littleEndian.length; i++) {
bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
}
if (typeMapping.containsMode(TO_STRING)) {
newValue = StringUtils.bytesToBinaryString(bigEndian);
} else {
newValue = Base64.getEncoder().encodeToString(bigEndian);
}
} else if (("bytes".equals(mySqlType) && className == null)) {
// MySQL binary, varbinary, blob
newValue = new String(Base64.getDecoder().decode(oldValue));
} else if ("bytes".equals(mySqlType) && decimalLogicalName().equals(className)) {
// MySQL numeric, fixed, decimal
try {
new BigDecimal(oldValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid big decimal value "
+ oldValue
+ ". Make sure that in the `customConverterConfigs` "
+ "of the JsonDebeziumDeserializationSchema you created, set '"
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+ "' to 'numeric'",
e);
}
}
// pay attention to the temporal types
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
else if (Date.SCHEMA_NAME.equals(className)) {
// MySQL date
newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
} else if (Timestamp.SCHEMA_NAME.equals(className)) {
// MySQL datetime (precision 0-3)

// display value of datetime is not affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
} else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
// MySQL datetime (precision 4-6)
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

// display value of datetime is not affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
// MySQL timestamp

// display value of timestamp is affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

newValue =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalTime()
.toString();
} else if (Point.LOGICAL_NAME.equals(className)
|| Geometry.LOGICAL_NAME.equals(className)) {
try {
byte[] wkb = objectValue.get(Geometry.WKB_FIELD).binaryValue();
newValue = MySqlTypeUtils.convertWkbArray(wkb);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to convert %s to geometry JSON.", objectValue),
e);
}
}

String newValue =
DebeziumSchemaUtils.transformRawValue(
oldValue,
mySqlType,
className,
typeMapping,
objectValue,
serverTimeZone);
resultMap.put(fieldName, newValue);
}

Expand Down
Loading